Source code for mne_rt.viz.brain_plot

"""Real-time 3D brain activation display.

Built on PyVista / pyvistaqt with a Qt control panel, hemisphere toggles,
surface switching, view presets, and screenshot support.

Classes
-------
BrainPlot
    Interactive real-time 3D brain surface with NF activity overlay.
"""
from __future__ import annotations

import datetime
import time
from pathlib import Path
from typing import Union

import numpy as np
try:
    import pyvista as pv
    _pyvista_available = True
except ImportError:
    pv = None  # type: ignore[assignment]
    _pyvista_available = False

try:
    from pyvistaqt import BackgroundPlotter as _BackgroundPlotter
    _pyvistaqt_available = True
except ImportError:
    _BackgroundPlotter = None  # type: ignore[assignment]
    _pyvistaqt_available = False

from mne_rt._logging import logger
from mne_rt.tools import setup_surface


# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

_CMAPS = ["hot", "plasma", "viridis", "Reds", "YlOrRd", "RdBu_r"]

_SURFACES = ["inflated", "pial", "white", "sphere"]

# Display modes: label shown in the scalar bar title
_DISPLAY_MODES = [
    "Source Activation",
    "Alpha Power  (8–13 Hz)",
    "Beta Power   (13–30 Hz)",
    "Theta Power  (4–8 Hz)",
    "Gamma Power  (30–80 Hz)",
    "SMR Power    (12–15 Hz)",
]

# Suggested clim ranges for each display mode
_DISPLAY_CLIM_HINTS: dict[str, tuple[float, float]] = {
    "Source Activation":   (0.0, 0.6),
    "Alpha Power  (8–13 Hz)":  (0.0, 0.5),
    "Beta Power   (13–30 Hz)": (0.0, 0.3),
    "Theta Power  (4–8 Hz)":   (0.0, 0.5),
    "Gamma Power  (30–80 Hz)": (0.0, 0.2),
    "SMR Power    (12–15 Hz)": (0.0, 0.4),
}

_VIEW_PRESETS: dict[str, dict] = {
    "lateral_lh": {"position": (-450, 0, 0),  "focal": (0, 0, 0), "up": (0, 0, 1)},
    "lateral_rh": {"position": (450, 0, 0),   "focal": (0, 0, 0), "up": (0, 0, 1)},
    "dorsal":     {"position": (0, 0, 450),   "focal": (0, 0, 0), "up": (0, 1, 0)},
    "frontal":    {"position": (0, -450, 0),  "focal": (0, 0, 0), "up": (0, 0, 1)},
    "ventral":    {"position": (0, 0, -450),  "focal": (0, 0, 0), "up": (0, 1, 0)},
}

# Dark stylesheet for the Qt control panel — matches the dark UI of other plots
_PANEL_DARK_STYLE = """
    QWidget {
        background: #161b22;
        color: #e6edf3;
        font-family: -apple-system, 'Segoe UI', sans-serif;
        font-size: 11px;
    }
    QGroupBox {
        border: 1px solid #30363d;
        border-radius: 6px;
        margin-top: 10px;
        padding-top: 8px;
        color: #8b949e;
        font-weight: 600;
        font-size: 10px;
    }
    QGroupBox::title {
        subcontrol-origin: margin;
        left: 8px;
        padding: 0 4px;
        color: #8b949e;
    }
    QLabel { color: #c9d1d9; background: transparent; }
    QComboBox {
        background: #21262d;
        border: 1px solid #30363d;
        border-radius: 5px;
        padding: 4px 8px;
        color: #e6edf3;
    }
    QComboBox::drop-down { border: none; width: 18px; }
    QComboBox QAbstractItemView {
        background: #161b22;
        border: 1px solid #30363d;
        color: #e6edf3;
        selection-background-color: #1f6feb;
    }
    QSlider::groove:horizontal {
        height: 3px;
        background: #30363d;
        border-radius: 2px;
    }
    QSlider::handle:horizontal {
        background: #58a6ff;
        width: 12px;
        height: 12px;
        margin: -5px 0;
        border-radius: 6px;
    }
    QSlider::sub-page:horizontal {
        background: #388bfd;
        border-radius: 2px;
    }
    QPushButton {
        background: #21262d;
        border: 1px solid #30363d;
        border-radius: 5px;
        padding: 5px 8px;
        color: #c9d1d9;
        font-weight: 500;
    }
    QPushButton:hover { background: #30363d; border-color: #8b949e; color: #e6edf3; }
    QPushButton:pressed { background: #1f6feb; border-color: #388bfd; color: #ffffff; }
    QCheckBox { spacing: 6px; color: #c9d1d9; }
    QCheckBox::indicator {
        width: 13px; height: 13px;
        border-radius: 3px;
        border: 1px solid #30363d;
        background: #21262d;
    }
    QCheckBox::indicator:checked { background: #238636; border-color: #2ea043; }
    QDoubleSpinBox {
        background: #21262d;
        border: 1px solid #30363d;
        border-radius: 5px;
        padding: 3px 6px;
        color: #e6edf3;
    }
    QDoubleSpinBox::up-button, QDoubleSpinBox::down-button {
        background: #30363d; border: none; width: 14px;
    }
    QDockWidget { color: #e6edf3; background: #0d1117; }
    QDockWidget::title {
        background: #161b22;
        border-bottom: 1px solid #30363d;
        padding: 6px 8px;
        color: #e6edf3;
        font-weight: 700;
    }
    QScrollBar:vertical { background: #0d1117; width: 8px; border-radius: 4px; }
    QScrollBar::handle:vertical {
        background: #30363d; border-radius: 4px; min-height: 20px;
    }
"""

