# Source code for mne_rt.combiners

"""Feature combiners for multi-modality real-time M/EEG processing.

When MNE-RT extracts several feature values in parallel (e.g. ``sensor_power``,
``laterality``, ``connectivity_ratio``), each produces its own numeric value
per window.  A :class:`FeatureCombiner` reduces those N values to a single
*mixed* output score that can be passed to a protocol or displayed as one trace.

Pipeline position::

    feature extraction  →  FeatureCombiner.combine()  →  Protocol  →  display

Quick examples::

    from mne_rt.combiners import WeightedSumCombiner, ZScoredNormCombiner

    # Weighted blend: 60 % alpha power, 40 % laterality
    combiner = WeightedSumCombiner(
        weights={"sensor_power": 0.6, "laterality": 0.4}
    )
    mixed = combiner.combine({"sensor_power": 1.5, "laterality": 0.3})

    # Unit-free deviation score: how far are we from baseline across all features?
    combiner = ZScoredNormCombiner(
        features=["sensor_power", "laterality", "connectivity_ratio"],
        warmup=30,
    )
    mixed = combiner.combine({"sensor_power": 1.5, "laterality": 0.3,
                               "connectivity_ratio": 0.7})

Classes
-------
FeatureCombiner
    Abstract base class.  All combiners implement :meth:`combine`.
WeightedSumCombiner
    Weighted linear combination of feature values.
GeometricMeanCombiner
    Geometric mean (suitable when features are positive ratios or powers).
ZScoredNormCombiner
    Per-feature z-score normalisation followed by Euclidean norm.
LearnedCombiner
    Data-driven combination via a fitted sklearn-compatible estimator.
"""
from __future__ import annotations

import math
import warnings
from typing import Any, Optional


# ---------------------------------------------------------------------------
# Base class
# ---------------------------------------------------------------------------

class FeatureCombiner:
    """Common interface for multi-feature NF combiners.

    A combiner turns the per-modality values of one analysis window into a
    single scalar.  Custom mixing strategies subclass this type and override
    :meth:`combine`; since every combiner exposes that same single method,
    one can be exchanged for another without touching the surrounding
    pipeline code.

    Parameters
    ----------
    features : list of str | None, default None
        Ordered list of modality names this combiner expects.  With ``None``
        the combiner accepts whatever keys appear in the dict handed to
        :meth:`combine`.  Subclasses may require the list to be given at
        construction time (e.g. :class:`WeightedSumCombiner`).

    Notes
    -----
    The combiner receives a snapshot dict ``{modality_name: float}`` once per
    analysis window, immediately after the EMA smoothing step inside
    :meth:`~mne_rt.RTStream.record_main`.  When a combiner is active, the
    scalar it returns replaces the per-modality values for protocol
    evaluation and display.
    """

    def __init__(self, features: Optional[list[str]] = None) -> None:
        # Stored as given (no copy, no validation).
        self.features = features

    def combine(self, values: dict[str, float]) -> float:
        """Collapse per-modality NF values into a single scalar.

        Parameters
        ----------
        values : dict[str, float]
            ``{modality_name: current_value}`` for every modality active in
            the current window.

        Returns
        -------
        mixed : float
            The combined NF value handed on to the protocol and display.

        Raises
        ------
        NotImplementedError
            Always, in this base class; subclasses must override.
        """
        message = f"{type(self).__name__} must implement combine()."
        raise NotImplementedError(message)

    def __repr__(self) -> str:
        # A falsy ``features`` (``None`` or an empty list) is shown as "any".
        shown = self.features if self.features else "any"
        return "{}(features={})".format(type(self).__name__, shown)
