"""
Algorithms related to analysis of the envelope/magnitude of the EEG.
"""
import copy
import csv
import sys
import warnings
import h5py
import pycwt
import pyprind
import scipy.signal
import numpy as np
from nnsa.artefacts.artefact_detection import detect_artefact_signals
from nnsa.cwt.config import DEFAULT_WAVELET
from nnsa.feature_extraction.common import check_multichannel_data_matrix, get_channel_index, baseline_correction_min, \
prepare_postfix
from nnsa.feature_extraction.result import ResultBase
from nnsa.parameters.parameters import ClassWithParameters, Parameters
from nnsa.utils.arrays import interp_nan
from nnsa.utils.config import HORIZONTAL_RULE
from nnsa.utils.mathematics import nextpow2
from nnsa.utils.other import enumerate_label, convert_string_auto
__all__ = [
'Envelope',
'EnvelopeResult',
'hilbert_envelope',
'power_envelope',
]
from nnsa.utils.segmentation import segment_generator, get_all_segments
[docs]class Envelope(ClassWithParameters):
"""
Class for computing the envelope of a real valued signal.
Main method: envelope().
Args:
see nnsa.ClassWithParameters
Examples:
>>> fs = 256
>>> t = 1/fs*np.arange(8*100000).reshape(8, 100000)
>>> x = 4 * np.sin(t)
>>> env = Envelope(method='hilbert')
>>> print(type(env.parameters).__name__)
Parameters
>>> result = env.envelope(x, fs=fs, verbose=0)
>>> print(type(result).__name__)
EnvelopeResult
>>> print(result.envelope[2, 2222])
3.98601719157083
>>> print(result.envelope.mean())
3.997973665767074
"""
[docs] @staticmethod
def default_parameters():
"""
Return the default parameters.
Returns:
(nnsa.Parameters): a default set of parameters for the object.
"""
pars = {
# The method/algorithm to use for computation of the envelope.
# Choose from 'hilbert', 'power', 'power_segmented'.
# See the functions in this module for each of these methods for more information (e.g. hilbert_envelope()):
'method': 'hilbert',
# Optional additional keyword arguments/parameters for the method/function that computes the envelope.
# These keyword arguments depend on the method specified above. E.g. if method is set to 'hilbert', see the
# function hilbert_envelope() for the optional keyword argument that you can specify here:
'method_kwargs': {},
# The data type / precision for the envelope values (might save space when saving).
'dtype': np.float32,
}
return Parameters(**pars)
[docs] def envelope(self, data_matrix, fs, channel_labels=None, verbose=1):
"""
Perform the envelope computation on multichannel data.
Args:
data_matrix (np.ndarray): see check_multichannel_data_matrix().
fs (float): sample frequency of the signals in data_matrix.
channel_labels (list of str, optional): see check_multichannel_data_matrix().
verbose (int, optional): verbose level.
Defaults to 1.
Returns:
(nnsa.EnvelopeResult): object containing the signal envelopes per channel.
"""
# Check input.
data_matrix, channel_labels = check_multichannel_data_matrix(data_matrix, channel_labels)
if verbose > 0:
print(HORIZONTAL_RULE)
print('Computing envelope with parameters:')
print(self.parameters)
# Extract some parameters.
method = self.parameters['method'].lower()
method_kwargs = self.parameters['method_kwargs']
# Call the corresponding function to compute envelope.
if method == 'hilbert':
envelope = hilbert_envelope(data_matrix, **method_kwargs)
fs_env = fs
elif method == 'power':
envelope = power_envelope(data_matrix, **method_kwargs)
fs_env = fs
elif method == 'power_segmented':
envelope, fs_env = power_segmented_envelope(data_matrix, fs, **method_kwargs)
else:
raise ValueError('Invalid method "{}". Choose from "".'
.format(method, ['hilbert', 'power', 'power_segmented']))
# Set dtype.
envelope = envelope.astype(self.parameters['dtype'])
result = EnvelopeResult(envelope=envelope,
algorithm_parameters=self.parameters,
fs=fs_env,
channel_labels=channel_labels)
return result
[docs]class EnvelopeResult(ResultBase):
"""
High-level interface for processing the results of envelope computation as created by
nnsa.Envelope().
Args:
envelope (np.ndarray): array with shape (channels, segments) containing signal envelopes.
algorithm_parameters (nnsa.Parameters): see ResultBase.
fs (float): sample frequency corresponding to the envelope array.
channel_labels (list of str, optional): labels of the channels corresponding to the channel dimensions of
`envelope`.
If None, default labels will be created.
Defaults to None.
data_info (str, optional): see ResultBase.
segment_start_times (np.ndarray, optional): see ResultBase.
segment_end_times (np.ndarray, optional): see ResultBase.
"""
def __init__(self, envelope, algorithm_parameters, fs, channel_labels=None, data_info=None,
segment_start_times=None, segment_end_times=None):
super().__init__(algorithm_parameters=algorithm_parameters, data_info=data_info,
segment_start_times=segment_start_times, segment_end_times=segment_end_times, fs=fs)
self.envelope = envelope
if self.is_discontinuous():
raise ValueError('Data is discontinuous. This is invalid for the {} class.'.format(self.__class__.__name__))
# Input check.
n_channels, n_segments = envelope.shape
if channel_labels is None:
channel_labels = enumerate_label(n_channels, label='Channel')
elif len(channel_labels) != n_channels:
raise ValueError('Length of channel_labels ({}) does not correspond to the shape of the data {}.'
.format(len(channel_labels), envelope.shape))
# Store variables that are not already stored by the parent class (ResultBase).
self.channel_labels = channel_labels
@property
def num_segments(self):
"""
Return the number of segments (number of samples).
Returns:
(int): number of segments/samples.
"""
return self.envelope.shape[-1]
[docs] def baseline_correction_min(self, window_length=None, inplace=False):
"""
Perform baseline correction of the envelope by subtracting the minimum value in a window before each sample.
See nnsa.baseline_correction_min().
Args:
window_length (int, optional): the length of the window (in samples) to look for the baseline (minimum),
see nnsa.baseline_correction_min().
If None, a window corresponding to 1 minute is used.
Defaults to None.
inplace (bool, optional): if True, the corrected values replace the envelope values in the current object.
If False, a new EnvelopeResult object is returned with the baseline corrected envelope values.
Defaults to False.
Returns:
envelope_corrected (nnsa.EnvelopeResult): a new EnvelopeResult object with the baseline corrected envelope
values (only if inplace is True).
"""
if window_length is None:
window_length = 60 * self.fs
if inplace:
# We will replace the original envelope attribute in the current EnvelopeResult object by the corrected
# envelope values.
envelope_corrected = self
else:
# We will replace the original envelope attribute in A COPY OF the current EnvelopeResult.
# Create a copy of the EnvelopeResult object.
envelope_corrected = copy.deepcopy(self)
for j, env_j in enumerate(envelope_corrected.envelope):
# For each channel.
envelope_corrected.envelope[j] = baseline_correction_min(env_j, window_length=window_length)
# Only return if not in place.
if not inplace:
return envelope_corrected
[docs] def compute_features(self, envs, postfix=None, concatenate_channels=True):
"""
Compute features of an array of envelope values.
Args:
envs (np.ndarray): array with envelope values with dimensions (channels, time).
postfix (str, optional): postfix for the keys in the output dictionary.
concatenate_channels (bool, optional): if True, the channels are concatenated before computing the features.
If False, the features are computed on each channel.
Defaults to True.
Returns:
features (dict): features describing the envelope distribution.
"""
# Define the percentiles that characterize the envelope distribution.
percentiles = [5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 95]
# Prepare postfix.
postfix = prepare_postfix(postfix)
if concatenate_channels:
# Make the code iterate over a list with just one element containing the entire envelope array.
envs = [envs]
postfix_str = [postfix]
else:
postfix_str = ['_{}{}'.format(label.lstrip('EEG '), postfix)
for label in self.channel_labels]
features = dict()
# Loop over channels.
for e, pf in zip(envs, postfix_str):
# Compute percentiles.
if np.size(e) == 0:
# Cannot compute percentiles if e is empty.
percentile_values = np.full(len(percentiles), np.nan)
else:
percentile_values = np.nanpercentile(e, percentiles)
# Loop over percentiles (those are the features).
for prc, val in zip(percentiles, percentile_values):
features['ENV_q{}{}'.format(prc, pf)] = val
return features
[docs] def plot(self, channel=None):
"""
Plot the envelope in the current axis.
Args:
channel (str or list, optional): string or list of strings with label(s) of the channel(s) to plot.
If None, all channels are plot in the same figure with a separate color.
Defaults to None.
"""
import matplotlib.pyplot as plt
# Collect indices of the channels to plot.
if channel is not None:
indices = get_channel_index(self.channel_labels, channel)
if not isinstance(indices, list):
indices = [indices]
else:
indices = range(len(self.channel_labels))
t = self.segment_start_times
for i in indices:
plt.plot(t, self.envelope[i], label=self.channel_labels[i])
plt.title('Envelope')
plt.xlabel('Time (s)')
plt.ylabel('Envelope')
[docs] def to_base_dataset(self, unit='uV', **kwargs):
"""
Create an BaseDataset object from the envelope data.
Args:
unit (str, optional): unit of the envelope data.
Defaults to 'uV'.
**kwargs (optional): optional keyword arguments for the TimeSeries object.
Returns:
ds (nnsa.BaseDataset): BaseDataset containing the envelope data of the available channels.
"""
# Import locally since EegDataset imports Envelope from this module.
from nnsa.containers.datasets import BaseDataset
# Initialize an empty EegDataset.
ds = BaseDataset()
# Loop over the channels, create a TimeSeries object and append it to the EegDataset.
for signal, label in zip(self.envelope, self.channel_labels):
ts = self.to_time_series(label, unit=unit, **kwargs)
ds.append(ts)
return ds
[docs] def to_eeg_dataset(self, unit='uV', **kwargs):
"""
Create an EegDataset object from the envelope data.
Args:
unit (str, optional): unit of the envelope data.
Defaults to 'uV'.
**kwargs (optional): optional keyword arguments for the TimeSeries object.
Returns:
ds (nnsa.EegDataset): EegDataset containing the envelope data of the available channels.
"""
# Import locally since EegDataset imports Envelope from this module.
from nnsa.containers.datasets import EegDataset
# Initialize an empty EegDataset.
ds = EegDataset()
# Loop over the channels, create a TimeSeries object and append it to the EegDataset.
for signal, label in zip(self.envelope, self.channel_labels):
ts = self.to_time_series(label, unit=unit, **kwargs)
ds.append(ts)
return ds
[docs] def to_time_series(self, channel=None, unit='uV', **kwargs):
"""
Create a TimeSeries object from the envelope data of the specified channel.
Args:
channel (str, optional): the label of the channel which to convert to a TimeSeries.
If None and the envelope contains only one channel, this channel is selected.
Defaults to None.
unit (str, optional): unit of the envelope data.
Defaults to 'uV'.
**kwargs (optional): optional keyword arguments for the TimeSeries object.
Returns:
ts (nnsa.TimeSeries): TimeSeries containing the envelope data of the specified channel.
"""
# Import locally since TimeSeries imports Envelope from this module.
from nnsa.containers.time_series import TimeSeries
if channel is None:
if self.envelope.shape[0] == 1:
# only one channel available. Select this one.
index = 0
channel = self.channel_labels[0]
else:
raise ValueError('Specify the channel in EnvelopeResult to convert to TimeSeries. Available channels:'
' {}.'.format(self.channel_labels))
else:
# Get the channel index.
index = get_channel_index(self.channel_labels, channel)
# Create info object.
info = {
'source': 'EnvelopeResult of {}.'.format(
self.data_info if self.data_info is not None else 'unknown source'
)
}
ts = TimeSeries(signal=self.envelope[index],
fs=self.fs,
label=channel,
unit=unit,
info=info,
time_offset=self.time_offset,
**kwargs)
return ts
def _merge(self, other, index):
"""
See ResultBase.
"""
# Check if the channel labels of self and other are the same.
if self.channel_labels != other.channel_labels:
raise ValueError('Cannot merge objects with different channel labels.')
n_channels, n_samples = self.envelope.shape
if index < n_samples:
# Cut piece off.
msg = 'Overwriting data while merging.'
warnings.warn(msg)
self.envelope = self.envelope[:, :index]
else:
# Add nans.
self.envelope = np.concatenate([self.envelope, np.full((n_channels, index-n_samples), fill_value=np.nan)], axis=-1)
# Merge.
self.envelope = np.concatenate([self.envelope, other.envelope], axis=-1)
@staticmethod
def _read_from_csv(filepath):
"""
Read result from csv file into a EnvelopeResult class.
Args:
filepath (str): see ResultBase._read_from_csv().
Returns:
result (nnsa.EnvelopeResult): instance of EnvelopeResult containing the
Envelope result.
"""
# Lines 1-4: Standard csv header (use the ResultBase method).
algorithm_parameters, data_info, fs = ResultBase._read_csv_header(filepath)[1:]
# Re-open the file and read the rest of the file, line by line.
with open(filepath, 'r') as f:
reader = csv.reader(f)
# Lines 1-4: Standard csv header (already read, skip).
[next(reader) for i in range(4)]
# Line 5: Non-array data header (skip).
assert(next(reader) == ['channel_labels', 'envelope.shape'])
# Line 6: Non-array data.
channel_labels, envelope_shape = [
convert_string_auto(i) for i in next(reader)]
# Line 7: Array header (skip).
assert(next(reader)[0] == 'envelope')
# Line 8: Array data (flattened).
envelope_as_list = [float(i) for i in next(reader)]
# Convert to numpy array and reshape.
envelope = np.reshape(envelope_as_list, envelope_shape)
# Create a result object.
result = EnvelopeResult(envelope=envelope,
fs=fs,
channel_labels=channel_labels,
algorithm_parameters=algorithm_parameters,
data_info=data_info)
return result
@staticmethod
def _read_from_hdf5(filepath):
"""
Read result from hdf5 file into a EnvelopeResult class.
Args:
filepath (str): see ResultBase._read_from_hdf5().
Returns:
result (nnsa.EnvelopeResult): instance of EnvelopeResult containing the
Envelope result.
"""
# Read standard hdf5 header (use the ResultBase method).
algorithm_parameters, data_info, segment_start_times, segment_end_times, fs, time_offset =\
ResultBase._read_hdf5_header(filepath)[1:]
# Re-open the file and read the rest of the file.
with h5py.File(filepath, 'r') as f:
# Read array data.
envelope = f['envelope'][:]
# Read non-array data.
channel_labels = [label.decode() for label in f['envelope'].attrs['channel_labels']]
# Create a result object.
result = EnvelopeResult(envelope=envelope,
channel_labels=channel_labels,
algorithm_parameters=algorithm_parameters,
data_info=data_info,
segment_start_times=segment_start_times,
segment_end_times=segment_end_times,
fs=fs)
return result
def _write_to_csv(self, filepath):
"""
Write the contents of the object to a csv file.
Args:
filepath (str): see ResultBase._write_to_csv().
"""
# Lines 1-4: Standard csv header (use the ResultBase method).
self._write_csv_header(filepath)
# Append attributes to the csv file, line by line.
with open(filepath, 'a', newline='') as csvfile:
writer = csv.writer(csvfile)
# Line 5: Non-array data header.
writer.writerow(['channel_labels', 'envelope.shape'])
# Line 6: Non-array data.
writer.writerow([self.channel_labels, self.envelope.shape])
# Line 7: Array header.
writer.writerow(['envelope'])
# Line 8: Array data (flattened).
writer.writerow(self.envelope.reshape(-1).tolist())
def _write_to_hdf5(self, filepath):
"""
Write the contents of the object to an hdf5 file.
Args:
filepath (str): see ResultBase._write_to_hdf5().
"""
# Write standard hdf5 header (use the ResultBase method).
self._write_hdf5_header(filepath)
# Append attributes to the hdf5 file.
with h5py.File(filepath, 'a') as f:
# Write array data.
f.create_dataset('envelope', data=self.envelope)
# Write non-array data as attributes.
# Convert strings to np.string_ type as recommended for compatibility.
f['envelope'].attrs['channel_labels'] = [np.string_(label)
for label in self.channel_labels]
def compute_bandpass_envelope(x, freq_limits, fs=1, order=4, axis=-1, verbose=1):
"""
Compute the envelope(s) of the x for certain frequency bands.
First applies a bandpass filter and then computes the envelope using the Hilbert transform of
the band-filtered signal.
Args:
x (np.ndarray): signal array.
freq_limits (list, np.ndarray or dict): iterbale specifying the frequency bands.
Has items consisting of cut-off frequencies, like [f_low, f_high].
Specify fs to put these in Hz, else they are normalized by the Nyqvist frequency.
If a dict, the returned value is a dict as well with the same keys corresponding to the
frequency bands.
fs (float): sample frequency of x. Specify this to put freq_limits in Hz.
order (int): order of the butterworth bandpass filter.
axis (int): time axis of x.
Returns:
envelopes (np.ndarray or dict): envelopes. If an array has shape (len(freq_limits), ) + x.shape.
If a dict (when freq_linits is a dict) has len(freq_limits) items.
Examples:
>>> fs = 100
>>> t = np.arange(1000)/fs
>>> np.random.seed(43)
>>> x = 5*np.sin(2*np.pi*10*t) + np.random.rand(len(t)) # 10 Hz sine
>>> x += 10*np.sin(2*np.pi*40*t) + np.random.rand(len(t)) # 40 Hz sine
>>> x.shape
(1000,)
>>> freq_limits = [[9, 11], [24, 26], [39, 41]] # 10, 25, 40 Hz center frequencies
>>> env = compute_bandpass_envelope(x, freq_limits, fs=fs)
>>> env.shape
(3, 1000)
>>> print(env.mean(axis=-1))
[4.9376538 0.09354586 9.82055459]
"""
x = np.asarray(x)
# Interpolate nans.
nan_mask = np.isnan(x)
x = interp_nan(x, axis=axis)
envelopes = []
# Parse inputs.
if isinstance(freq_limits, dict):
freq_keys = list(freq_limits.keys())
freq_limits = list(freq_limits.values())
else:
freq_keys = None
# Check if only one freq low, high pair is given.
if np.size(np.asarray(freq_limits)) == 2 and np.asarray(freq_limits).ndim == 1:
freq_limits = [freq_limits]
# Initialize progress bar.
bar = pyprind.ProgBar(len(freq_limits), stream=sys.stdout)
for i, (flow, fhigh) in enumerate(freq_limits):
# Bandpass filter.
if flow is None or flow == -1:
if fhigh is None or fhigh == -1:
# No filter.
xf = x
else:
b, a = scipy.signal.butter(N=order, Wn=fhigh, btype='low', fs=fs)
xf = scipy.signal.filtfilt(b, a, x, axis=axis)
else:
if fhigh is None or fhigh == -1:
b, a = scipy.signal.butter(N=order, Wn=flow, btype='high', fs=fs)
xf = scipy.signal.filtfilt(b, a, x, axis=axis)
else:
b, a = scipy.signal.butter(N=order, Wn=[flow, fhigh], btype='bandpass', fs=fs)
xf = scipy.signal.filtfilt(b, a, x, axis=axis)
# Envelope computation.
xa = hilbert_envelope(xf, axis=axis)
# Put nans back.
xa[nan_mask] = np.nan
envelopes.append(xa)
if verbose:
bar.update()
if freq_keys is not None:
# To dict.
envelopes = dict(zip(freq_keys, envelopes))
else:
# To array.
envelopes = np.vstack([np.expand_dims(xa, 0) for xa in envelopes])
return envelopes
def compute_cwt_envelope(x, freqs, fs, **kwargs):
"""
Compute the envelope(s) of the x for certain frequency bands.
Args:
x (np.ndarray): signal array. A 1-D array or a 2D array with shape (n_channels, n_samples).
freqs (iterable): iterbale specifying the center frequency of the wavelets in Hz.
fs (float): sample frequency of x.
**kwargs: arguments for pycwt.wavelet.cwt().
Returns:
envelopes (np.ndarray): envelopes. Has shape (len(freqs), ) + x.shape.
Examples:
>>> fs = 100
>>> t = np.arange(1000)/fs
>>> np.random.seed(43)
>>> x = 5*np.sin(2*np.pi*10*t) + np.random.rand(len(t)) # 10 Hz sine
>>> x += 10*np.sin(2*np.pi*40*t) + np.random.rand(len(t)) # 40 Hz sine
>>> x.shape
(1000,)
>>> freqs = [10, 25, 40] # 10, 25, 40 Hz center frequencies
>>> env = compute_cwt_envelope(x, freqs, fs=fs)
>>> env.shape
(3, 1000)
>>> print(env.mean(axis=-1))
[14.53283559 0.35320805 14.53766499]
"""
# Check input.
x = np.asarray(x)
if x.ndim == 1:
x = [x]
elif x.ndim == 2 and x.shape[0] < x.shape[1]:
pass
else:
raise ValueError('Invalid input shape {}. Must be 1D or 2D with shape (n_channels, n_samples).'
.format(x.shape))
# Wavelet transform.
cwt_kwargs = dict({
'wavelet': DEFAULT_WAVELET,
'freqs': np.asarray(freqs),
}, **kwargs)
def fun(a):
# Interpolate nans.
nan_mask = np.isnan(a)
a = interp_nan(a)
coefs, scales, freqs, coi, fft, _ = pycwt.wavelet.cwt(a, 1 / fs,
**cwt_kwargs)
C = np.abs(coefs)
# Put nans back.
C[:, nan_mask] = np.nan
return C
# Call function one each 1D array, operating along `axis` and collect in an array.
envelopes = np.array(list(map(fun, x))).swapaxes(0, 1)
if isinstance(x, list):
# 1D array passed as input, extract single channel.
envelopes = envelopes[:, 0, :]
return envelopes
[docs]def hilbert_envelope(x, nfft=None, axis=-1):
"""
Compute the envelope of the signals in x using the Hilbert transform.
Notes:
The envelope may suffer from boundary effects.
Args:
x (np.ndarray): signal data.
nfft (int, optional): number of Fourier components.
If None, uses the next power of 2 from x.shape[axis] for optimal speed.
Defaults to None.
axis (int, optional): the axis along which to compute the envelope.
Defaults to -1.
Returns:
envelope (np.ndarray): array with same shape as `x` containing the envelopes of
the signals in `x`.
"""
if nfft is None:
# Optimally, N equals the length of the input signal x. However, this may be slow. Taking the next power of 2
# for this N makes the hilbert function a lot faster (due to fast Fourier transform algorithm).
# If N > x.shape[axis], the hilbert() function zero-pads the signal at the end (on the right side only).
# We will remove these padded samples afterwards.
nfft = 2 ** (nextpow2(x.shape[axis]))
elif nfft < x.shape[axis]:
msg = '\nnfft (={}) is smaller than the length of x (={}).'.format(nfft, x.shape[axis])
warnings.warn(msg)
# Convert the real signal to an analytical (complex) signal using the Hilbert transform.
x_analytical = scipy.signal.hilbert(x, N=nfft, axis=axis)
# The envelope can be computed as the magnitude of the complex signal.
envelope = np.abs(x_analytical)
# Exclude the last samples that were added due to zero-padding in the hilbert() function if N > x.shape[axis].
n_pad = nfft - x.shape[axis]
if n_pad > 0:
slc = [slice(None)] * len(x.shape)
slc[axis] = slice(None, -n_pad)
envelope = envelope[tuple(slc)]
return envelope
[docs]def power_envelope(x, n_window=100, axis=-1):
"""
Compute the envelope of the signals in x derived from average signal power in small windows.
Based on the envelope approximation used by Jennekens et al. 2011.
References:
Jennekens W , Ruijs LS , Lommen CML , Niemarkt HJ , Pasman JW , van Kranen–Mastenbroek VM , et al.
Automatic burst detection for the EEG of the preterm infant.
Physiol Meas 2011;32(10):1623–37 .
Args:
x (np.ndarray): signal data.
n_window (int, optional): the number of samples in the averaging window.
Defaults to 100.
axis (int, optional): the axis along which to compute the envelope.
Defaults to -1.
Returns:
envelope (np.ndarray): array with same shape as `x` containing the envelopes of
the signals in `x`.
"""
# Envelope is the square root of twice the average power in a small window at each sample.
# Create the convolution kernel to compute the running average.
kernel_shape = np.ones(x.ndim, dtype=int)
kernel_shape[axis] = n_window
kernel = 2 * np.ones(kernel_shape) / n_window # Note the factor 2 (we need twice the average power).
# Compute the average power.
avg_power = scipy.signal.convolve(x**2, kernel, mode='same')
# Compute the envelope.
# Note: use clip, since convolutions of zeros may lead to very small negative numbers due to amplified
# machine precision problems.
envelope = np.sqrt(np.clip(avg_power, a_min=0, a_max=np.inf))
return envelope
def power_segmented_envelope(x, fs, segment_length, overlap=0,
artefact_criteria=None, axis=-1):
"""
Compute the envelope of the signals in x derived from average signal power per segments.
Based on the envelope approximation used by Jennekens et al. 2011.
References:
Jennekens W , Ruijs LS , Lommen CML , Niemarkt HJ , Pasman JW , van Kranen–Mastenbroek VM , et al.
Automatic burst detection for the EEG of the preterm infant.
Physiol Meas 2011;32(10):1623–37.
Args:
x (np.ndarray): signal data.
fs (float): sample frequency of the signal.
segment_length (float): the segment length of the averaging window in seconds.
overlap (float, optional): segment overlap in seconds.
Defaults to 0.
artefact_criteria (dict, optional): keyword arguments for nnsa.detect_artefact_signals()
that operates on the segments. Artefact segments are indicated by NaN in the output array.
If None, no artefact detection is done.
Default to None.
axis (int, optional): the axis along which to compute the envelope.
Defaults to -1.
Returns:
envelope (np.ndarray): array with same dimensions as `x` containing the envelopes of
the signals in `x`. The size of the time dimension (the `axis`) will be shorter due to segmentation.
fs_env (float): sample frequency of `envelope`.
"""
# Segment as a large array.
x = np.asarray(x)
x_segmented = get_all_segments(x**2, segment_length=segment_length,
overlap=overlap, fs=fs, axis=axis)
# Swap axis, the segment axis will become the output's time axis.
if axis < 0:
# Convert negative axis to positive.
axis += x.ndim
x_segmented = x_segmented.swapaxes(0, axis+1)
# Determine the sample frequency of the segments.
fs_env = 1/(segment_length - overlap)
# Envelope is the square root of twice the average power in each segment.
envelope = np.sqrt(2*np.nanmean(x_segmented, axis=0))
# Channel exclusion mask.
if artefact_criteria is not None:
artefact_mask = detect_artefact_signals(x_segmented, axis=0, demean=False,
**artefact_criteria)
# Replace artefacts with NaN.
envelope[artefact_mask] = np.nan
return envelope, fs_env
# def power_segmented_envelope_2(x, fs, segment_length, overlap=0, artefact_criteria=None, axis=-1):
# """
# Compute the envelope of the signals in x derived from average signal power per segments.
#
# Based on the envelope approximation used by Jennekens et al. 2011.
#
# References:
# Jennekens W , Ruijs LS , Lommen CML , Niemarkt HJ , Pasman JW , van Kranen–Mastenbroek VM , et al.
# Automatic burst detection for the EEG of the preterm infant.
# Physiol Meas 2011;32(10):1623–37.
#
# Args:
# x (np.ndarray): signal data.
# fs (float): sample frequency of the signal.
# segment_length (float): the segment length of the averaging window in seconds.
# overlap (float, optional): segment overlap in seconds.
# Defaults to 0.
# artefact_criteria (dict, optional): keyword arguments for nnsa.detect_artefact_signals()
# that operates on the segments. Artefact segments are indicated by NaN in the output array.
# If None, no artefact detection is done.
# Default to None.
# axis (int, optional): the axis along which to compute the envelope.
# Defaults to -1.
#
# Returns:
# envelope (np.ndarray): array with same dimensions as `x` containing the envelopes of
# the signals in `x`. The size of the time dimension (the `axis`) will be shorter due to segmentation.
# fs_env (float): sample frequency of `envelope`.
# """
# if axis < 0:
# # Convert negative axis to positive.
# axis += x.ndim
#
# # Determine the sample frequency of the segments.
# fs_env = 1/(segment_length - overlap)
#
# # Segment as a large array.
# seg_generator = segment_generator(x, segment_length=segment_length,
# overlap=overlap, fs=fs, axis=axis)
#
# # Loop over segments.
# envelope = []
# for seg in seg_generator:
# # Envelope is the square root of twice the average power in each segment.
# env = np.sqrt(2 * np.nanmean(seg ** 2, axis=axis, keepdims=True))
# envelope.append(env)
#
# # Collect in array.
# envelope = np.concatenate(envelope, axis=axis)
#
# # Replace zeros with NaN (nanmean of all-nan slice).
# envelope[envelope == 0] = np.nan
#
# return envelope, fs_env