mne_rt.RTEpochs#

class mne_rt.RTEpochs(event_id: dict[str, int], event_channels: str | list[str], tmin: float = -0.2, tmax: float = 0.8, baseline: tuple | None = (None, 0), picks: str | list | None = None, reject: dict | None = None, bufsize: int = 200, on_trial: Callable | None = None, verbose: bool | str | None = None)[source]#

Bases: object

Event-triggered epoch accumulator backed by mne_lsl.stream.EpochsStream.

Connects a StreamLSL to an EpochsStream, polls for new epochs, and optionally drives an TopoPlot that redraws after every new trial.

Parameters:
event_iddict[str, int]

Condition label → marker integer, e.g. {"target": 1, "standard": 2}.

event_channelsstr or list of str

Channel(s) in the LSL stream that carry the event codes (e.g. "STI 014" for a STIM channel, or "stim").

tminfloat, default -0.2

Epoch start in seconds relative to the event.

tmaxfloat, default 0.8

Epoch end in seconds relative to the event.

baselinetuple or None, default (None, 0)

Baseline interval passed to EpochsStream. None disables correction.

picksstr or list or None, default None

Channel selection forwarded to EpochsStream.

rejectdict or None, default None

Peak-to-peak rejection thresholds, e.g. {"eeg": 150e-6}.

bufsizeint, default 200

Number of epochs to keep in the EpochsStream internal ring buffer.

on_trialcallable() or None, default None

Optional callback fired after every accepted epoch:

def on_trial(n_accepted, data, event_code, condition):
    ...

new_data is (n_new, n_channels, n_times); all_events is the current events array.

verbosebool or str or None, default None
Attributes:
epochs_stream_mne_lsl.stream.EpochsStream or None

The underlying EpochsStream after connect_to_lsl() has been called.

n_accepted_int

Running count of accepted epochs since run() started.

See also

mne_rt.viz.TopoPlot

Live scalp-layout ERP display driven by this class.

mne_rt.viz.EpochPlot

Scrolling raw viewer with trigger/epoch overlays.

mne_rt.RTStream

Continuous sliding-window stream processor.

Examples

>>> rt = RTEpochs(
...     event_id={"auditory": 1, "visual": 2},
...     event_channels="STI 014",
...     tmin=-0.2, tmax=0.5,
... )
>>> rt.connect_to_lsl(mock_lsl=True, fname="sample_raw.fif")
>>> rt.run(n_trials=20, show_erp=True)

Added in version 1.0.0.

__init__(event_id: dict[str, int], event_channels: str | list[str], tmin: float = -0.2, tmax: float = 0.8, baseline: tuple | None = (None, 0), picks: str | list | None = None, reject: dict | None = None, bufsize: int = 200, on_trial: Callable | None = None, verbose: bool | str | None = None) None[source]#

Methods

__init__(event_id, event_channels[, tmin, ...])

connect_to_lsl([stream_name, mock_lsl, ...])

Connect to an LSL stream and set up the EpochsStream.

disconnect()

Disconnect EpochsStream, StreamLSL, and stop any mock player.

get_epochs()

Return accumulated epochs as mne.EpochsArray.

get_evoked()

Return per-condition grand-average as mne.EvokedArray objects.

get_source(inverse_operator[, lambda2, method])

Apply a pre-computed inverse operator to the current grand averages.

run([n_trials, show_erp, erp_update_every, ...])

Run the epoch accumulation loop.

save(path[, overwrite])

Save accumulated epochs to a -epo.fif file mid-run.

stop()

Signal the run loop to stop after the current poll.

connect_to_lsl(stream_name: str | None = None, mock_lsl: bool = False, fname: str | None = None, timeout: float = 10.0, verbose: bool | str | None = None) RTEpochs[source]#

Connect to an LSL stream and set up the EpochsStream.

Parameters:
stream_namestr or None

LSL stream name. None picks the first available stream.

mock_lslbool