# ---------------------------------------------------------------------------
# Concrete combiners
# ---------------------------------------------------------------------------
class WeightedSumCombiner(FeatureCombiner):
    """Weighted linear combination of feature values.

    Computes the weight-normalised linear blend::

        mixed = Σ(wᵢ · xᵢ) / Σ(wᵢ)

    where both sums run only over features present in *values*.  Because the
    denominator uses only the weights of the features that are actually
    present, the result stays a normalised blend when some features are
    missing in a given window.

    Parameters
    ----------
    weights : dict[str, float]
        Mapping of ``{modality_name: weight}``.  Weights do not need to sum
        to 1 — they are normalised internally.  Negative weights are allowed
        (e.g. to subtract one feature from another).  Features absent from
        *values* at call time are silently skipped.  The dict is stored by
        reference, not copied.

    Notes
    -----
    Two situations yield ``0.0`` together with a :class:`RuntimeWarning`:

    * none of the configured features is present in *values*;
    * features are present, but their weights sum to exactly zero (possible
      with negative weights), so the normalised blend is undefined.

    With mixed-sign weights the denominator ``Σ(wᵢ)`` can also be close to
    zero without being exactly zero, which strongly amplifies the output.

    Examples
    --------
    Blend of sensor power and laterality::

        from mne_rt.combiners import WeightedSumCombiner

        combiner = WeightedSumCombiner(
            weights={"sensor_power": 0.6, "laterality": 0.4}
        )
        mixed = combiner.combine({"sensor_power": 1.5, "laterality": 0.3})
        # mixed ≈ 0.6*1.5/1.0 + 0.4*0.3/1.0 = 1.02

    Suppressing one feature (negative weight)::

        combiner = WeightedSumCombiner(
            weights={"sensor_power": 1.0, "entropy": -0.5}
        )
    """

    def __init__(self, weights: dict[str, float]) -> None:
        # The expected feature list is the weight dict's keys, in dict order.
        super().__init__(features=list(weights.keys()))
        self.weights = weights

    def combine(self, values: dict[str, float]) -> float:
        """Return the normalised weighted sum of available feature values.

        Parameters
        ----------
        values : dict[str, float]
            ``{modality_name: current_value}`` for the current window.  Keys
            without a configured weight are ignored.

        Returns
        -------
        mixed : float
            ``Σ(wᵢ · xᵢ) / Σ(wᵢ)`` over the configured features present in
            *values*, or ``0.0`` (with a :class:`RuntimeWarning`) when no
            configured feature is present or the present weights sum to zero.
        """
        total_weight = 0.0
        weighted_sum = 0.0
        matched = False
        for feat, w in self.weights.items():
            if feat in values:
                matched = True
                weighted_sum += w * values[feat]
                total_weight += w

        if not matched:
            warnings.warn(
                f"{type(self).__name__}: none of the specified features "
                f"({list(self.weights)}) were present in values — returning 0.0.",
                RuntimeWarning,
                stacklevel=2,
            )
            return 0.0

        # Features were found but their signed weights cancel: dividing would
        # raise ZeroDivisionError, and blaming missing features would point
        # the user at the wrong cause.
        if total_weight == 0.0:
            warnings.warn(
                f"{type(self).__name__}: the weights of the features present "
                f"in values sum to zero, so the normalised sum is undefined "
                f"— returning 0.0.",
                RuntimeWarning,
                stacklevel=2,
            )
            return 0.0

        return weighted_sum / total_weight
class GeometricMeanCombiner(FeatureCombiner):
    """Geometric mean of (positive) feature values.

    Computes the weighted geometric mean::

        mixed = exp( Σ(wᵢ · log(max(xᵢ, floor))) / Σ(wᵢ) )

    with uniform weights ``wᵢ = 1`` when *weights* is ``None``.  Input values
    are clipped to *floor* before the log transform so that zero or negative
    inputs do not cause ``NaN`` or ``−inf``.

    Best suited for features that are inherently positive and multiplicative,
    such as band-power ratios, coherence values, or connectivity measures.

    Parameters
    ----------
    features : list of str
        Ordered modality names to include.
    weights : dict[str, float] | None, default None
        Optional per-feature exponents in the weighted geometric mean.
        ``None`` (or an empty dict) applies equal weighting (all exponents
        = 1).  A feature listed in *features* but missing from *weights*
        also gets exponent 1.
    floor : float, default 1e-9
        Minimum value each input is clipped to before ``log``.  It must be
        positive: with a non-positive floor, ``math.log`` raises
        ``ValueError`` for any input at or below zero.

    Notes
    -----
    Two situations yield ``0.0`` together with a :class:`RuntimeWarning`:

    * none of the configured features is present in *values*;
    * features are present, but their weights sum to exactly zero, so the
      weighted mean of the logs is undefined.

    Examples
    --------
    Equal-weight geometric mean of three power features::

        from mne_rt.combiners import GeometricMeanCombiner

        combiner = GeometricMeanCombiner(
            features=["sensor_power", "band_ratio", "individual_peak_power"]
        )
        mixed = combiner.combine({
            "sensor_power": 2.0,
            "band_ratio": 0.5,
            "individual_peak_power": 1.0,
        })
        # mixed = (2.0 * 0.5 * 1.0) ** (1/3) ≈ 1.0

    Weighted exponents (emphasise sensor_power)::

        combiner = GeometricMeanCombiner(
            features=["sensor_power", "band_ratio"],
            weights={"sensor_power": 2.0, "band_ratio": 1.0},
        )
    """

    def __init__(
        self,
        features: list[str],
        weights: Optional[dict[str, float]] = None,
        floor: float = 1e-9,
    ) -> None:
        super().__init__(features=features)
        self.weights = weights
        self.floor = floor

    def combine(self, values: dict[str, float]) -> float:
        """Return the weighted geometric mean of available feature values.

        Parameters
        ----------
        values : dict[str, float]
            ``{modality_name: current_value}`` for the current window.  Keys
            not listed in ``features`` are ignored.

        Returns
        -------
        mixed : float
            ``exp(Σ(wᵢ · log(max(xᵢ, floor))) / Σ(wᵢ))`` over the configured
            features present in *values*, or ``0.0`` (with a
            :class:`RuntimeWarning`) when no configured feature is present or
            the present weights sum to zero.
        """
        log_sum = 0.0
        weight_sum = 0.0
        matched = False
        for feat in self.features:
            if feat not in values:
                continue
            matched = True
            # Features without an explicit exponent default to 1.0.
            w = self.weights.get(feat, 1.0) if self.weights else 1.0
            # Clip before the log so zero/negative inputs stay finite.
            x = max(values[feat], self.floor)
            log_sum += w * math.log(x)
            weight_sum += w

        if not matched:
            warnings.warn(
                f"{type(self).__name__}: none of the specified features "
                f"({self.features}) were present in values — returning 0.0.",
                RuntimeWarning,
                stacklevel=2,
            )
            return 0.0

        # Features were found but their exponents are zero or cancel out;
        # report that cause instead of claiming the features were missing.
        if weight_sum == 0.0:
            warnings.warn(
                f"{type(self).__name__}: the weights of the features present "
                f"in values sum to zero, so the geometric mean is undefined "
                f"— returning 0.0.",
                RuntimeWarning,
                stacklevel=2,
            )
            return 0.0

        return math.exp(log_sum / weight_sum)
