"""LSL stream outlet for broadcasting real-time feature values.
:class:`LSLSender` creates a Lab Streaming Layer (LSL) outlet that pushes
computed feature values as a float-channel stream. Any LSL-aware application
(Psychtoolbox, PsychoPy, OpenViBE, BCI2000, another MNE-RT instance, …) can
subscribe to this stream and use the values for stimulus control or
further analysis.
This is faster and more reliable than OSC for same-machine communication
because it uses shared memory / localhost TCP rather than UDP, and LSL
handles timestamping, buffering, and clock synchronisation automatically.
Use :class:`~mne_rt.osc.OSCSender` instead when the feedback application runs
on a *different machine* and supports OSC but not LSL.
Classes
-------
LSLSender
Thread-safe LSL outlet that pushes NF values to downstream subscribers.
Examples
--------
Send alpha power into an LSL stream named ``"ANT_NF"``::
sender = LSLSender(stream_name="ANT_NF", n_channels=1)
sender.push(["sensor_power"], [0.42])
sender.close()
Pass to :meth:`~mne_rt.RTStream.record_main` alongside (or instead of) OSC::
nf.record_main(duration=300, modality="sensor_power",
lsl_sender=LSLSender())
"""
from __future__ import annotations
import threading
from typing import Sequence
[docs]
class LSLSender:
"""Thread-safe LSL outlet that broadcasts NF feature values.
Creates a single-source LSL stream with ``n_channels`` float32 channels
(one per active NF modality). Channel labels are set from the modality
names on the first :meth:`push` call.
Parameters
----------
stream_name : str, default "ANT_NF"
LSL stream name visible to subscribers.
stream_type : str, default "NF"
LSL content type (arbitrary string; "NF" is ANT convention).
n_channels : int, default 8
Maximum number of channels in the outlet. If fewer modalities are
active, unused channels are filled with ``0.0``. You can leave
this at the default and the outlet will resize automatically on
first push if needed.
srate : float, default 0.0
Nominal sample rate in Hz. ``0.0`` marks the stream as irregular
(i.e. one sample per NF window, not a fixed rate).
source_id : str, default "ant_nf_outlet"
Unique source identifier embedded in the stream info.
Raises
------
ImportError
If neither ``mne_lsl`` nor ``pylsl`` is installed.
Examples
--------
Basic usage::
sender = LSLSender(stream_name="ANT_NF")
sender.push(["sensor_power", "erd_ers"], [0.42, -1.2])
sender.close()
Context-manager usage::
with LSLSender() as sender:
for value in nf_stream:
sender.push(["sensor_power"], [value])
.. versionadded:: 1.0.0
"""
[docs]
def __init__(
self,
stream_name: str = "ANT_NF",
stream_type: str = "NF",
n_channels: int = 8,
srate: float = 0.0,
source_id: str = "ant_nf_outlet",
) -> None:
self._StreamInfo, self._StreamOutlet = self._import_lsl()
self.stream_name = stream_name
self.stream_type = stream_type
self.srate = srate
self.source_id = source_id
self._n_channels = n_channels
self._outlet = None
self._channel_labels: list[str] = []
self._lock = threading.Lock()
self._outlet = self._make_outlet(n_channels)
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
@staticmethod
def _import_lsl():
"""Return (StreamInfo, StreamOutlet) from whichever LSL binding is available."""
try:
from mne_lsl.lsl import StreamInfo, StreamOutlet
return StreamInfo, StreamOutlet
except ImportError:
pass
try:
from pylsl import StreamInfo, StreamOutlet # type: ignore[no-redef]
return StreamInfo, StreamOutlet
except ImportError as exc:
raise ImportError(
"LSLSender requires mne_lsl or pylsl.\n"
"Install ANT with its standard dependencies: pip install ANT\n"
"mne_lsl is a core dependency and should already be present."
) from exc
def _make_outlet(self, n_channels: int):
info = self._StreamInfo(
name=self.stream_name,
stype=self.stream_type,
n_channels=n_channels,
sfreq=self.srate,
dtype="float32",
source_id=self.source_id,
)
return self._StreamOutlet(info)
def _ensure_channels(self, n: int) -> None:
"""Recreate the outlet if the channel count needs to grow."""
if n <= self._n_channels:
return
self._outlet.close() if hasattr(self._outlet, "close") else None
self._n_channels = n
self._outlet = self._make_outlet(n)
# ------------------------------------------------------------------
# Public interface
# ------------------------------------------------------------------
[docs]
def push(
self,
modalities: Sequence[str],
values: Sequence[float],
) -> None:
"""Push one NF sample into the LSL outlet.
Parameters
----------
modalities : sequence of str
Active modality names (used to set channel labels on first call).
values : sequence of float
Corresponding NF feature values, same length as *modalities*.
Raises
------
ValueError
If ``modalities`` and ``values`` have different lengths.
"""
if len(modalities) != len(values):
raise ValueError(
f"modalities and values must have the same length; "
f"got {len(modalities)} and {len(values)}."
)
n = len(values)
with self._lock:
self._ensure_channels(n)
# Pad with zeros if outlet has more channels than active modalities
sample = list(values) + [0.0] * (self._n_channels - n)
self._outlet.push_sample(sample)
self._channel_labels = list(modalities)
[docs]
def push_value(self, modality: str, value: float) -> None:
"""Push a single-channel NF value.
Parameters
----------
modality : str
Modality name.
value : float
NF feature value.
"""
self.push([modality], [value])
[docs]
def close(self) -> None:
"""Destroy the LSL outlet and release resources.
After calling this the sender should not be used again.
"""
with self._lock:
if self._outlet is not None:
try:
if hasattr(self._outlet, "close"):
self._outlet.close()
del self._outlet
except Exception:
pass
self._outlet = None
# ------------------------------------------------------------------
# Properties
# ------------------------------------------------------------------
@property
def n_channels(self) -> int:
"""Current number of channels in the LSL outlet."""
return self._n_channels
@property
def channel_labels(self) -> list[str]:
"""Modality names from the most recent :meth:`push` call."""
return list(self._channel_labels)
def __repr__(self) -> str:
return (
f"LSLSender(stream_name={self.stream_name!r}, "
f"n_channels={self._n_channels}, "
f"active={self._outlet is not None})"
)
def __enter__(self) -> "LSLSender":
return self
def __exit__(self, *_) -> None:
self.close()