Replay fname via PlayerLSL.

fnamestr or None

Path to a .fif file (required when mock_lsl=True).

timeoutfloat

LSL connection timeout in seconds.

verbosebool or str or None
Returns:
selfRTEpochs
run(n_trials: int = 100, show_erp: bool = False, erp_update_every: int = 1, poll_interval: float = 0.05, verbose: bool | str | None = None) RTEpochs[source]#

Run the epoch accumulation loop.

Polls n_new_epochs and retrieves data in batches. Blocks until n_trials accepted epochs have been collected or stop() is called.

Parameters:
n_trialsint, default 100

Stop after this many accepted epochs.

show_erpbool, default False

Open an TopoPlot that redraws every erp_update_every accepted epochs.

erp_update_everyint, default 1

ERP redraw cadence in number of accepted epochs.

poll_intervalfloat, default 0.05

Seconds to sleep between polling n_new_epochs.

verbosebool or str or None
Returns:
selfRTEpochs
stop() None[source]#

Signal the run loop to stop after the current poll.

disconnect() None[source]#

Disconnect EpochsStream, StreamLSL, and stop any mock player.

get_epochs() mne.EpochsArray[source]#

Return accumulated epochs as mne.EpochsArray.

Can be called mid-run or after run() completes. The returned object contains all epochs accepted so far and uses the real mne.Info from the underlying stream (including channel positions and digitisation points).

Returns:
epochsmne.EpochsArray

Shape (n_accepted, n_channels, n_times).

Raises:
RuntimeError

If called before connect_to_lsl().

Examples

>>> rt.run(n_trials=50, show_erp=True)
>>> epochs = rt.get_epochs()
>>> epochs.plot_image()
get_evoked() dict[str, mne.EvokedArray][source]#

Return per-condition grand-average as mne.EvokedArray objects.

Useful for immediate offline analysis, plotting with mne.viz.plot_evoked(), or source localisation via get_source().

Returns:
evokeddict[str, mne.EvokedArray]

Mapping condition_label EvokedArray. Conditions with zero accepted epochs are omitted.

Examples

>>> evoked = rt.get_evoked()
>>> mne.viz.plot_evoked(evoked["auditory/left"])
save(path: str, overwrite: bool = False) None[source]#

Save accumulated epochs to a -epo.fif file mid-run.

The file can be reloaded offline with mne.read_epochs(path) and the full MNE analysis pipeline applied.

Parameters:
pathstr

Destination path. Should end with -epo.fif or -epo.fif.gz to follow MNE naming conventions.

overwritebool, default False

Overwrite an existing file.

Examples

>>> rt.run(n_trials=30)
>>> rt.save("session01-epo.fif", overwrite=True)
get_source(inverse_operator, lambda2: float = 0.1111111111111111, method: str = 'dSPM') dict[str, mne.SourceEstimate][source]#

Apply a pre-computed inverse operator to the current grand averages.

Wraps mne.minimum_norm.apply_inverse() — load an existing inverse operator with mne.minimum_norm.read_inverse_operator(fname).

Parameters:
inverse_operatormne.minimum_norm.InverseOperator

Pre-computed inverse operator matching the stream’s Info (same channels, same channel order).

lambda2float, default 1/9

Regularisation parameter (1 / SNR²). Use 1/9 for SNR ≈ 3 (typical ERP), 1.0 for noisy single-trial data.

methodstr, default “dSPM”

Inverse method: "MNE", "dSPM", "sLORETA", or "eLORETA".

Returns:
stc_dictdict[str, mne.SourceEstimate]

Condition label → source estimate (vertex × time).

Examples

>>> inv_op = mne.minimum_norm.read_inverse_operator("sample-inv.fif")
>>> stc = rt.get_source(inv_op)
>>> brain = mne_rt.BrainPlot(subject="sample", subjects_dir=sd)
>>> brain.update(stc["auditory/left"].data.mean(-1))