# Background colour presets  (bottom_hex, top_hex)
_BACKGROUNDS: dict[str, tuple[str, str]] = {
    "Deep space":   ("#040810", "#0b1628"),
    "Midnight":     ("#050d1a", "#0d1b35"),
    "Slate":        ("#0d1117", "#1c2333"),
    "Charcoal":     ("#111111", "#1e1e1e"),
    "Black":        ("#000000", "#050505"),
    "Light":        ("#e8eaf0", "#ffffff"),   # publication / presentations
}

_DEFAULT_BG = "Midnight"


[docs] class BrainPlot: """Interactive real-time 3D brain activation display. Renders bilateral ``fsaverage`` cortical surfaces with a colour-mapped activity overlay and a Qt control panel docked on the right. Designed to run alongside :class:`~mne_rt.viz.NFPlot` inside a shared Qt event loop — call :meth:`update_from_arrays` or :meth:`update` from the acquisition thread's pump timer. Parameters ---------- subjects_fs_dir : str | Path FreeSurfer subjects directory. The MNE-bundled ``fsaverage`` template is used automatically (auto-downloaded on first use); this path is accepted for API compatibility. clim : tuple of float, default (0.0, 0.6) Initial ``(min, max)`` colour-map range for the activity overlay. hemi_distance : float, default 20.0 Gap in mm between the medial walls of the two hemispheres. surf : {"inflated", "pial", "white", "sphere"}, default "inflated" Initial cortical surface geometry to display. cmap : str, default "hot" Initial colour map name. Must be one of ``["hot", "plasma", "viridis", "Reds", "YlOrRd", "RdBu_r"]``. opacity : float, default 0.6 Initial opacity of the activity overlay (0 = transparent, 1 = opaque). window_size : tuple of int, default (1600, 1000) Width × height of the render window in pixels. display_smoothing : float, default 0.3 EMA factor applied to the per-vertex activation arrays before each render. ``1.0`` disables smoothing; lower values blend consecutive frames so the cortical map transitions smoothly instead of jumping between analysis windows. verbose : bool | str | None, default None Verbosity level. See Also -------- mne_rt.viz.NFPlot : Scrolling real-time NF signal plot. mne_rt.RTStream.record_main : Main NF loop that drives both plots. Notes ----- Activity values are spread from the 10 242 ico-5 source vertices to all 163 842 fsaverage surface vertices via nearest-neighbour interpolation, giving a smooth spatial appearance. .. versionadded:: 1.0.0 """
[docs] def __init__( self, subjects_fs_dir: Union[str, Path], clim: tuple[float, float] = (0.0, 0.6), hemi_distance: float = 20.0, surf: str = "inflated", cmap: str = "hot", opacity: float = 0.6, window_size: tuple[int, int] = (1600, 1000), display_smoothing: float = 0.3, verbose: Union[bool, str, None] = None, ) -> None: if not _pyvista_available or not _pyvistaqt_available: raise ImportError( "pyvista and pyvistaqt are required for BrainPlot. " "Install them with: pip install 'mne-rt[viz]'" ) from mne_rt._logging import set_log_level set_log_level(verbose) if cmap not in _CMAPS: raise ValueError(f"cmap {cmap!r} not recognised. Choose from {_CMAPS}.") if surf not in _SURFACES: raise ValueError(f"surf {surf!r} not recognised. Choose from {_SURFACES}.") self._subjects_fs_dir = Path(subjects_fs_dir) self._clim = list(clim) self._hemi_distance = hemi_distance self._surf = surf self._opacity = opacity self._threshold = 0.0 self._cmap_idx = _CMAPS.index(cmap) self._hemi_visible = {"lh": True, "rh": True} self._bg_name = _DEFAULT_BG self._display_mode = _DISPLAY_MODES[0] self._recording = False self._video_writer = None self._video_path: Path | None = None self._display_alpha = float(np.clip(display_smoothing, 0.0, 1.0)) self._lh_ema: np.ndarray | None = None self._rh_ema: np.ndarray | None = None # Custom band overlay state self._custom_band_hz: tuple[float, float] | None = None # Parcellation border overlay state self._parc_name: str | None = None self._parc_actor = None self._parc_edges: np.ndarray | None = None # (n_edges, 2) vertex index pairs logger.info("Loading %s surface …", surf) self._load_surface(surf) self._plotter = self._build_plotter(window_size) self._sync_scalar_bar_lut() # link scalar bar to activity actor's LUT self._add_key_bindings() self._add_overlays() logger.info("BrainPlot ready.")
# ------------------------------------------------------------------ # Surface management # ------------------------------------------------------------------ def _load_surface(self, surf: str) -> None: ( self._hemi_offsets, self._scalars_full, self._mesh, self._verts_stc, self._nn_map, ) = setup_surface( str(self._subjects_fs_dir), hemi_distance=self._hemi_distance, surf=surf, ) self._n_lh = int(self._hemi_offsets["rh"]) # ------------------------------------------------------------------ # Plotter construction # ------------------------------------------------------------------ def _build_plotter(self, window_size: tuple[int, int]) -> "_BackgroundPlotter": p = _BackgroundPlotter( window_size=tuple(window_size), lighting="three lights", title="MNE-RT — Brain Activation", toolbar=False, menu_bar=False, editor=False, ) # Background gradient bg_bot, bg_top = _BACKGROUNDS[_DEFAULT_BG] p.set_background(bg_bot, top=bg_top) # ── Base sulcal-depth mesh ──────────────────────────────────────── # sulc values: negative = gyri (light), positive = sulci (dark) # "Greys_r" maps low→white, high→black → correct neuroimaging convention self._base_actor = p.add_mesh( self._mesh, scalars="base", cmap="Greys_r", clim=[-1.0, 1.5], smooth_shading=True, show_scalar_bar=False, ) # ── Activity overlay ────────────────────────────────────────────── self._act_actor = p.add_mesh( self._mesh, scalars="activity", cmap=_CMAPS[self._cmap_idx], opacity=self._opacity, clim=self._clim, smooth_shading=True, show_scalar_bar=False, interpolate_before_map=True, nan_opacity=0.0, ) # ── Scalar bar (no title, no tick numbers) ─────────────────────── p.add_scalar_bar( title="", italic=False, vertical=True, position_x=0.02, position_y=0.20, height=0.38, width=0.04, color="white", title_font_size=0, label_font_size=10, n_labels=0, ) # ── Lighting & post-processing ──────────────────────────────────── p.enable_eye_dome_lighting() p.enable_ssao(radius=0.5, bias=0.005, kernel_size=256) p.enable_anti_aliasing("ssaa") p.add_camera_orientation_widget() p.camera_position = "yz" p.camera.azimuth = 45 self._add_control_panel(p) return p # ------------------------------------------------------------------ # Qt control panel # ------------------------------------------------------------------ def _add_control_panel(self, p: "_BackgroundPlotter") -> None: """Dock a Qt control panel on the right side of the brain window.""" from PyQt6.QtWidgets import ( QDockWidget, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QSlider, QComboBox, QCheckBox, QPushButton, QGroupBox, QDoubleSpinBox, ) from PyQt6.QtCore import Qt as Qt_ # ── helpers ────────────────────────────────────────────────────── def _hslider(lo: int = 0, hi: int = 100, val: int = 0) -> QSlider: sl = QSlider(Qt_.Orientation.Horizontal) sl.setRange(lo, hi) sl.setValue(val) return sl def _group(title: str) -> tuple[QGroupBox, QVBoxLayout]: grp = QGroupBox(title) ly = QVBoxLayout(grp) ly.setSpacing(4) ly.setContentsMargins(6, 12, 6, 6) return grp, ly # ── root panel ──────────────────────────────────────────────────── panel = QWidget() panel.setMinimumWidth(210) panel.setMaximumWidth(260) panel.setStyleSheet(_PANEL_DARK_STYLE) root = QVBoxLayout(panel) root.setSpacing(6) root.setContentsMargins(6, 6, 6, 6) # ── Surface ─────────────────────────────────────────────────────── grp, ly = _group("Surface") surf_combo = QComboBox() surf_combo.addItems(_SURFACES) surf_combo.setCurrentText(self._surf) surf_combo.currentTextChanged.connect( lambda s: self.set_surface(s) if s != self._surf else None ) ly.addWidget(surf_combo) root.addWidget(grp) # ── Parcellation ────────────────────────────────────────────────── grp, ly = _group("Parcellation") _PARC_ITEMS = ["None", "aparc (Desikan)", "aparc.a2009s (Destrieux)"] self._parc_combo = QComboBox() self._parc_combo.addItems(_PARC_ITEMS) self._parc_combo.currentTextChanged.connect(self._set_parcellation) ly.addWidget(self._parc_combo) self._parc_status_lbl = QLabel("") self._parc_status_lbl.setWordWrap(True) self._parc_status_lbl.setStyleSheet("color:#8b949e; font-size:9px;") ly.addWidget(self._parc_status_lbl) root.addWidget(grp) # ── Hemispheres ─────────────────────────────────────────────────── grp, ly = _group("Hemispheres") hl = QHBoxLayout() lh_cb = QCheckBox("Left") lh_cb.setChecked(True) rh_cb = QCheckBox("Right") rh_cb.setChecked(True) def _on_lh(state: int) -> None: self._hemi_visible["lh"] = bool(state) self._refresh_scalars() def _on_rh(state: int) -> None: self._hemi_visible["rh"] = bool(state) self._refresh_scalars() lh_cb.stateChanged.connect(_on_lh) rh_cb.stateChanged.connect(_on_rh) hl.addWidget(lh_cb) hl.addWidget(rh_cb) ly.addLayout(hl) root.addWidget(grp) # ── Colormap ────────────────────────────────────────────────────── grp, ly = _group("Colormap") cmap_combo = QComboBox() cmap_combo.addItems(_CMAPS) cmap_combo.setCurrentText(_CMAPS[self._cmap_idx]) def _on_cmap(name: str) -> None: idx = _CMAPS.index(name) if idx == self._cmap_idx: return self._cmap_idx = idx self._sync_scalar_bar_lut() p.render() cmap_combo.currentTextChanged.connect(_on_cmap) ly.addWidget(cmap_combo) root.addWidget(grp) # ── Color range (clim) ──────────────────────────────────────────── grp, ly = _group("Color Range") clim_max_lbl = QLabel(f"max {self._clim[1]:.2f}") clim_max_sl = _hslider(0, 100, int(self._clim[1] * 100)) clim_min_lbl = QLabel(f"min {self._clim[0]:.2f}") clim_min_sl = _hslider(0, 100, int(self._clim[0] * 100)) def _on_clim_max(v: int) -> None: self._clim[1] = v / 100.0 clim_max_lbl.setText(f"max {self._clim[1]:.2f}") self._act_actor.GetMapper().SetScalarRange(*self._clim) p.render() def _on_clim_min(v: int) -> None: self._clim[0] = v / 100.0 clim_min_lbl.setText(f"min {self._clim[0]:.2f}") self._act_actor.GetMapper().SetScalarRange(*self._clim) p.render() clim_max_sl.valueChanged.connect(_on_clim_max) clim_min_sl.valueChanged.connect(_on_clim_min) ly.addWidget(clim_max_lbl) ly.addWidget(clim_max_sl) ly.addWidget(clim_min_lbl) ly.addWidget(clim_min_sl) root.addWidget(grp) # ── Opacity ─────────────────────────────────────────────────────── grp, ly = _group("Opacity") op_lbl = QLabel(f"{self._opacity:.2f}") op_sl = _hslider(0, 100, int(self._opacity * 100)) def _on_opacity(v: int) -> None: self._opacity = v / 100.0 op_lbl.setText(f"{self._opacity:.2f}") self._act_actor.GetProperty().SetOpacity(self._opacity) p.render() op_sl.valueChanged.connect(_on_opacity) ly.addWidget(op_lbl) ly.addWidget(op_sl) root.addWidget(grp) # ── Threshold ───────────────────────────────────────────────────── grp, ly = _group("Threshold (0 → 1)") thr_lbl = QLabel(f"{self._threshold:.3f}") thr_sl = _hslider(0, 100, 0) def _on_threshold(v: int) -> None: self._threshold = v / 100.0 thr_lbl.setText(f"{self._threshold:.3f}") self._refresh_scalars() thr_sl.valueChanged.connect(_on_threshold) ly.addWidget(thr_lbl) ly.addWidget(thr_sl) root.addWidget(grp) # ── Background ──────────────────────────────────────────────────── grp, ly = _group("Background") bg_combo = QComboBox() bg_combo.addItems(list(_BACKGROUNDS.keys())) bg_combo.setCurrentText(_DEFAULT_BG) def _on_bg(name: str) -> None: self._bg_name = name bot, top = _BACKGROUNDS[name] p.set_background(bot, top=top) p.render() bg_combo.currentTextChanged.connect(_on_bg) ly.addWidget(bg_combo) root.addWidget(grp) # ── View presets ────────────────────────────────────────────────── grp, ly = _group("View Presets") btn_row = QHBoxLayout() for label, key, tip in [ ("L", "lateral_lh", "Left lateral"), ("R", "lateral_rh", "Right lateral"), ("D", "dorsal", "Dorsal (top)"), ("F", "frontal", "Frontal"), ("V", "ventral", "Ventral (bottom)"), ]: btn = QPushButton(label) btn.setFixedSize(32, 28) btn.setToolTip(tip) btn.clicked.connect(lambda _=False, k=key: self._set_view(k)) btn_row.addWidget(btn) ly.addLayout(btn_row) root.addWidget(grp) # ── Display Mode ────────────────────────────────────────────────── grp, ly = _group("Display Mode") mode_combo = QComboBox() mode_combo.addItems(_DISPLAY_MODES) mode_combo.setCurrentText(self._display_mode) mode_lbl = QLabel( "<span style='color:#8b949e;font-size:9px;'>" "Configure RTStream modality to match selected mode" "</span>" ) mode_lbl.setWordWrap(True) def _on_display_mode(name: str) -> None: self.set_display_mode(name) mode_combo.currentTextChanged.connect(_on_display_mode) ly.addWidget(mode_combo) ly.addWidget(mode_lbl) root.addWidget(grp) # ── Custom frequency band ───────────────────────────────────────── grp, ly = _group("Custom Band (Hz)") band_row = QHBoxLayout() self._custom_lo_spin = QDoubleSpinBox() self._custom_lo_spin.setRange(0.5, 200.0) self._custom_lo_spin.setValue(13.0) self._custom_lo_spin.setSuffix(" Hz") self._custom_lo_spin.setDecimals(1) dash_lbl2 = QLabel("–") dash_lbl2.setStyleSheet("color:#8b949e;") self._custom_hi_spin = QDoubleSpinBox() self._custom_hi_spin.setRange(1.0, 500.0) self._custom_hi_spin.setValue(30.0) self._custom_hi_spin.setSuffix(" Hz") self._custom_hi_spin.setDecimals(1) band_row.addWidget(self._custom_lo_spin) band_row.addWidget(dash_lbl2) band_row.addWidget(self._custom_hi_spin) ly.addLayout(band_row) btn_band_row = QHBoxLayout() btn_set_band = QPushButton("Set") btn_set_band.clicked.connect(self._set_custom_band) btn_clr_band = QPushButton("Clear") btn_clr_band.clicked.connect(self._clear_custom_band) btn_band_row.addWidget(btn_set_band) btn_band_row.addWidget(btn_clr_band) ly.addLayout(btn_band_row) self._custom_band_lbl = QLabel("No custom band set") self._custom_band_lbl.setStyleSheet("color:#8b949e; font-size:10px;") self._custom_band_lbl.setWordWrap(True) ly.addWidget(self._custom_band_lbl) root.addWidget(grp) # ── Actions ─────────────────────────────────────────────────────── grp, ly = _group("Actions") reset_btn = QPushButton("Reset activity (r)") reset_btn.clicked.connect(self.reset_activity) shot_btn = QPushButton("Screenshot (s)") shot_btn.clicked.connect(lambda: self.screenshot()) ly.addWidget(reset_btn) ly.addWidget(shot_btn) root.addWidget(grp) root.addStretch() # ── Keyboard hint ───────────────────────────────────────────────── hint = QLabel( "<span style='color:#8b949e; font-size:10px;'>" "Keys: 1-5 views · i/p/w/h surface · s screenshot · r reset" "</span>" ) hint.setWordWrap(True) root.addWidget(hint) # ── Dock widget ─────────────────────────────────────────────────── dock = QDockWidget("Brain Controls") dock.setWidget(panel) dock.setFeatures( QDockWidget.DockWidgetFeature.DockWidgetMovable | QDockWidget.DockWidgetFeature.DockWidgetFloatable, ) p.app_window.addDockWidget(Qt_.DockWidgetArea.RightDockWidgetArea, dock) def _add_key_bindings(self) -> None: p = self._plotter p.add_key_event("1", lambda: self._set_view("lateral_lh")) p.add_key_event("2", lambda: self._set_view("lateral_rh")) p.add_key_event("3", lambda: self._set_view("dorsal")) p.add_key_event("4", lambda: self._set_view("frontal")) p.add_key_event("5", lambda: self._set_view("ventral")) p.add_key_event("s", self.screenshot) p.add_key_event("r", self.reset_activity) p.add_key_event("i", lambda: self.set_surface("inflated")) p.add_key_event("p", lambda: self.set_surface("pial")) p.add_key_event("w", lambda: self.set_surface("white")) p.add_key_event("h", lambda: self.set_surface("sphere")) def _add_overlays(self) -> None: self._rec_label = self._plotter.add_text( "", position="lower_right", font_size=12, color="#FF4444", ) # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _refresh_scalars(self, deferred: bool = False) -> None: """Recompute display scalars respecting threshold and hemisphere masks.""" display = self._scalars_full.copy() display[display < self._threshold] = np.nan if not self._hemi_visible["lh"]: display[: self._n_lh] = np.nan if not self._hemi_visible["rh"]: display[self._n_lh :] = np.nan self._mesh["activity"] = display if not deferred: self._plotter.render() def _set_view(self, preset: str) -> None: if preset not in _VIEW_PRESETS: return v = _VIEW_PRESETS[preset] self._plotter.camera_position = [v["position"], v["focal"], v["up"]] self._plotter.render() def _rebuild_actors(self) -> None: """Remove and re-add mesh actors after a surface swap.""" p = self._plotter p.remove_actor(self._base_actor) p.remove_actor(self._act_actor) self._remove_parc_actor() self._base_actor = p.add_mesh( self._mesh, scalars="base", cmap="Greys_r", clim=[-1.0, 1.5], smooth_shading=True, show_scalar_bar=False, ) self._act_actor = p.add_mesh( self._mesh, scalars="activity", cmap=_CMAPS[self._cmap_idx], opacity=self._opacity, clim=self._clim, smooth_shading=True, show_scalar_bar=False, interpolate_before_map=True, nan_opacity=0.0, ) self._sync_scalar_bar_lut() # Re-overlay parcellation borders on the new surface geometry if self._parc_edges is not None: self._build_parc_actor() p.render() def _sync_scalar_bar_lut(self) -> None: """Sync the scalar bar LUT with the current activity actor's mapper.""" import matplotlib cmap_obj = matplotlib.colormaps[_CMAPS[self._cmap_idx]] lut = self._act_actor.GetMapper().GetLookupTable() lut.SetNumberOfColors(256) for i in range(256): r, g, b, a = cmap_obj(i / 255.0) lut.SetTableValue(i, r, g, b, a) lut.Build() self._plotter.scalar_bar.SetLookupTable(lut) # ------------------------------------------------------------------ # Parcellation border overlay # ------------------------------------------------------------------ def _set_parcellation(self, parc_label: str) -> None: """Load annotation and draw parcel boundary edges on the surface.""" self._remove_parc_actor() self._parc_edges = None self._parc_name = None if parc_label == "None": if hasattr(self, "_parc_status_lbl"): self._parc_status_lbl.setText("") return parc_id = parc_label.split()[0] # "aparc (Desikan)" → "aparc" logger.info("Loading parcellation %r …", parc_id) if hasattr(self, "_parc_status_lbl"): self._parc_status_lbl.setText("Loading …") try: import mne from pathlib import Path as _Path fs_dir = _Path(mne.datasets.fetch_fsaverage(verbose=False)) subjects_dir = str(fs_dir.parent) labels_lh = mne.read_labels_from_annot( "fsaverage", parc=parc_id, hemi="lh", subjects_dir=subjects_dir, verbose=False, ) labels_rh = mne.read_labels_from_annot( "fsaverage", parc=parc_id, hemi="rh", subjects_dir=subjects_dir, verbose=False, ) n_lh = self._n_lh n_rh = self._mesh.n_points - n_lh parcel_lh = np.full(n_lh, -1, dtype=np.int32) parcel_rh = np.full(n_rh, -1, dtype=np.int32) for i, lbl in enumerate(labels_lh): verts = lbl.vertices[lbl.vertices < n_lh] parcel_lh[verts] = i for i, lbl in enumerate(labels_rh): verts = lbl.vertices[lbl.vertices < n_rh] parcel_rh[verts] = i parcel_all = np.concatenate([parcel_lh, parcel_rh]) # Extract triangle edges and keep those crossing parcel boundaries faces = self._mesh.faces.reshape(-1, 4)[:, 1:] # (n_faces, 3) e01 = np.sort(faces[:, :2], axis=1) e12 = np.sort(faces[:, 1:], axis=1) e02 = np.sort(faces[:, [0, 2]], axis=1) all_edges = np.unique(np.vstack([e01, e12, e02]), axis=0) mask = parcel_all[all_edges[:, 0]] != parcel_all[all_edges[:, 1]] self._parc_edges = all_edges[mask] self._parc_name = parc_id self._build_parc_actor() n_bnd = len(self._parc_edges) logger.info("Parcellation %r: %d boundary edges", parc_id, n_bnd) if hasattr(self, "_parc_status_lbl"): self._parc_status_lbl.setText(f"{n_bnd:,} boundary edges") except Exception as exc: self._parc_edges = None self._parc_name = None logger.warning("Parcellation load failed: %s", exc) if hasattr(self, "_parc_status_lbl"): self._parc_status_lbl.setText(f"Error: {exc}") def _build_parc_actor(self) -> None: """Create a pyvista line mesh from stored boundary edges and add it.""" if self._parc_edges is None or len(self._parc_edges) == 0: return pts = self._mesh.points n_segs = len(self._parc_edges) seg_pts = np.empty((n_segs * 2, 3), dtype=np.float32) seg_pts[0::2] = pts[self._parc_edges[:, 0]] seg_pts[1::2] = pts[self._parc_edges[:, 1]] lines_conn = np.empty(n_segs * 3, dtype=np.int_) lines_conn[0::3] = 2 lines_conn[1::3] = np.arange(0, n_segs * 2, 2) lines_conn[2::3] = np.arange(1, n_segs * 2, 2) bnd = pv.PolyData(seg_pts, lines=lines_conn) self._parc_actor = self._plotter.add_mesh( bnd, color="#ffffff", line_width=2.0, render_lines_as_tubes=False, show_scalar_bar=False, ) def _remove_parc_actor(self) -> None: if self._parc_actor is not None: self._plotter.remove_actor(self._parc_actor) self._parc_actor = None # ------------------------------------------------------------------ # Custom frequency band # ------------------------------------------------------------------ def _set_custom_band(self) -> None: lo = self._custom_lo_spin.value() hi = self._custom_hi_spin.value() if lo >= hi: return self._custom_band_hz = (lo, hi) self._display_mode = f"Custom {lo:.1f}{hi:.1f} Hz" self._custom_band_lbl.setText(f"Active: {lo:.1f}{hi:.1f} Hz") self._custom_band_lbl.setStyleSheet("color:#80d8ff; font-size:10px;") logger.info("BrainPlot custom band → %.1f%.1f Hz", lo, hi) def _clear_custom_band(self) -> None: self._custom_band_hz = None self._display_mode = _DISPLAY_MODES[0] self._custom_band_lbl.setText("No custom band set") self._custom_band_lbl.setStyleSheet("color:#8b949e; font-size:10px;") logger.info("BrainPlot custom band cleared") @property def custom_band(self) -> tuple[float, float] | None: """Active custom frequency band ``(lo, hi)`` in Hz, or ``None``. Read this in the acquisition loop alongside :attr:`display_mode` to decide which frequency range to pass to :meth:`update_from_arrays`. When not ``None`` it overrides the built-in band labels. """ return self._custom_band_hz # ------------------------------------------------------------------ # Public interface # ------------------------------------------------------------------
[docs] def set_surface(self, surf: str) -> None: """Switch the cortical surface geometry. Parameters ---------- surf : {"inflated", "pial", "white", "sphere"} Target surface geometry. """ if surf not in _SURFACES: raise ValueError(f"surf {surf!r} not recognised. Choose from {_SURFACES}.") if surf == self._surf: return logger.info("Switching surface: %s%s", self._surf, surf) saved_scalars = self._scalars_full.copy() self._surf = surf self._load_surface(surf) if len(saved_scalars) == len(self._scalars_full): self._scalars_full[:] = saved_scalars self._mesh["activity"] = saved_scalars self._rebuild_actors() logger.info("Surface changed to %r.", surf)
[docs] def update_from_arrays( self, lh_scalars: np.ndarray, rh_scalars: np.ndarray, mode: str = "power", deferred: bool = False, ) -> None: """Update the brain display from pre-computed per-source-vertex scalars. Source values (10 242 per hemisphere) are spread to all 163 842 surface vertices via nearest-neighbour interpolation. Parameters ---------- lh_scalars : ndarray, shape (10242,) Activity values for left-hemisphere source vertices. rh_scalars : ndarray, shape (10242,) Activity values for right-hemisphere source vertices. mode : str, default "power" Kept for API symmetry; unused internally. deferred : bool, default False If ``True``, skip the immediate ``render()`` call. """ if self._display_alpha < 1.0: if self._lh_ema is None: self._lh_ema = lh_scalars.copy() self._rh_ema = rh_scalars.copy() else: a = self._display_alpha self._lh_ema = a * lh_scalars + (1.0 - a) * self._lh_ema self._rh_ema = a * rh_scalars + (1.0 - a) * self._rh_ema lh_scalars = self._lh_ema rh_scalars = self._rh_ema n_lh = self._n_lh self._scalars_full[:n_lh] = lh_scalars[self._nn_map["lh"]] self._scalars_full[n_lh:] = rh_scalars[self._nn_map["rh"]] self._refresh_scalars(deferred=deferred)
[docs] def update( self, stc, mode: str = "power", interval: float = 0.05, ) -> None: """Animate the brain from a source time-course estimate. Parameters ---------- stc : mne.SourceEstimate Source estimate returned by ``apply_inverse_raw`` or similar. mode : {"power", "activation"}, default "power" ``"power"`` displays mean squared amplitude; ``"activation"`` displays the time-averaged amplitude. interval : float, default 0.05 Seconds to pause between frame updates. """ n_times = stc.lh_data.shape[1] block = max(n_times // 2, 1) n_lh = self._n_lh for b_start in range(0, n_times, block): b_end = min(b_start + block, n_times) for hemi in ("lh", "rh"): raw_d = stc.rh_data if hemi == "rh" else stc.lh_data chunk = raw_d[:, b_start:b_end] src_vals = ( np.mean(chunk ** 2, axis=1) if mode == "power" else chunk.mean(axis=1) ) nn = self._nn_map[hemi] if hemi == "lh": self._scalars_full[:n_lh] = src_vals[nn] else: self._scalars_full[n_lh:] = src_vals[nn] self._refresh_scalars() time.sleep(interval)
[docs] def set_display_mode(self, mode: str) -> None: """Switch the display mode label and reset clim to sensible defaults. The display mode is a **label only** — it tells both the operator and the visualisation what kind of values are being streamed in. Configure :meth:`~mne_rt.RTStream.record_main` with the matching ``modality`` to pass the correct data. Parameters ---------- mode : str One of :data:`_DISPLAY_MODES`. Choosing a band-power mode will also set the colour-range (clim) to a typical power scale; choosing "Source Activation" restores the activation scale. Notes ----- For **band power** modes pass per-source-vertex spectral power (e.g., from ``modality="source_power"``) to :meth:`update_from_arrays`. For **activation** mode pass eLORETA / dSPM amplitude values from :meth:`update` (``stc`` from MNE inverse operator). """ if mode not in _DISPLAY_MODES: raise ValueError(f"mode {mode!r} not recognised. Choose from {_DISPLAY_MODES}.") self._display_mode = mode clim_hint = _DISPLAY_CLIM_HINTS.get(mode, (0.0, 0.6)) self._clim[0] = clim_hint[0] self._clim[1] = clim_hint[1] self._act_actor.GetMapper().SetScalarRange(*self._clim) self._plotter.render() logger.info("BrainPlot display mode → %r (clim %s)", mode, clim_hint)
@property def display_mode(self) -> str: """Current display mode string (e.g. ``"Alpha Power (8–13 Hz)"``). Read this in the acquisition loop to decide which modality to compute and pass to :meth:`update_from_arrays`. """ return self._display_mode
[docs] def reset_activity(self) -> None: """Zero out all activity scalars and refresh the display.""" self._scalars_full[:] = 0.0 self._refresh_scalars()
# ------------------------------------------------------------------ # Video recording # ------------------------------------------------------------------ def _toggle_recording(self) -> None: if self._recording: self.stop_recording() else: self.record_video()
[docs] def record_video(self, path: Union[str, Path, None] = None) -> Path: """Start recording the brain display to an MP4 video file. Parameters ---------- path : str | Path | None, default None Destination file. Defaults to ``~/ant_brain_<timestamp>.mp4``. Returns ------- path : Path """ import imageio if self._recording: raise RuntimeError("Already recording. Call stop_recording() first.") if path is None: ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") path = Path.home() / f"ant_brain_{ts}.mp4" path = Path(path) self._video_writer = imageio.get_writer( str(path), fps=24, macro_block_size=None, plugin="ffmpeg" ) self._video_path = path self._recording = True self._rec_label.SetInput("● REC") self._plotter.render() logger.info("Brain video recording started: %s", path) return path
[docs] def stop_recording(self) -> Union[Path, None]: """Stop recording and finalise the video file. Returns ------- path : Path | None """ if not self._recording: return None if self._video_writer is not None: self._video_writer.close() self._video_writer = None self._recording = False self._rec_label.SetInput("") self._plotter.render() path = self._video_path self._video_path = None logger.info("Brain video saved: %s", path) return path
[docs] def write_frame_if_recording(self) -> None: """Capture a video frame if recording is active (no-op otherwise).""" if not self._recording or self._video_writer is None: return try: frame = self._plotter.screenshot(return_img=True) self._video_writer.append_data(frame) except Exception: pass
[docs] def screenshot(self, path: Union[str, Path, None] = None) -> Path: """Save a PNG screenshot of the current brain view. Parameters ---------- path : str | Path | None, default None Destination file. Defaults to ``~/ant_brain_<timestamp>.png``. Returns ------- path : Path """ if path is None: ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") path = Path.home() / f"ant_brain_{ts}.png" self._plotter.screenshot(str(path)) logger.info("Screenshot saved: %s", path) return Path(path)
@property def plotter(self) -> "_BackgroundPlotter": """The underlying :class:`pyvistaqt.BackgroundPlotter` instance.""" return self._plotter @property def surf(self) -> str: """Currently displayed surface geometry name.""" return self._surf