Source code for mne_rt.osc

"""Open Sound Control (OSC) output for the MNE-RT.

Allows feature values to be streamed in real time to any OSC-capable
application — Max/MSP, SuperCollider, Pure Data, TouchDesigner, Unity, etc.

Requirements
------------
``python-osc`` is bundled with ANT (no extra install needed).  If for
any reason it is missing, :class:`OSCSender` raises :exc:`ImportError`
at construction time.

OSC address scheme
------------------
Each modality is sent to a unique address::

    /ant/<modality>   float32   <value>

When ``bundle=True``, all active modalities are instead packed into a single
OSC bundle (one UDP datagram) that contains one such message per modality::

    #bundle   /ant/<mod1> float32 <v1>   /ant/<mod2> float32 <v2>   ...

Examples
--------
Send alpha power to SuperCollider on localhost::

    sender = OSCSender(host="127.0.0.1", port=57120)
    sender.send("sensor_power", 0.42)
    sender.close()

Or pass it to :meth:`~mne_rt.RTStream.record_main`::

    nf.record_main(duration=300, modality="sensor_power",
                   osc_sender=OSCSender(port=9000))

Classes
-------
OSCSender
    Thread-safe OSC client for real-time NF data streaming.
"""
from __future__ import annotations

import threading
from typing import Sequence, Union


