Source code for nnsa.feature_extraction.envelope

"""
Algorithms related to analysis of the envelope/magnitude of the EEG.
"""
import copy
import csv
import sys
import warnings

import h5py
import pycwt
import pyprind
import scipy.signal
import numpy as np

from nnsa.artefacts.artefact_detection import detect_artefact_signals
from nnsa.cwt.config import DEFAULT_WAVELET
from nnsa.feature_extraction.common import check_multichannel_data_matrix, get_channel_index, baseline_correction_min, \
    prepare_postfix
from nnsa.feature_extraction.result import ResultBase
from nnsa.parameters.parameters import ClassWithParameters, Parameters
from nnsa.utils.arrays import interp_nan
from nnsa.utils.config import HORIZONTAL_RULE
from nnsa.utils.mathematics import nextpow2
from nnsa.utils.other import enumerate_label, convert_string_auto

__all__ = [
    'Envelope',
    'EnvelopeResult',
    'hilbert_envelope',
    'power_envelope',
]

from nnsa.utils.segmentation import segment_generator, get_all_segments


[docs]class Envelope(ClassWithParameters): """ Class for computing the envelope of a real valued signal. Main method: envelope(). Args: see nnsa.ClassWithParameters Examples: >>> fs = 256 >>> t = 1/fs*np.arange(8*100000).reshape(8, 100000) >>> x = 4 * np.sin(t) >>> env = Envelope(method='hilbert') >>> print(type(env.parameters).__name__) Parameters >>> result = env.envelope(x, fs=fs, verbose=0) >>> print(type(result).__name__) EnvelopeResult >>> print(result.envelope[2, 2222]) 3.98601719157083 >>> print(result.envelope.mean()) 3.997973665767074 """
[docs] @staticmethod def default_parameters(): """ Return the default parameters. Returns: (nnsa.Parameters): a default set of parameters for the object. """ pars = { # The method/algorithm to use for computation of the envelope. # Choose from 'hilbert', 'power', 'power_segmented'. # See the functions in this module for each of these methods for more information (e.g. hilbert_envelope()): 'method': 'hilbert', # Optional additional keyword arguments/parameters for the method/function that computes the envelope. # These keyword arguments depend on the method specified above. E.g. if method is set to 'hilbert', see the # function hilbert_envelope() for the optional keyword argument that you can specify here: 'method_kwargs': {}, # The data type / precision for the envelope values (might save space when saving). 'dtype': np.float32, } return Parameters(**pars)
[docs] def envelope(self, data_matrix, fs, channel_labels=None, verbose=1): """ Perform the envelope computation on multichannel data. Args: data_matrix (np.ndarray): see check_multichannel_data_matrix(). fs (float): sample frequency of the signals in data_matrix. channel_labels (list of str, optional): see check_multichannel_data_matrix(). verbose (int, optional): verbose level. Defaults to 1. Returns: (nnsa.EnvelopeResult): object containing the signal envelopes per channel. """ # Check input. data_matrix, channel_labels = check_multichannel_data_matrix(data_matrix, channel_labels) if verbose > 0: print(HORIZONTAL_RULE) print('Computing envelope with parameters:') print(self.parameters) # Extract some parameters. method = self.parameters['method'].lower() method_kwargs = self.parameters['method_kwargs'] # Call the corresponding function to compute envelope. if method == 'hilbert': envelope = hilbert_envelope(data_matrix, **method_kwargs) fs_env = fs elif method == 'power': envelope = power_envelope(data_matrix, **method_kwargs) fs_env = fs elif method == 'power_segmented': envelope, fs_env = power_segmented_envelope(data_matrix, fs, **method_kwargs) else: raise ValueError('Invalid method "{}". Choose from "".' .format(method, ['hilbert', 'power', 'power_segmented'])) # Set dtype. envelope = envelope.astype(self.parameters['dtype']) result = EnvelopeResult(envelope=envelope, algorithm_parameters=self.parameters, fs=fs_env, channel_labels=channel_labels) return result
[docs]class EnvelopeResult(ResultBase): """ High-level interface for processing the results of envelope computation as created by nnsa.Envelope(). Args: envelope (np.ndarray): array with shape (channels, segments) containing signal envelopes. algorithm_parameters (nnsa.Parameters): see ResultBase. fs (float): sample frequency corresponding to the envelope array. channel_labels (list of str, optional): labels of the channels corresponding to the channel dimensions of `envelope`. If None, default labels will be created. Defaults to None. data_info (str, optional): see ResultBase. segment_start_times (np.ndarray, optional): see ResultBase. segment_end_times (np.ndarray, optional): see ResultBase. """ def __init__(self, envelope, algorithm_parameters, fs, channel_labels=None, data_info=None, segment_start_times=None, segment_end_times=None): super().__init__(algorithm_parameters=algorithm_parameters, data_info=data_info, segment_start_times=segment_start_times, segment_end_times=segment_end_times, fs=fs) self.envelope = envelope if self.is_discontinuous(): raise ValueError('Data is discontinuous. This is invalid for the {} class.'.format(self.__class__.__name__)) # Input check. n_channels, n_segments = envelope.shape if channel_labels is None: channel_labels = enumerate_label(n_channels, label='Channel') elif len(channel_labels) != n_channels: raise ValueError('Length of channel_labels ({}) does not correspond to the shape of the data {}.' .format(len(channel_labels), envelope.shape)) # Store variables that are not already stored by the parent class (ResultBase). self.channel_labels = channel_labels @property def num_segments(self): """ Return the number of segments (number of samples). Returns: (int): number of segments/samples. """ return self.envelope.shape[-1]
[docs] def baseline_correction_min(self, window_length=None, inplace=False): """ Perform baseline correction of the envelope by subtracting the minimum value in a window before each sample. See nnsa.baseline_correction_min(). Args: window_length (int, optional): the length of the window (in samples) to look for the baseline (minimum), see nnsa.baseline_correction_min(). If None, a window corresponding to 1 minute is used. Defaults to None. inplace (bool, optional): if True, the corrected values replace the envelope values in the current object. If False, a new EnvelopeResult object is returned with the baseline corrected envelope values. Defaults to False. Returns: envelope_corrected (nnsa.EnvelopeResult): a new EnvelopeResult object with the baseline corrected envelope values (only if inplace is True). """ if window_length is None: window_length = 60 * self.fs if inplace: # We will replace the original envelope attribute in the current EnvelopeResult object by the corrected # envelope values. envelope_corrected = self else: # We will replace the original envelope attribute in A COPY OF the current EnvelopeResult. # Create a copy of the EnvelopeResult object. envelope_corrected = copy.deepcopy(self) for j, env_j in enumerate(envelope_corrected.envelope): # For each channel. envelope_corrected.envelope[j] = baseline_correction_min(env_j, window_length=window_length) # Only return if not in place. if not inplace: return envelope_corrected
[docs] def compute_features(self, envs, postfix=None, concatenate_channels=True): """ Compute features of an array of envelope values. Args: envs (np.ndarray): array with envelope values with dimensions (channels, time). postfix (str, optional): postfix for the keys in the output dictionary. concatenate_channels (bool, optional): if True, the channels are concatenated before computing the features. If False, the features are computed on each channel. Defaults to True. Returns: features (dict): features describing the envelope distribution. """ # Define the percentiles that characterize the envelope distribution. percentiles = [5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 95] # Prepare postfix. postfix = prepare_postfix(postfix) if concatenate_channels: # Make the code iterate over a list with just one element containing the entire envelope array. envs = [envs] postfix_str = [postfix] else: postfix_str = ['_{}{}'.format(label.lstrip('EEG '), postfix) for label in self.channel_labels] features = dict() # Loop over channels. for e, pf in zip(envs, postfix_str): # Compute percentiles. if np.size(e) == 0: # Cannot compute percentiles if e is empty. percentile_values = np.full(len(percentiles), np.nan) else: percentile_values = np.nanpercentile(e, percentiles) # Loop over percentiles (those are the features). for prc, val in zip(percentiles, percentile_values): features['ENV_q{}{}'.format(prc, pf)] = val return features
[docs] def extract_global_features(self, max_amplitude=200, sleep_stages=None, concatenate_channels=True): """ Extract features that characterize the entire recording. Args: max_amplitude (float, optional): the threshold for the envelope in uV. Values above this threshold are discarded in the analysis. Defaults to 200. sleep_stages (nnsa.SleepStagesResult, optional): object containing sleep stages result. Used to report global feature values per sleep stage. If None, no sleep stage dependent feature values are returned. Defaults to None. concatenate_channels (bool, optional): see self.compute_features. Returns: global_features (dict): dictionary of features values, hashed by feature name. """ global_features = dict() # Extract envelopes per channel (dimensions (channels, time)). envs = self.envelope # Discard envelope values exceeding the max_amplitude threshold by setting them to nan. envs[envs > 200] = np.nan if sleep_stages is None: # Combine all segments to get global features (do not distinguish sleep stages). # Just use all data. sleep_label = 'ALL' features = self.compute_features(envs, postfix=sleep_label, concatenate_channels=concatenate_channels) global_features.update(features) else: # Combine all segments to get global features (distinguish sleep stages). # Get the sleep labels (class numbers) of the segments. segment_labels = sleep_stages.segment_labels(self.segment_start_times, self.segment_end_times) # Use all sleep stages, except no_label to capture the entire recording. sleep_label = 'ALL' sleep_mask = ~np.isnan(segment_labels) features = self.compute_features(envs[:, sleep_mask], postfix=sleep_label, concatenate_channels=concatenate_channels) global_features.update(features) # Loop over sleep stages. for sleep_label, label_number in sleep_stages.class_mapping.items(): if np.isnan(label_number): # Skip NaNs (no_label segments). continue # Combine the segments per sleep stage. sleep_mask = segment_labels == label_number features = self.compute_features(envs[:, sleep_mask], postfix=sleep_label, concatenate_channels=concatenate_channels) global_features.update(features) return global_features
[docs] def plot(self, channel=None): """ Plot the envelope in the current axis. Args: channel (str or list, optional): string or list of strings with label(s) of the channel(s) to plot. If None, all channels are plot in the same figure with a separate color. Defaults to None. """ import matplotlib.pyplot as plt # Collect indices of the channels to plot. if channel is not None: indices = get_channel_index(self.channel_labels, channel) if not isinstance(indices, list): indices = [indices] else: indices = range(len(self.channel_labels)) t = self.segment_start_times for i in indices: plt.plot(t, self.envelope[i], label=self.channel_labels[i]) plt.title('Envelope') plt.xlabel('Time (s)') plt.ylabel('Envelope')
[docs] def to_base_dataset(self, unit='uV', **kwargs): """ Create an BaseDataset object from the envelope data. Args: unit (str, optional): unit of the envelope data. Defaults to 'uV'. **kwargs (optional): optional keyword arguments for the TimeSeries object. Returns: ds (nnsa.BaseDataset): BaseDataset containing the envelope data of the available channels. """ # Import locally since EegDataset imports Envelope from this module. from nnsa.containers.datasets import BaseDataset # Initialize an empty EegDataset. ds = BaseDataset() # Loop over the channels, create a TimeSeries object and append it to the EegDataset. for signal, label in zip(self.envelope, self.channel_labels): ts = self.to_time_series(label, unit=unit, **kwargs) ds.append(ts) return ds
[docs] def to_eeg_dataset(self, unit='uV', **kwargs): """ Create an EegDataset object from the envelope data. Args: unit (str, optional): unit of the envelope data. Defaults to 'uV'. **kwargs (optional): optional keyword arguments for the TimeSeries object. Returns: ds (nnsa.EegDataset): EegDataset containing the envelope data of the available channels. """ # Import locally since EegDataset imports Envelope from this module. from nnsa.containers.datasets import EegDataset # Initialize an empty EegDataset. ds = EegDataset() # Loop over the channels, create a TimeSeries object and append it to the EegDataset. for signal, label in zip(self.envelope, self.channel_labels): ts = self.to_time_series(label, unit=unit, **kwargs) ds.append(ts) return ds
[docs] def to_time_series(self, channel=None, unit='uV', **kwargs): """ Create a TimeSeries object from the envelope data of the specified channel. Args: channel (str, optional): the label of the channel which to convert to a TimeSeries. If None and the envelope contains only one channel, this channel is selected. Defaults to None. unit (str, optional): unit of the envelope data. Defaults to 'uV'. **kwargs (optional): optional keyword arguments for the TimeSeries object. Returns: ts (nnsa.TimeSeries): TimeSeries containing the envelope data of the specified channel. """ # Import locally since TimeSeries imports Envelope from this module. from nnsa.containers.time_series import TimeSeries if channel is None: if self.envelope.shape[0] == 1: # only one channel available. Select this one. index = 0 channel = self.channel_labels[0] else: raise ValueError('Specify the channel in EnvelopeResult to convert to TimeSeries. Available channels:' ' {}.'.format(self.channel_labels)) else: # Get the channel index. index = get_channel_index(self.channel_labels, channel) # Create info object. info = { 'source': 'EnvelopeResult of {}.'.format( self.data_info if self.data_info is not None else 'unknown source' ) } ts = TimeSeries(signal=self.envelope[index], fs=self.fs, label=channel, unit=unit, info=info, time_offset=self.time_offset, **kwargs) return ts
def _merge(self, other, index): """ See ResultBase. """ # Check if the channel labels of self and other are the same. if self.channel_labels != other.channel_labels: raise ValueError('Cannot merge objects with different channel labels.') n_channels, n_samples = self.envelope.shape if index < n_samples: # Cut piece off. msg = 'Overwriting data while merging.' warnings.warn(msg) self.envelope = self.envelope[:, :index] else: # Add nans. self.envelope = np.concatenate([self.envelope, np.full((n_channels, index-n_samples), fill_value=np.nan)], axis=-1) # Merge. self.envelope = np.concatenate([self.envelope, other.envelope], axis=-1) @staticmethod def _read_from_csv(filepath): """ Read result from csv file into a EnvelopeResult class. Args: filepath (str): see ResultBase._read_from_csv(). Returns: result (nnsa.EnvelopeResult): instance of EnvelopeResult containing the Envelope result. """ # Lines 1-4: Standard csv header (use the ResultBase method). algorithm_parameters, data_info, fs = ResultBase._read_csv_header(filepath)[1:] # Re-open the file and read the rest of the file, line by line. with open(filepath, 'r') as f: reader = csv.reader(f) # Lines 1-4: Standard csv header (already read, skip). [next(reader) for i in range(4)] # Line 5: Non-array data header (skip). assert(next(reader) == ['channel_labels', 'envelope.shape']) # Line 6: Non-array data. channel_labels, envelope_shape = [ convert_string_auto(i) for i in next(reader)] # Line 7: Array header (skip). assert(next(reader)[0] == 'envelope') # Line 8: Array data (flattened). envelope_as_list = [float(i) for i in next(reader)] # Convert to numpy array and reshape. envelope = np.reshape(envelope_as_list, envelope_shape) # Create a result object. result = EnvelopeResult(envelope=envelope, fs=fs, channel_labels=channel_labels, algorithm_parameters=algorithm_parameters, data_info=data_info) return result @staticmethod def _read_from_hdf5(filepath): """ Read result from hdf5 file into a EnvelopeResult class. Args: filepath (str): see ResultBase._read_from_hdf5(). Returns: result (nnsa.EnvelopeResult): instance of EnvelopeResult containing the Envelope result. """ # Read standard hdf5 header (use the ResultBase method). algorithm_parameters, data_info, segment_start_times, segment_end_times, fs, time_offset =\ ResultBase._read_hdf5_header(filepath)[1:] # Re-open the file and read the rest of the file. with h5py.File(filepath, 'r') as f: # Read array data. envelope = f['envelope'][:] # Read non-array data. channel_labels = [label.decode() for label in f['envelope'].attrs['channel_labels']] # Create a result object. result = EnvelopeResult(envelope=envelope, channel_labels=channel_labels, algorithm_parameters=algorithm_parameters, data_info=data_info, segment_start_times=segment_start_times, segment_end_times=segment_end_times, fs=fs) return result def _write_to_csv(self, filepath): """ Write the contents of the object to a csv file. Args: filepath (str): see ResultBase._write_to_csv(). """ # Lines 1-4: Standard csv header (use the ResultBase method). self._write_csv_header(filepath) # Append attributes to the csv file, line by line. with open(filepath, 'a', newline='') as csvfile: writer = csv.writer(csvfile) # Line 5: Non-array data header. writer.writerow(['channel_labels', 'envelope.shape']) # Line 6: Non-array data. writer.writerow([self.channel_labels, self.envelope.shape]) # Line 7: Array header. writer.writerow(['envelope']) # Line 8: Array data (flattened). writer.writerow(self.envelope.reshape(-1).tolist()) def _write_to_hdf5(self, filepath): """ Write the contents of the object to an hdf5 file. Args: filepath (str): see ResultBase._write_to_hdf5(). """ # Write standard hdf5 header (use the ResultBase method). self._write_hdf5_header(filepath) # Append attributes to the hdf5 file. with h5py.File(filepath, 'a') as f: # Write array data. f.create_dataset('envelope', data=self.envelope) # Write non-array data as attributes. # Convert strings to np.string_ type as recommended for compatibility. f['envelope'].attrs['channel_labels'] = [np.string_(label) for label in self.channel_labels]
def compute_bandpass_envelope(x, freq_limits, fs=1, order=4, axis=-1, verbose=1): """ Compute the envelope(s) of the x for certain frequency bands. First applies a bandpass filter and then computes the envelope using the Hilbert transform of the band-filtered signal. Args: x (np.ndarray): signal array. freq_limits (list, np.ndarray or dict): iterbale specifying the frequency bands. Has items consisting of cut-off frequencies, like [f_low, f_high]. Specify fs to put these in Hz, else they are normalized by the Nyqvist frequency. If a dict, the returned value is a dict as well with the same keys corresponding to the frequency bands. fs (float): sample frequency of x. Specify this to put freq_limits in Hz. order (int): order of the butterworth bandpass filter. axis (int): time axis of x. Returns: envelopes (np.ndarray or dict): envelopes. If an array has shape (len(freq_limits), ) + x.shape. If a dict (when freq_linits is a dict) has len(freq_limits) items. Examples: >>> fs = 100 >>> t = np.arange(1000)/fs >>> np.random.seed(43) >>> x = 5*np.sin(2*np.pi*10*t) + np.random.rand(len(t)) # 10 Hz sine >>> x += 10*np.sin(2*np.pi*40*t) + np.random.rand(len(t)) # 40 Hz sine >>> x.shape (1000,) >>> freq_limits = [[9, 11], [24, 26], [39, 41]] # 10, 25, 40 Hz center frequencies >>> env = compute_bandpass_envelope(x, freq_limits, fs=fs) >>> env.shape (3, 1000) >>> print(env.mean(axis=-1)) [4.9376538 0.09354586 9.82055459] """ x = np.asarray(x) # Interpolate nans. nan_mask = np.isnan(x) x = interp_nan(x, axis=axis) envelopes = [] # Parse inputs. if isinstance(freq_limits, dict): freq_keys = list(freq_limits.keys()) freq_limits = list(freq_limits.values()) else: freq_keys = None # Check if only one freq low, high pair is given. if np.size(np.asarray(freq_limits)) == 2 and np.asarray(freq_limits).ndim == 1: freq_limits = [freq_limits] # Initialize progress bar. bar = pyprind.ProgBar(len(freq_limits), stream=sys.stdout) for i, (flow, fhigh) in enumerate(freq_limits): # Bandpass filter. if flow is None or flow == -1: if fhigh is None or fhigh == -1: # No filter. xf = x else: b, a = scipy.signal.butter(N=order, Wn=fhigh, btype='low', fs=fs) xf = scipy.signal.filtfilt(b, a, x, axis=axis) else: if fhigh is None or fhigh == -1: b, a = scipy.signal.butter(N=order, Wn=flow, btype='high', fs=fs) xf = scipy.signal.filtfilt(b, a, x, axis=axis) else: b, a = scipy.signal.butter(N=order, Wn=[flow, fhigh], btype='bandpass', fs=fs) xf = scipy.signal.filtfilt(b, a, x, axis=axis) # Envelope computation. xa = hilbert_envelope(xf, axis=axis) # Put nans back. xa[nan_mask] = np.nan envelopes.append(xa) if verbose: bar.update() if freq_keys is not None: # To dict. envelopes = dict(zip(freq_keys, envelopes)) else: # To array. envelopes = np.vstack([np.expand_dims(xa, 0) for xa in envelopes]) return envelopes def compute_cwt_envelope(x, freqs, fs, **kwargs): """ Compute the envelope(s) of the x for certain frequency bands. Args: x (np.ndarray): signal array. A 1-D array or a 2D array with shape (n_channels, n_samples). freqs (iterable): iterbale specifying the center frequency of the wavelets in Hz. fs (float): sample frequency of x. **kwargs: arguments for pycwt.wavelet.cwt(). Returns: envelopes (np.ndarray): envelopes. Has shape (len(freqs), ) + x.shape. Examples: >>> fs = 100 >>> t = np.arange(1000)/fs >>> np.random.seed(43) >>> x = 5*np.sin(2*np.pi*10*t) + np.random.rand(len(t)) # 10 Hz sine >>> x += 10*np.sin(2*np.pi*40*t) + np.random.rand(len(t)) # 40 Hz sine >>> x.shape (1000,) >>> freqs = [10, 25, 40] # 10, 25, 40 Hz center frequencies >>> env = compute_cwt_envelope(x, freqs, fs=fs) >>> env.shape (3, 1000) >>> print(env.mean(axis=-1)) [14.53283559 0.35320805 14.53766499] """ # Check input. x = np.asarray(x) if x.ndim == 1: x = [x] elif x.ndim == 2 and x.shape[0] < x.shape[1]: pass else: raise ValueError('Invalid input shape {}. Must be 1D or 2D with shape (n_channels, n_samples).' .format(x.shape)) # Wavelet transform. cwt_kwargs = dict({ 'wavelet': DEFAULT_WAVELET, 'freqs': np.asarray(freqs), }, **kwargs) def fun(a): # Interpolate nans. nan_mask = np.isnan(a) a = interp_nan(a) coefs, scales, freqs, coi, fft, _ = pycwt.wavelet.cwt(a, 1 / fs, **cwt_kwargs) C = np.abs(coefs) # Put nans back. C[:, nan_mask] = np.nan return C # Call function one each 1D array, operating along `axis` and collect in an array. envelopes = np.array(list(map(fun, x))).swapaxes(0, 1) if isinstance(x, list): # 1D array passed as input, extract single channel. envelopes = envelopes[:, 0, :] return envelopes
[docs]def hilbert_envelope(x, nfft=None, axis=-1): """ Compute the envelope of the signals in x using the Hilbert transform. Notes: The envelope may suffer from boundary effects. Args: x (np.ndarray): signal data. nfft (int, optional): number of Fourier components. If None, uses the next power of 2 from x.shape[axis] for optimal speed. Defaults to None. axis (int, optional): the axis along which to compute the envelope. Defaults to -1. Returns: envelope (np.ndarray): array with same shape as `x` containing the envelopes of the signals in `x`. """ if nfft is None: # Optimally, N equals the length of the input signal x. However, this may be slow. Taking the next power of 2 # for this N makes the hilbert function a lot faster (due to fast Fourier transform algorithm). # If N > x.shape[axis], the hilbert() function zero-pads the signal at the end (on the right side only). # We will remove these padded samples afterwards. nfft = 2 ** (nextpow2(x.shape[axis])) elif nfft < x.shape[axis]: msg = '\nnfft (={}) is smaller than the length of x (={}).'.format(nfft, x.shape[axis]) warnings.warn(msg) # Convert the real signal to an analytical (complex) signal using the Hilbert transform. x_analytical = scipy.signal.hilbert(x, N=nfft, axis=axis) # The envelope can be computed as the magnitude of the complex signal. envelope = np.abs(x_analytical) # Exclude the last samples that were added due to zero-padding in the hilbert() function if N > x.shape[axis]. n_pad = nfft - x.shape[axis] if n_pad > 0: slc = [slice(None)] * len(x.shape) slc[axis] = slice(None, -n_pad) envelope = envelope[tuple(slc)] return envelope
[docs]def power_envelope(x, n_window=100, axis=-1): """ Compute the envelope of the signals in x derived from average signal power in small windows. Based on the envelope approximation used by Jennekens et al. 2011. References: Jennekens W , Ruijs LS , Lommen CML , Niemarkt HJ , Pasman JW , van Kranen–Mastenbroek VM , et al. Automatic burst detection for the EEG of the preterm infant. Physiol Meas 2011;32(10):1623–37 . Args: x (np.ndarray): signal data. n_window (int, optional): the number of samples in the averaging window. Defaults to 100. axis (int, optional): the axis along which to compute the envelope. Defaults to -1. Returns: envelope (np.ndarray): array with same shape as `x` containing the envelopes of the signals in `x`. """ # Envelope is the square root of twice the average power in a small window at each sample. # Create the convolution kernel to compute the running average. kernel_shape = np.ones(x.ndim, dtype=int) kernel_shape[axis] = n_window kernel = 2 * np.ones(kernel_shape) / n_window # Note the factor 2 (we need twice the average power). # Compute the average power. avg_power = scipy.signal.convolve(x**2, kernel, mode='same') # Compute the envelope. # Note: use clip, since convolutions of zeros may lead to very small negative numbers due to amplified # machine precision problems. envelope = np.sqrt(np.clip(avg_power, a_min=0, a_max=np.inf)) return envelope
def power_segmented_envelope(x, fs, segment_length, overlap=0, artefact_criteria=None, axis=-1): """ Compute the envelope of the signals in x derived from average signal power per segments. Based on the envelope approximation used by Jennekens et al. 2011. References: Jennekens W , Ruijs LS , Lommen CML , Niemarkt HJ , Pasman JW , van Kranen–Mastenbroek VM , et al. Automatic burst detection for the EEG of the preterm infant. Physiol Meas 2011;32(10):1623–37. Args: x (np.ndarray): signal data. fs (float): sample frequency of the signal. segment_length (float): the segment length of the averaging window in seconds. overlap (float, optional): segment overlap in seconds. Defaults to 0. artefact_criteria (dict, optional): keyword arguments for nnsa.detect_artefact_signals() that operates on the segments. Artefact segments are indicated by NaN in the output array. If None, no artefact detection is done. Default to None. axis (int, optional): the axis along which to compute the envelope. Defaults to -1. Returns: envelope (np.ndarray): array with same dimensions as `x` containing the envelopes of the signals in `x`. The size of the time dimension (the `axis`) will be shorter due to segmentation. fs_env (float): sample frequency of `envelope`. """ # Segment as a large array. x = np.asarray(x) x_segmented = get_all_segments(x**2, segment_length=segment_length, overlap=overlap, fs=fs, axis=axis) # Swap axis, the segment axis will become the output's time axis. if axis < 0: # Convert negative axis to positive. axis += x.ndim x_segmented = x_segmented.swapaxes(0, axis+1) # Determine the sample frequency of the segments. fs_env = 1/(segment_length - overlap) # Envelope is the square root of twice the average power in each segment. envelope = np.sqrt(2*np.nanmean(x_segmented, axis=0)) # Channel exclusion mask. if artefact_criteria is not None: artefact_mask = detect_artefact_signals(x_segmented, axis=0, demean=False, **artefact_criteria) # Replace artefacts with NaN. envelope[artefact_mask] = np.nan return envelope, fs_env # def power_segmented_envelope_2(x, fs, segment_length, overlap=0, artefact_criteria=None, axis=-1): # """ # Compute the envelope of the signals in x derived from average signal power per segments. # # Based on the envelope approximation used by Jennekens et al. 2011. # # References: # Jennekens W , Ruijs LS , Lommen CML , Niemarkt HJ , Pasman JW , van Kranen–Mastenbroek VM , et al. # Automatic burst detection for the EEG of the preterm infant. # Physiol Meas 2011;32(10):1623–37. # # Args: # x (np.ndarray): signal data. # fs (float): sample frequency of the signal. # segment_length (float): the segment length of the averaging window in seconds. # overlap (float, optional): segment overlap in seconds. # Defaults to 0. # artefact_criteria (dict, optional): keyword arguments for nnsa.detect_artefact_signals() # that operates on the segments. Artefact segments are indicated by NaN in the output array. # If None, no artefact detection is done. # Default to None. # axis (int, optional): the axis along which to compute the envelope. # Defaults to -1. # # Returns: # envelope (np.ndarray): array with same dimensions as `x` containing the envelopes of # the signals in `x`. The size of the time dimension (the `axis`) will be shorter due to segmentation. # fs_env (float): sample frequency of `envelope`. # """ # if axis < 0: # # Convert negative axis to positive. # axis += x.ndim # # # Determine the sample frequency of the segments. # fs_env = 1/(segment_length - overlap) # # # Segment as a large array. # seg_generator = segment_generator(x, segment_length=segment_length, # overlap=overlap, fs=fs, axis=axis) # # # Loop over segments. # envelope = [] # for seg in seg_generator: # # Envelope is the square root of twice the average power in each segment. # env = np.sqrt(2 * np.nanmean(seg ** 2, axis=axis, keepdims=True)) # envelope.append(env) # # # Collect in array. # envelope = np.concatenate(envelope, axis=axis) # # # Replace zeros with NaN (nanmean of all-nan slice). # envelope[envelope == 0] = np.nan # # return envelope, fs_env