class ZScoredNormCombiner(FeatureCombiner):
    """Euclidean norm after online z-score normalisation of each feature.

    Each feature stream is independently normalised using the mean and
    (population) standard deviation estimated during the warmup phase::

        zᵢ     = (xᵢ − μᵢ) / σᵢ
        mixed  = ‖z‖ / √n  =  sqrt(Σ zᵢ²) / sqrt(n)

    where ``n`` is the number of features present in the current window.
    Dividing by ``√n`` makes the output the root-mean-square z-score, so its
    scale does not depend on how many features are combined: it is **0**
    when every feature sits exactly at its baseline mean, around **1** for
    fluctuations of baseline-typical size, and it grows when *any* feature
    deviates — making it a natural "how different from baseline are we?"
    score.

    Statistics are **frozen** after warmup (no drift tracking).  To re-fit
    (e.g. between blocks), call :meth:`reset`.

    Parameters
    ----------
    features : list of str
        Modality names to include.
    warmup : int, default 30
        Number of windows collected per feature before statistics are fixed
        and the combiner starts producing non-zero output.  Every call made
        during the warmup phase — including the one that completes it —
        returns ``0.0``.  Values below 1 are treated as 1, since at least one
        sample per feature is needed to compute a mean.

    Notes
    -----
    * If a feature's standard deviation is effectively zero (constant
      signal), it is floored at ``1e-9`` to prevent division-by-zero.
    * A sample is buffered for a feature only in windows where that feature
      is present.  Warmup ends once *every* feature has at least *warmup*
      samples; features seen more often contribute all of their buffered
      samples to their statistics.
    * If a configured feature never appears in *values*, warmup never
      completes: the combiner keeps returning ``0.0`` and the buffers of the
      other features keep growing.

    Examples
    --------
    ::

        from mne_rt.combiners import ZScoredNormCombiner

        combiner = ZScoredNormCombiner(
            features=["sensor_power", "laterality", "connectivity_ratio"],
            warmup=30,
        )
        for window_vals in session_data:        # first 30 calls return 0.0
            mixed = combiner.combine(window_vals)
    """

    def __init__(self, features: list[str], warmup: int = 30) -> None:
        super().__init__(features=features)
        self.warmup = warmup
        # Per-feature samples collected during warmup; emptied once frozen.
        self._buf: dict[str, list[float]] = {f: [] for f in features}
        # Frozen baseline statistics, filled when warmup completes.
        self._mean: dict[str, float] = {}
        self._std: dict[str, float] = {}
        self._warmed_up: bool = False

    def reset(self) -> None:
        """Clear collected statistics and restart the warmup phase.

        Useful when called between NF blocks so the combiner re-fits its
        baseline to the new block's distribution.
        """
        self._buf = {f: [] for f in self.features}
        self._mean.clear()
        self._std.clear()
        self._warmed_up = False

    def _freeze_statistics(self) -> None:
        """Compute per-feature mean/std from the buffers and end warmup."""
        for feat in self.features:
            buf = self._buf[feat]
            mu = sum(buf) / len(buf)
            # Population variance (divide by N, not N - 1).
            variance = sum((x - mu) ** 2 for x in buf) / len(buf)
            self._mean[feat] = mu
            # Floor the std so a constant baseline cannot divide by zero.
            self._std[feat] = max(math.sqrt(variance), 1e-9)
        self._buf.clear()
        self._warmed_up = True

    def combine(self, values: dict[str, float]) -> float:
        """Return the z-scored Euclidean norm, or ``0.0`` during warmup.

        Parameters
        ----------
        values : dict[str, float]
            ``{modality_name: current_value}`` for the current window.  Keys
            not listed in ``features`` are ignored.

        Returns
        -------
        mixed : float
            ``sqrt(Σ zᵢ²) / sqrt(n)`` over the configured features present in
            *values*; ``0.0`` while warming up or when no configured feature
            is present.
        """
        if not self._warmed_up:
            # Accumulate warmup buffer.
            for feat in self.features:
                if feat in values:
                    self._buf[feat].append(values[feat])

            # Require at least one sample per feature even if ``warmup`` is
            # zero or negative; otherwise the mean would divide by zero.
            needed = max(self.warmup, 1)
            if all(len(self._buf[f]) >= needed for f in self.features):
                self._freeze_statistics()

            # The window that completes warmup is still a warmup window: it
            # is part of the baseline, so it reports 0.0 like the others.
            return 0.0

        # Post-warmup: z-score each present feature, then return the
        # root-mean-square of those z-scores.
        z_scores = [
            (values[f] - self._mean[f]) / self._std[f]
            for f in self.features
            if f in values and f in self._mean
        ]
        if not z_scores:
            return 0.0
        norm = math.sqrt(sum(z ** 2 for z in z_scores))
        return norm / math.sqrt(len(z_scores))