class OSCSender:
    """Thread-safe OSC client that broadcasts NF feature values.

    Sends one UDP packet per NF value (or one bundle per update cycle) to an
    OSC server at the given host/port.  Every send is serialised by an
    internal lock, so the sender is safe to call from any thread.

    Parameters
    ----------
    host : str, default "127.0.0.1"
        Destination IP address or hostname.
    port : int, default 9000
        Destination UDP port.
    prefix : str, default "/ant"
        OSC address prefix.  Each modality is sent to
        ``<prefix>/<modality>``.  Trailing slashes are stripped.
    bundle : bool, default False
        If ``True``, :meth:`send_all` packs all modality values into a
        single OSC bundle per update cycle (one UDP packet) instead of one
        packet per modality.

    Raises
    ------
    ImportError
        If ``python-osc`` is not installed (should not occur with a
        standard ANT install).

    Examples
    --------
    Basic usage::

        sender = OSCSender(host="127.0.0.1", port=9000)
        sender.send("sensor_power", 0.35)
        sender.send_all(["sensor_power", "erd_ers"], [0.35, -12.4])
        sender.close()

    Custom prefix (maps to ``/nf/sensor_power``)::

        sender = OSCSender(prefix="/nf")

    .. versionadded:: 1.0.0
    """

    def __init__(
        self,
        host: str = "127.0.0.1",
        port: int = 9000,
        prefix: str = "/ant",
        bundle: bool = False,
    ) -> None:
        """Import ``python-osc`` lazily and create the UDP client."""
        # Only the imports live inside the ``try`` so that an unrelated
        # failure is never misreported as a missing dependency.
        try:
            from pythonosc.udp_client import SimpleUDPClient  # type: ignore
            from pythonosc.osc_bundle_builder import (  # type: ignore
                IMMEDIATELY,
                OscBundleBuilder,
            )
            from pythonosc.osc_message_builder import OscMessageBuilder  # type: ignore
            import pythonosc.osc_bundle as _osc_bundle  # type: ignore
        except ImportError as exc:
            raise ImportError(
                "python-osc is required for OSC output. "
                "Re-install ANT or run: pip install python-osc"
            ) from exc

        # Cache the python-osc entry points so the bundle path does not have
        # to re-import them on every update cycle.
        self._SimpleUDPClient = SimpleUDPClient
        self._OscBundleBuilder = OscBundleBuilder
        self._OscMessageBuilder = OscMessageBuilder
        self._osc_bundle = _osc_bundle
        self._bundle_timetag = IMMEDIATELY

        self.host = host
        self.port = port
        self.prefix = prefix.rstrip("/")
        self.bundle = bundle
        self._client = SimpleUDPClient(host, port)
        self._lock = threading.Lock()

    # ------------------------------------------------------------------
    # Public interface
    # ------------------------------------------------------------------

    def send(self, modality: str, value: float) -> None:
        """Send a single NF value immediately.

        Parameters
        ----------
        modality : str
            Modality name (e.g. ``"sensor_power"``).  Appended to the OSC
            prefix to form the address.
        value : float
            Current NF feature value.  Converted with :func:`float` before
            sending.

        Notes
        -----
        The OSC message sent is::

            <prefix>/<modality>   float32   <value>
        """
        address = f"{self.prefix}/{modality}"
        payload = float(value)
        with self._lock:
            self._client.send_message(address, payload)

    def send_all(
        self,
        modalities: Sequence[str],
        values: Sequence[float],
    ) -> None:
        """Send all NF values for one update cycle.

        When ``bundle=True`` (set in :meth:`__init__`), packs everything into
        a single OSC bundle datagram.  Otherwise sends one message per
        modality.  Nothing is sent when the sequences are empty.

        Parameters
        ----------
        modalities : sequence of str
            Active modality names, in order.
        values : sequence of float
            Corresponding feature values.

        Raises
        ------
        ValueError
            If ``modalities`` and ``values`` have different lengths, or if a
            value cannot be converted to ``float``.
        """
        if len(modalities) != len(values):
            raise ValueError(
                f"modalities and values must have the same length; "
                f"got {len(modalities)} and {len(values)}."
            )
        # ``len`` rather than truthiness: array-like inputs may not define
        # an unambiguous boolean value.
        if len(modalities) == 0:
            return

        # Convert everything up front so that a bad value aborts the whole
        # update instead of leaving a partially sent cycle on the wire.
        payloads = [float(val) for val in values]

        with self._lock:
            if self.bundle:
                self._send_bundle(modalities, payloads)
            else:
                for mod, val in zip(modalities, payloads):
                    self._client.send_message(f"{self.prefix}/{mod}", val)

    def send_raw(self, address: str, *args) -> None:
        """Send an arbitrary OSC message to a custom address.

        Parameters
        ----------
        address : str
            Full OSC address (e.g. ``"/my/custom/path"``).  The instance
            prefix is *not* applied.
        *args
            OSC arguments (int, float, str, bytes).  Several arguments are
            sent as a list, a single argument is passed through unchanged,
            and a call without arguments sends the integer ``0``.
        """
        if len(args) > 1:
            payload = list(args)
        elif args:
            payload = args[0]
        else:
            # Existing behaviour kept for compatibility: an argument-less
            # call transmits a single ``0`` rather than an empty message.
            payload = 0
        with self._lock:
            self._client.send_message(address, payload)

    def close(self) -> None:
        """Close the underlying UDP socket.

        After calling this method the sender should not be used again.
        Closing is best-effort: a missing socket or an OS-level error while
        closing is ignored, so the method can be called more than once.
        """
        # Hold the lock so the socket is not closed in the middle of a send
        # issued from another thread.
        with self._lock:
            # The socket is reached through the client's private ``_sock``
            # attribute, as in the original implementation.
            sock = getattr(self._client, "_sock", None)
            if sock is None:
                return
            try:
                sock.close()
            except OSError:
                pass

    # ------------------------------------------------------------------
    # Properties
    # ------------------------------------------------------------------

    @property
    def target(self) -> str:
        """Human-readable target as ``"host:port"``."""
        return f"{self.host}:{self.port}"

    def __repr__(self) -> str:
        return (
            f"OSCSender(host={self.host!r}, port={self.port}, "
            f"prefix={self.prefix!r}, bundle={self.bundle})"
        )

    def __enter__(self) -> "OSCSender":
        return self

    def __exit__(self, *_) -> None:
        self.close()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _send_bundle(
        self,
        modalities: Sequence[str],
        values: Sequence[float],
    ) -> None:
        """Pack all messages into one OSC bundle and dispatch.

        Builds one ``<prefix>/<modality>`` message per pair and sends the
        resulting bundle as a single datagram.  :meth:`send_all` calls this
        while holding ``self._lock``.
        """
        builder = self._OscBundleBuilder(self._bundle_timetag)
        for mod, val in zip(modalities, values):
            msg = self._OscMessageBuilder(address=f"{self.prefix}/{mod}")
            msg.add_arg(float(val))
            builder.add_content(msg.build())
        # Dispatch through the client so bundles go to the same destination
        # as individual messages, instead of writing to its private socket.
        self._client.send(builder.build())