class LearnedCombiner(FeatureCombiner):
    """Data-driven combination via a fitted sklearn-compatible estimator.

    Builds the feature vector ``[x₁, x₂, …, xₙ]`` (in *features* order),
    passes it to ``estimator.predict([[x₁, …, xₙ]])``, and returns the first
    element of the prediction as a plain ``float``.

    Any ``sklearn``-style regressor works out of the box.  For classifiers,
    wrap ``predict_proba`` in a small adapter so the interface matches.

    The model must be fitted **offline** — on resting-state recordings, prior
    session data, or a dedicated calibration block — before passing it here.

    Typical estimator choices:

    * ``sklearn.linear_model.Ridge`` — regularised linear projection;
      low variance, directly interpretable weights.
    * ``sklearn.cross_decomposition.PLSRegression`` — finds the latent
      direction in feature space most correlated with a target
      (e.g. tinnitus severity).
    * ``sklearn.svm.SVR`` — non-linear kernel regression; higher capacity
      but needs more calibration data and may overfit.

    Parameters
    ----------
    features : list of str
        Ordered modality names.  The feature vector fed to *estimator* is
        built in this exact order; missing features are filled with ``0.0``.
    estimator : fitted sklearn-compatible estimator
        Must expose a ``predict(X)`` method where ``X`` has shape ``(1, n)``.

    Examples
    --------
    Offline fit, then real-time use::

        from sklearn.linear_model import Ridge
        from mne_rt.combiners import LearnedCombiner

        # --- offline (calibration session) ---
        X_cal = ...   # shape (n_windows, n_features)
        y_cal = ...   # target scores, shape (n_windows,)
        model = Ridge(alpha=1.0).fit(X_cal, y_cal)

        # --- real-time session ---
        combiner = LearnedCombiner(
            features=["sensor_power", "laterality", "connectivity_ratio"],
            estimator=model,
        )
        mixed = combiner.combine({
            "sensor_power": 1.2,
            "laterality": 0.4,
            "connectivity_ratio": 0.7,
        })
    """

    def __init__(self, features: list[str], estimator: Any) -> None:
        super().__init__(features=features)
        # Kept as-is; only its ``predict`` method is used.
        self.estimator = estimator

    def combine(self, values: dict[str, float]) -> float:
        """Return the estimator's prediction for the current feature vector.

        Parameters
        ----------
        values : dict[str, float]
            ``{modality_name: current_value}`` for the current window.
            Configured features missing from the dict are replaced by
            ``0.0``; extra keys are ignored.

        Returns
        -------
        mixed : float
            First element of the flattened ``estimator.predict`` output.
        """
        # Imported at call time, so the module itself loads without numpy.
        import numpy as np

        # One row, columns in the configured feature order.
        row = []
        for name in self.features:
            row.append(values.get(name, 0.0))
        sample = np.array([row], dtype=float)

        prediction = self.estimator.predict(sample)
        flat = np.ravel(prediction)
        return float(flat[0])