Source code for nnsa.feature_extraction.fractality

"""
Algorithms related to fractal analysis.
"""
import copy
import csv
import sys
import warnings

import h5py
import pyprind
import numpy as np
import pandas as pd

from nnsa.artefacts.artefact_detection import default_eeg_signal_quality_criteria, detect_artefact_signals
from nnsa.feature_extraction.common import check_multichannel_data_matrix, get_channel_index, \
    preprocess_segment, local_to_global_features, aggregate_channel_features, print_init_message
from nnsa.feature_extraction.result import ResultBase
from nnsa.parameters.parameters import ClassWithParameters, Parameters
from nnsa.utils.arrays import histogram_features_per_channel, moving_average
from nnsa.utils.conversions import convert_time_scale
from nnsa.utils.segmentation import compute_n_segments, segment_generator, get_all_segments
from nnsa.utils.config import HORIZONTAL_RULE
from nnsa.utils.other import enumerate_label, convert_string_auto
import scipy.signal

__all__ = [
    'LineLength',
    'LineLengthResult',
    'MultifractalAnalysis',
    'MultifractalAnalysisResult',
    'buid_q_log',
    'histogram_features_lll',
    'wfbmesti',
]


[docs]class LineLength(ClassWithParameters): """ Class for computing the Line Length (LL) feature. Reference: Line Length implemented as presented by Koolen et al.: @Article{KOOLEN2014, author = {Ninah Koolen and Katrien Jansen and Jan Vervisch and Vladimir Matic and Maarten De Vos and Gunnar Naulaers and Sabine Van Huffel}, title = {Line length as a robust method to detect high-activity events: Automated burst detection in premature {EEG} recordings}, journal = {Clinical Neurophysiology}, year = {2014}, volume = {125}, number = {10}, pages = {1985--1994}, month = {oct}, doi = {10.1016/j.clinph.2014.02.015}, publisher = {Elsevier {BV}}, } Main method: line_length(): Args: see nnsa.ClassWithParameters. Examples: >>> np.random.seed(0) >>> x = np.random.rand(10, 10000) >>> line_length = LineLength() >>> print(type(line_length.parameters).__name__) Parameters >>> result = line_length.line_length(x, fs=25, verbose=0) >>> print(type(result).__name__) LineLengthResult >>> result.line_length[2, 3] 0.005513363795332106 """
[docs] @staticmethod def default_parameters(): """ Return the default parameters. Returns: (nnsa.Parameters): a default set of parameters for the object. """ # Parameters for segmentation of EEG recording into small time segments/epochs. segmentation = Parameters(**{ # Segment length in seconds: 'segment_length': 1, # Overlap between segments in seconds: 'overlap': 0.12, }) # Parameters for artefact detection/exclusion, see # nnsa.artefacts.artefact_detection.default_eeg_signal_quality_criteria() artefact_criteria = None # Parameters for Line Length feature. line_length = Parameters(**{ # Normalization method for the line length per segment per channel. Choose from: # None: no normalization; # 'sum_segments': normalize by dividing LL by the sum of all LL values over all segments (per channel); # 'mean_segments': normalize by dividing LL by the mean of all LL values over all segments (per channel). 'normalization_kind': 'mean_segments', # Normalization window in seconds, i.e. the length of the window (in seconds) in which the line lengths # are normalized. If None, the window spans all segments (i.e. normalize the whole). Using small windows, # then the normalized LL may become more biased. Is 2.5 minutes according to Koolen et al., # Data-driven metric representing the maturation of preterm EEG, 2015. 'normalization_window': None, # Specify if you want a moving window (True) or a fixed window (False) for normalization # (using a moving window may be more elegant, but may be slower than using a fixed window; # not applicable if normalization_window is None): 'normalize_in_moving_window': False, }) pars = { 'segmentation': segmentation, 'artefact_criteria': artefact_criteria, 'line_length': line_length } return Parameters(**pars)
[docs] def process(self, *args, **kwargs): """ Alias of main method. """ return self.line_length(*args, **kwargs)
[docs] def line_length(self, data_matrix, fs, channel_labels=None, verbose=1): """ Perform the computation of the Line Lenght feature on multichannel data. Args: data_matrix (np.ndarray): see check_multichannel_data_matrix(). Koolen et al. originally used EEG signals bandpass filtered between 1 - 20 Hz. fs (float): sample frequency of the EEG signals. For the line length, the signals should typically be sampled at 250 Hz. If another frequency is used, a warning is raised. channel_labels (list of str, optional): see check_multichannel_data_matrix(). verbose (int, optional): verbose level. Defaults to 1. Returns: (nnsa.LineLengthResult): object containing the LL result per segment per channel. """ # Check input. data_matrix, channel_labels = check_multichannel_data_matrix(data_matrix, channel_labels) if verbose > 0: print(HORIZONTAL_RULE) print('Computing Line Length with parameters:') print(self.parameters) # Extract some parameters. seg_pars = self.parameters['segmentation'] artefact_criteria = self.parameters['artefact_criteria'] line_length_pars = self.parameters['line_length'] normalization_kind = line_length_pars['normalization_kind'] size_gb = np.prod(data_matrix.shape)/1e9*64/8 if size_gb < 6: # Faster method, but requires more memory. # Collect all segments in one array with dimensions (segments, channels, time). data_tensor = get_all_segments(data_matrix, segment_length=seg_pars['segment_length'], overlap=seg_pars['overlap'], fs=fs, axis=-1) # Compute running sum of absolute differences (LL) with dimensions (segments, channels), (Eq. 1). line_length = np.nansum(np.abs(np.diff(data_tensor, axis=-1)), axis=-1) if artefact_criteria is not None: # Channel exclusion mask with dimensions (segments, channels). artefact_mask = detect_artefact_signals(data_tensor, axis=-1, demean=True, **artefact_criteria) # Replace artefact segments with NaN. line_length[artefact_mask] = np.nan else: # Slower method (~3 times) but uses less memory. # Segment the data in the time axis (create a generator). seg_generator = segment_generator(data_matrix, segment_length=seg_pars['segment_length'], overlap=seg_pars['overlap'], fs=fs, axis=-1) line_length = [] for seg in seg_generator: ll = np.nansum(np.abs(np.diff(seg, axis=-1)), axis=-1) if artefact_criteria is not None: nan_mask = detect_artefact_signals(seg, axis=-1, demean=True, **artefact_criteria) ll[nan_mask] = np.nan line_length.append(ll) line_length = np.vstack(line_length) # Transpose such that dimensions are (channels, segments). line_length = np.transpose(line_length) # Normalize (Eq. 2). if normalization_kind is not None: line_length = self._normalize(line_length) result = LineLengthResult(line_length=line_length, algorithm_parameters=self.parameters, channel_labels=channel_labels) return result
def _normalize(self, line_length): """ Normalize the line_length according to the normalization parameters. Args: line_length (np.ndarray): array with line lengths to normalize, with dimensions (channels, segments). Returns: line_length_normalized (np.ndarray): normalized line lengths with dimensions (channels, segments). """ line_length_pars = self.parameters['line_length'] normalization_window = line_length_pars['normalization_window'] normalization_kind = line_length_pars['normalization_kind'].lower() if normalization_window is None: # Use all segments for normalization. line_length_avg = np.nanmean(line_length, axis=-1, keepdims=True) num_segments_in_win = line_length.shape[-1] else: # Normalize per window. normalize_in_moving_window = line_length_pars['normalize_in_moving_window'] if normalize_in_moving_window: line_length_avg, num_segments_in_win = self._average_movingwin(line_length) else: line_length_avg, num_segments_in_win = self._average_fixedwin(line_length) if normalization_kind in ['mean_segments', 'average_segments']: # Use average for normalization. line_length_norm = line_length_avg elif normalization_kind == 'sum_segments': # Infer sum from mean to deal with NaN values in line_length (in essence we replace the NaN # values with the mean of the rest). line_length_norm = line_length_avg * num_segments_in_win else: raise ValueError('Invalid parameters normalization_kind={}. Choose from {}.' .format(normalization_kind, ['mean_segments', 'sum_segments'])) # Normalize. line_length_normalized = line_length / line_length_norm # Check if the values are not too small when using the sum. if normalization_kind == 'sum_segments': avg = np.nanmean(line_length_normalized) if avg < 1e-10: msg = ('Line length values are very small after normalization (mean = {}). ' 'Consider setting `normalization_kind` parameter to "mean_segments".' .format(avg)) warnings.warn(msg) return line_length_normalized def _average_fixedwin(self, line_length): """ Compute the average line length in fixed, non-overlapping windows. First divides the line_length in windows and then computes the average per window (dealing with nans). This can be used for normalization. Args: line_length (np.ndarray): array with line lengths, with dimensions (channels, segments). Returns: line_length_avg (np.ndarray): (local) average line length, with same shape as `line_length`. num_segments_in_win (int): the number of segments used per window for computing the average. Use this e.g. to convert the average to a sum. """ seg_pars = self.parameters['segmentation'] line_length_pars = self.parameters['line_length'] normalization_window = line_length_pars['normalization_window'] num_channels, num_segments = line_length.shape # Reshape to (channels, windows, segments), with 1 window consists of `normalization_window` seconds. num_segments_in_win = int(np.round(normalization_window / (seg_pars['segment_length'] - seg_pars['overlap']))) num_windows = int(np.floor(num_segments / num_segments_in_win)) line_length_reshaped = line_length[:, :num_windows*num_segments_in_win].reshape( (num_channels, num_windows, num_segments_in_win)) # Compute mean LL per epoch. line_length_avg_per_win = np.nanmean(line_length_reshaped, axis=-1, keepdims=True) # Reshape so that line_length_avg matches the shapes of line_length. line_length_avg_repeat = np.repeat(line_length_avg_per_win, num_segments_in_win, axis=-1) line_length_reshaped = line_length_avg_repeat.reshape(num_channels, num_windows*num_segments_in_win) # Add nans to match the shape of `line_length` (for segments falling outside of the last window). nan_fill = np.full((num_channels, num_segments - num_windows*num_segments_in_win), fill_value=np.nan) line_length_avg = np.concatenate((line_length_reshaped, nan_fill), axis=-1) return line_length_avg, num_segments_in_win def _average_movingwin(self, line_length): """ Compute the average line length in a moving window. Uses a moving average to compute the average line length per segment (dealing with nans). This can be used for normalization. Args: line_length (np.ndarray): array with line lengths, with dimensions (channels, segments). Returns: line_length_avg (np.ndarray): (local) average line length, with same shape as `line_length`. num_segments_in_win (int): the number of segments used per window for computing the average. Use this e.g. to convert the average to a sum. """ seg_pars = self.parameters['segmentation'] line_length_pars = self.parameters['line_length'] normalization_window = line_length_pars['normalization_window'] # Compute moving average in specified window length that deals with np.nan. num_segments_in_win = int(np.round(normalization_window / (seg_pars['segment_length'] - seg_pars['overlap']))) if num_segments_in_win >= line_length.shape[-1]: # Use mean of entire signal if averaging window is larger than signal length. line_length_avg = np.nanmean(line_length, axis=-1, keepdims=True) else: # Use moving average if averaging window is smaller than signal length. line_length_avg = moving_average(line_length, n=num_segments_in_win, axis=-1)[0] return line_length_avg, num_segments_in_win
[docs]class LineLengthResult(ResultBase): """ High-level interface for processing the results of line length computation as created by nnsa.LineLength(). Args: line_length (np.ndarray): array with shape (channels, segments) containing the (normalized) line lengths. channel_labels (list of str, optional): labels of the channels corresponding to the channel dimensions of the arrays. If None, default labels will be created. Defaults to None. data_info (str, optional): see ResultBase. segment_start_times (np.ndarray, optional): see ResultBase. segment_end_times (np.ndarray, optional): see ResultBase. fs (flaot, optional): see ResultBase. **kwargs: for ResultBase. """ def __init__(self, line_length, algorithm_parameters, channel_labels=None, data_info=None, segment_start_times=None, segment_end_times=None, fs=None, **kwargs): super().__init__(algorithm_parameters=algorithm_parameters, data_info=data_info, segment_start_times=segment_start_times, segment_end_times=segment_end_times, fs=fs, **kwargs) # Input check. n_channels, n_segments = line_length.shape if channel_labels is None: channel_labels = enumerate_label(n_channels, label='Channel') elif len(channel_labels) != n_channels: raise ValueError('Length of channel_labels ({}) does not correspond to the shape of the data {}.' .format(len(channel_labels), line_length.shape)) # Store variables that are not already stored by the parent class (ResultBase). self.line_length = line_length self.channel_labels = channel_labels if self.is_discontinuous(): raise ValueError('Data is discontinuous. This is invalid for the {} class.'.format(self.__class__.__name__)) @property def median_line_length(self): """ Retrun the median of the line lenghts across channels. Returns: (np.ndarray): median line length per segment.. """ return np.nanmedian(self.line_length, axis=0) @property def num_segments(self): """ Return the number of segments. Returns: (int): number of segments. """ return self.line_length.shape[-1]
[docs] def extract_channels(self, channels, inplace=False): if not inplace: res = copy.deepcopy(self) else: res = self if channels is not None: idx_channels = get_channel_index(res.channel_labels, channels) res.line_length = res.line_length[idx_channels, :] res.channel_labels = [res.channel_labels[idx] for idx in idx_channels] if not inplace: return res
[docs] def extract_global_features(self, aggregate_channels=aggregate_channel_features, sleep_stages=None): """ Extract features that characterize the entire recording. Args: aggregate_channels (function or None, optional): function that takes an array of channel features and returns one aggregate values. If None, the channels are not aggregated, i.e. the feature values each channel are returned per channel. Defaults to nnsa.aggregate_channel_features. sleep_stages (nnsa.SleepStagesResult, optional): object containing sleep stages result. Used to report global feature values per sleep stage. If None, no sleep stage dependent feature values are returned. Defaults to None. Returns: global_features (dict): dictionary containing the feature name and value pairs. """ global_features = dict() line_length = self.line_length features_df_all = dict() if sleep_stages is None: # Combine all segments to get global features (do not distinguish sleep stages). # Just use all data. sleep_label = 'ALL' features_df = histogram_features_lll(self.line_length, self.channel_labels, ignore_nan=True) features_df_all[sleep_label] = features_df else: # Combine all segments to get global features (distinguish sleep stages). # Get the sleep labels (class numbers) of the segments. segment_labels = sleep_stages.segment_labels(self.segment_start_times, self.segment_end_times) # Exclude np.nan values. sleep_mask = ~np.isnan(segment_labels) # Use all sleep stages to capture the entire recording. sleep_label = 'ALL' features_df = histogram_features_lll(line_length[:, sleep_mask], self.channel_labels, ignore_nan=True) features_df_all[sleep_label] = features_df # Loop over sleep stages. for sleep_label, label_number in sleep_stages.class_mapping.items(): if np.isnan(label_number): # Skip NaNs (no_label segments). continue # Combine the segments per sleep stage. sleep_mask = segment_labels == label_number features_df = histogram_features_lll(line_length[:, sleep_mask], self.channel_labels, ignore_nan=True) features_df_all[sleep_label] = features_df # For each feature, combine the channel data as requested. # Loop over sleep stages. for sleep_label, features_df in features_df_all.items(): # Loop over the computed features. feature_labels_all = features_df.keys() for feature_label in feature_labels_all: if aggregate_channels is None: # Store the features per channel. for channel_label, value in features_df[feature_label].items(): global_features['LL_{}_{}_{}'.format( feature_label, channel_label.lstrip('EEG '), sleep_label)] = value else: # Aggregate the features of the channels. channel_label = 'AGG' channel_features = features_df[feature_label].values value = aggregate_channels(channel_features) global_features['LL_{}_{}_{}'.format( feature_label, channel_label.lstrip('EEG '), sleep_label)] = value return global_features
[docs] def histogram_features(self, **kwargs): """ Compute the histogram features from the log LL distribution. Wrapper of histogram_features_lll() function. Args: **kwargs (optional): keyword arguments for histogram_features_lll(). Returns: see histogram_features_lll(). """ return histogram_features_lll(self.line_length, self.channel_labels, **kwargs)
[docs] def plot(self, *args, channels='median', time_scale='seconds', log_transform=False, ax=None, **kwargs): """ Plot the line length as function of time (segments). Args: * args (optional): positional arguments for plt.plot. channels (list of str, optional): a list of labels that specify which channels to plot. Each channel gets its own color in the plot. if 'median', the median line length across channels is plotted. If None, all channels are plotted. Defaults to 'median'. time_scale (str, optional): the time scale to use. Choose from 'seconds', 'minutes', 'hours'. Defaults to 'seconds'. log_transform (bool, optional): if True, do a log10 transform on the LL. Defaults to False. ax (plt.Axes, optional): axes to plot in. If None, plots in current axes. **kwargs (optional): keyword arguments for plt.plot. """ import matplotlib.pyplot as plt if ax is not None: plt.sca(ax) else: ax = plt.gca() if channels is None: # Plot all channels. channels = self.channel_labels elif not isinstance(channels, list): channels = [channels] # The segment times in seconds. time = self.segment_times # Convert time scale to requested scale. time = convert_time_scale(time, time_scale) # Loop over channels. for chan in channels: if chan == 'median': y = self.median_line_length else: idx = get_channel_index(self.channel_labels, chan) y = self.line_length[idx, :] if log_transform: y = np.log10(y) plot_kwargs = {'label': chan} plot_kwargs.update(kwargs) plt.plot(time, y, *args, **plot_kwargs) # Figure make-up. plt.title('Log '*log_transform + 'Line Length') plt.ylabel('Log '*log_transform + 'Line length (-)') plt.xlabel('Time ({})'.format(time_scale)) plt.legend()
[docs] def to_suppression_curve(self, window_length=150, **kwargs): """ Compute the suppression curve, as derived from the line length (Dereymaeker et al. 2015). Args: window_length (float): the window length in seconds for the computation of the suppression curve. **kwargs (optional): optional keyword arguments for SuppressionCurve(). Returns: suppression (np.array): suppression values, with a sample frequency 1/window_length. """ # Local import since discontinuity module imports from this module. from nnsa.feature_extraction.discontinuity import SuppressionCurve if self.is_discontinuous(): raise NotImplementedError('Not implemented for discontinuous line length object.') # Take the median over the channels (Eq. 3 in the paper). me_LL = np.nanmedian(self.line_length, axis=0) seg_pars = self.algorithm_parameters['segmentation'] num_segments = len(me_LL) # Reshape to (channels, windows, segments), where 1 window consists of `window_size` seconds. num_segments_in_win = int(np.round(window_length / (seg_pars['segment_length'] - seg_pars['overlap']))) num_windows = int(np.floor(num_segments / num_segments_in_win)) line_length_reshaped = me_LL[:num_windows * num_segments_in_win].reshape( (num_windows, num_segments_in_win)) # Compute mean, median and ratio (suppression value) in each window. line_length_means = np.nanmean(line_length_reshaped, axis=-1) line_length_medians = np.nanmedian(line_length_reshaped, axis=-1) suppression = 1 - line_length_medians / (line_length_means + 1e-15) return SuppressionCurve(suppression=suppression, window_length=window_length, time_offset=self.time_offset, **kwargs)
[docs] def to_time_series(self, channel='median', **kwargs): from nnsa.containers.time_series import TimeSeries if channel == 'median': signal = self.median_line_length else: idx = get_channel_index(self.channel_labels, channel) signal = self.line_length[idx, :] ts_kwargs = {'label': f'{channel} LL'} ts_kwargs.update(kwargs) ts = TimeSeries(signal=signal, fs=self.fs, time_offset=self.segment_times[0], **ts_kwargs) return ts
def _extract_epoch(self, mask): """ See ResultBase. """ self.line_length = self.line_length[:, mask] def _merge(self, other, index=None): """ See ResultBase. """ # Check if the channel labels of self and other are the same. if self.channel_labels != other.channel_labels: raise ValueError('Cannot merge results with different channel labels.') if index is not None: n_channels, n_samples = self.line_length.shape if index < n_samples: # Cut piece off. msg = 'Overwriting data while merging.' warnings.warn(msg) self.line_length = self.line_length[:, :index] else: # Add nans. self.line_length = np.concatenate([self.line_length, np.full((n_channels, index-n_samples), fill_value=np.nan)], axis=-1) # Merge. self.line_length = np.concatenate([self.line_length, other.line_length], axis=-1) @staticmethod def _read_from_csv(filepath): """ Read result from csv file into a LineLengthResult class. Args: filepath (str): see ResultBase._read_from_csv(). Returns: result (nnsa.LineLengthResult): instance of LineLengthResult containing the LineLength result. """ # Lines 1-4: Standard csv header (use the ResultBase method). algorithm_parameters, data_info, fs = ResultBase._read_csv_header(filepath)[1:] # Re-open the file and read the rest of the file, line by line. with open(filepath, 'r') as f: reader = csv.reader(f) # Lines 1-4: Standard csv header (already read, skip). [next(reader) for i in range(4)] # Line 5: Non-array data header (skip). assert(next(reader) == ['channel_labels', 'line_length.shape']) # Line 6: Non-array data. channel_labels, line_length_shape = [ convert_string_auto(i) for i in next(reader)] # Line 7: Array header (skip). assert(next(reader)[0] == 'line_length') # Line 8: Array data (flattened). line_length_as_list = [float(i) for i in next(reader)] # Convert to numpy array and reshape. line_length = np.reshape(line_length_as_list, line_length_shape) # Create a result object. result = LineLengthResult(line_length=line_length, channel_labels=channel_labels, algorithm_parameters=algorithm_parameters, data_info=data_info, fs=fs) return result @staticmethod def _read_from_hdf5(filepath): """ Read result from hdf5 file into a LineLengthResult class. Args: filepath (str): see ResultBase._read_from_csv(). Returns: result (nnsa.LineLengthResult): instance of LineLengthResult containing the LineLength result. """ # Read standard hdf5 header (use the ResultBase method). algorithm_parameters, data_info, segment_start_times, segment_end_times, fs, time_offset =\ ResultBase._read_hdf5_header(filepath)[1:] # Re-open the file and read the rest of the file. with h5py.File(filepath, 'r') as f: # Read array data. line_length = f['line_length'][:] # Read non-array data. channel_labels = [label.decode() for label in f['line_length'].attrs['channel_labels']] # Create a result object. result = LineLengthResult(line_length=line_length, channel_labels=channel_labels, algorithm_parameters=algorithm_parameters, data_info=data_info, segment_start_times=segment_start_times, segment_end_times=segment_end_times, fs=fs) return result def _write_to_csv(self, filepath): """ Write the contents of the object to a csv file. Args: filepath (str): see ResultBase._write_to_csv(). """ # Lines 1-4: Standard csv header (use the ResultBase method). self._write_csv_header(filepath) # Append attributes to the csv file, line by line. with open(filepath, 'a', newline='') as csvfile: writer = csv.writer(csvfile) # Line 5: Non-array data header. writer.writerow(['channel_labels', 'line_length.shape']) # Line 6: Non-array data. writer.writerow([self.channel_labels, self.line_length.shape]) # Line 7: Array header. writer.writerow(['line_length']) # Line 8: Array data (flattened). writer.writerow(self.line_length.reshape(-1).tolist()) def _write_to_hdf5(self, filepath): """ Write the contents of the object to an hdf5 file. Args: filepath (str): see ResultBase._write_to_hdf5(). """ # Write standard hdf5 header (use the ResultBase method). self._write_hdf5_header(filepath) # Append attributes to the hdf5 file. with h5py.File(filepath, 'a') as f: # Write array data. f.create_dataset('line_length', data=self.line_length) # Write non-array data as attributes. # Convert strings to np.string_ type as recommended for compatibility. f['line_length'].attrs['channel_labels'] = [np.string_(label) for label in self.channel_labels]
[docs]class MultifractalAnalysis(ClassWithParameters): """ Class for performing a multifractal analysis. References: This code was partly ported from Mario Lavanga and Ofelie de Wel's MATLAB code and calls some MATLAB functions from the Wavelet p-Leader and Bootstrap based MultiFractal analysis (WLBMF) MATLAB toolbox, described in: H. Wendt, P.Abry, and S.Jaffard, “Bootstrap for Multifractal Analysis” IEEE SignalProcessingMagazine, vol. 24, no. 4, pp. 38–48, 2007. Main method: multifractal_analysis() Notes: The following project claims to have implemented the MATLAB toolbox in Python (I did not check it out): https://github.com/omardrwch/mfanalysis/blob/master/mfanalysis/mfa.py Args: eng (matlab.engine.matlabengine.MatlabEngine, optional): MATLAB engine to use for calling MATLAB functions. The required paths must already have been initialized (see self.eng()). If None, a new MATLAB engine will be started (which takes some time). Defaults to None. **kwargs (optional): see nnsa.ClassWithParameters. Examples: >>> np.random.seed(0) >>> x = np.random.rand(2, 2500) >>> mfa = MultifractalAnalysis() >>> print(type(mfa.parameters).__name__) Parameters >>> mfa_result = mfa.multifractal_analysis(x, fs=25, verbose=0) Starting MATLAB engine... Setting up the path... >>> print(type(mfa_result).__name__) MultifractalAnalysisResult >>> mfa_result.cp[0, 0, 0] 0.5654653145233981 """ def __init__(self, eng=None, **kwargs): super().__init__(**kwargs) self._eng = eng @property def eng(self): """ Return a MATLAB engine. Returns: (matlab.engine.matlabengine.MatlabEngine): MATLAB engine. """ from nnsa.matlab.utils import get_matlab_engine if self._eng is None: # Start Matlab engine and set paths. self._eng = get_matlab_engine() return self._eng
[docs] @staticmethod def default_parameters(): """ Return the default parameters. Returns: (nnsa.Parameters): a default set of parameters for the object. """ # Parameters for segmentation of EEG recording into small time segments/epochs. segmentation = Parameters(**{ # Segment length in seconds: 'segment_length': 50, # Overlap in segments in seconds: 'overlap': 0, }) # Parameters for artefact detection/exclusion, see # nnsa.artefacts.artefact_detection.default_eeg_signal_quality_criteria() artefact_criteria = Parameters(**default_eeg_signal_quality_criteria()) # Specify a filter for filtering the segments, see nnsa.preprocessing.filter.Filter(). # Specify None to not filter the segments. segment_filter_specification = None # Parameters for multifractal analysis. mfa = Parameters(**{ # Resampling frequency for the power in the delta frequency band: 'fs_rr': 8, # Smallest scale analysis: 'j1': 3, # Largest scale analysis: 'j2': 15, # Number of (positive) q values that will be used (see build_q_log()): 'nq': 50 }) pars = { 'segmentation': segmentation, 'artefact_criteria': artefact_criteria, 'segment_filter_specification': segment_filter_specification, 'mfa': mfa, } return Parameters(**pars)
[docs] def multifractal_analysis(self, data_matrix, fs, channel_labels=None, verbose=1): """ Perform the multifractal (MF) analysis on multichannel data. Analysis pipeline ported from MATLAB code designed by Ofelie De Wel and Mario Lavanga. Pipeline constsist of 4 steps: 1) Segmentation 2) Optional filtering 3) Artefact exclusion 4) MF feature computation Reference: Lavanga M., De Wel O., Caicedo A., Heremans E., Jansen K., Dereymaeker A., Naulaers G., Van Huffel S. (2017), Automatic quiet sleep detection based on multifractality in preterm neonates: effects of maturation. Proc. of the 39th Annual International Conference of the IEEE Engineering in Medicine and Biology Society of the IEEE (EMBC 2017) Jeju Island, South Korea, Jul. 2017, pp. 2010-2013 Args: data_matrix (np.ndarray): see check_multichannel_data_matrix(). fs (float): sample frequency of the EEG signals. channel_labels (list of str, optional): see check_multichannel_data_matrix(). verbose (int, optional): verbose level. Defaults to 1. Returns: (nnsa.MultifractalAnalysisResult): object containing the MF result per segment and per channel. """ # Check input. data_matrix, channel_labels = check_multichannel_data_matrix(data_matrix, channel_labels) if verbose > 0: print_init_message(self) # Extract some parameters. seg_pars = self.parameters['segmentation'] filter_specification = self.parameters['segment_filter_specification'] artefact_criteria = self.parameters['artefact_criteria'] mfa_pars = self.parameters['mfa'] fs_rr = mfa_pars['fs_rr'] j1 = mfa_pars['j1'] j2 = mfa_pars['j2'] nq = mfa_pars['nq'] n_channels = data_matrix.shape[0] dtype = data_matrix.dtype # Segment the data in the time axis (create a generator). seg_generator = segment_generator(data_matrix, segment_length=seg_pars['segment_length'], overlap=seg_pars['overlap'], fs=fs, axis=-1) n_segments = compute_n_segments(data_matrix, segment_length=seg_pars['segment_length'], overlap=seg_pars['overlap'], fs=fs, axis=-1) # Initialize progress bar. bar = pyprind.ProgBar(n_segments, stream=sys.stdout) # Initialize output arrays. n_q_tot = len(buid_q_log(n=nq)) n_cum = 3 # Number of log-cumulants computed in MATLAB code. zq_tensor = np.empty((n_q_tot, n_channels, n_segments), dtype=dtype) hq_tensor = np.empty_like(zq_tensor) dq_tensor = np.empty_like(zq_tensor) cp_tensor = np.empty((n_cum, n_channels, n_segments), dtype=dtype) hmin_tensor = np.empty((n_channels, n_segments), dtype=dtype) hmax_tensor = np.empty_like(hmin_tensor) # Loop over segments. for i_seg, seg in enumerate(seg_generator): # Preprocess segment (demean, optionally filter, find channels to exclude). seg, exclude_mask = preprocess_segment(seg, fs, filter_specification=filter_specification, artefact_criteria=artefact_criteria, demean=True) # Loop over channels. for j_channel, excl, signal in zip(range(n_channels), exclude_mask, seg): # If channels is to be excluded, use NaN to indicate artefacted channel. if excl: # Assign nan values as output. zq_ij, hq_ij, dq_ij, cp_ij, hmin_ij, hmax_ij = (np.nan for i in range(6)) else: # Compute MFA features for single-channel time-series segment. zq_ij, hq_ij, dq_ij, cp_ij, hmin_ij, hmax_ij = self.mfa_time_series( signal, fs, fs_rr, j1, j2, nq) zq_tensor[:, j_channel, i_seg] = zq_ij hq_tensor[:, j_channel, i_seg] = hq_ij dq_tensor[:, j_channel, i_seg] = dq_ij cp_tensor[:, j_channel, i_seg] = cp_ij hmin_tensor[j_channel, i_seg] = hmin_ij hmax_tensor[j_channel, i_seg] = hmax_ij # Update progress bar. if verbose > 0: bar.update() # Collect the functions of q in a dictionary. q_functions = { 'zq': zq_tensor, 'hq': hq_tensor, 'Dq': dq_tensor } result = MultifractalAnalysisResult(cp=cp_tensor, hmin=hmin_tensor, hmax=hmax_tensor, q_functions=q_functions, algorithm_parameters=self.parameters, channel_labels=channel_labels) return result
[docs] def mfa_time_series(self, time_series, fs, fs_rr, j1, j2, nq): """ Compute multifractality features of a time series. Args: time_series (np.ndarray): 1D array with time series data to analyze. fs (int): sample frequency of the data in time_series. fs_rr: (int): resampling frequency for the power in the delta frequency band. j1 (int): smallest scale analysis. j2 (int): largest scale analysis. nq (int): number of (positive) q values that will be used (see build_q_log()). Returns: zq (np.ndarray): estimates of the partition function zeta(q). hq (np.ndarray): estimates of Hurst exponents h(q). dq (np.ndarray): estimates of singularity/multifractal spectrum D(q). cp (np.ndarray): estimates of the coefficients c_p from the Taylor expansion that estimate tau(q). Element i-1 in the cp array corresponds to c_i. hmin (float): Hurst exponent at the left of the spectrum (at q=-5). hmax (float): Hurst exponent at the right of the spectrum (at q=5). """ zq, hq, dq, cp, hmin, hmax = self._mfa_time_series_matlab( time_series, fs, fs_rr, j1, j2, nq) return zq, hq, dq, cp, hmin, hmax
def _mfa_time_series_matlab(self, time_series, fs, fs_rr, j1, j2, nq): """ Call the MATLAB function mfaTimeSeries() to compute the multifractality features. Args: time_series (np.ndarray): see mfaTimeSeries.m. fs (float): see mfaTimeSeries.m. fs_rr: (float): see mfaTimeSeries.m. j1 (float): see mfaTimeSeries.m. j2 (float): see mfaTimeSeries.m. nq (float): see mfaTimeSeries.m. """ from nnsa.matlab.utils import ml_array, quit_engine eng = self.eng try: # Convert the time_series numpy array to a matlab array. time_series = ml_array(array=time_series, eng=eng, dtype='double') # Call the MATLAB function. Note that we cast all inputs to float, so that they will be passed as # number of datatype 'double' in the MATLAB engine (which is the required type for most MATLAB functions). zq, hq, dq, cp, hmin, hmax = eng.mfaTimeSeries(time_series, float(fs), float(fs_rr), float(j1), float(j2), float(nq), nargout=6) # Convert matlab arrays to numpy arrays. zq, hq, dq, cp = (np.asarray(ar) for ar in (zq, hq, dq, cp)) except: # Make sure to quit the matlab engine and then re-raise the error. quit_engine(eng) raise return zq, hq, dq, cp, hmin, hmax
[docs]class MultifractalAnalysisResult(ResultBase): """ High-level interface for processing the results of multifractal analysis as created by nnsa.MultifractalAnalysis(). Args: cp (np.ndarray): estimates of the coefficients c_p from the Taylor expansion that estimate tau(q) with dimensions (p, channels, segments). Element i-1 in the cp array corresponds to c_i. hmin (np.ndarray): Hurst exponent at the left of the spectrum (at q=-5) with dimensions (channels, segments). hmax (np.ndarray): Hurst exponent at the right of the spectrum (at q=5) with dimensions (channels, segments). algorithm_parameters (nnsa.Parameters): see ResultBase. q_functions (dict, optional): dict with functions of q (e.g. 'zq', 'hq', 'Dq'). Each value in the dict is an array with dimensions (q, channels, segments). Each array in the dict must have this same shape. channel_labels (list of str, optional): labels of the channels corresponding to the channel dimensions of the arrays. If None, default labels will be created. Default is None. data_info (str, optional): see ResultBase. segment_start_times (np.ndarray, optional): see ResultBase. segment_end_times (np.ndarray, optional): see ResultBase. fs (flaot, optional): see ResultBase. """ def __init__(self, cp, hmin, hmax, algorithm_parameters, q_functions=None, channel_labels=None, data_info=None, segment_start_times=None, segment_end_times=None, fs=None): super().__init__(algorithm_parameters=algorithm_parameters, data_info=data_info, segment_start_times=segment_start_times, segment_end_times=segment_end_times, fs=fs) # Input check. data_shape = cp.shape if channel_labels is None: channel_labels = enumerate_label(data_shape[-2], label='Channel') elif len(channel_labels) != data_shape[1]: raise ValueError('Length of channel_labels ({}) does not correspond to the shape of the data {}.' .format(len(channel_labels), data_shape)) # Store variables that are not already stored by the parent class (ResultBase). self.cp = cp self.hmin = hmin self.hmax = hmax self.q_functions = q_functions self.channel_labels = channel_labels @property def num_segments(self): """ Return the number of segments. Returns: (int): number of segments. """ return self.hmin.shape[-1] @property def q(self): """ Return array with q values. Returns: (np.ndarray): array with q values. """ nq = self.algorithm_parameters['mfa']['nq'] q = buid_q_log(n=nq) return q
[docs] def extract_global_features(self, **kwargs): """ Extract features that characterize the entire recording. Args: **kwargs (optional): optional keyword arguments for local_to_global_features(). Returns: global_features (dict): see local_to_global_features(). """ # Compute (local) features. features = self.extract_features() # Change the keys and indicate the type of feature. feature_signature = 'MFA' for key in list(features.keys()): # Note the use of a list instead of iterator. features['{}_{}'.format(feature_signature, key)] = features.pop(key) # Compute global features. global_features = local_to_global_features(features, self.segment_start_times, self.segment_end_times, channel_labels=self.channel_labels, **kwargs) return global_features
[docs] def extract_features(self): """ Return standard set of features extracted from the result of the MFA. Returns: (dict): dictionary with features. Features are arrays with dimension (channels, segments). """ features = { 'c1': self.cp[0], 'c2': self.cp[1], 'c3': self.cp[2], 'deltah': self.hmax - self.hmin } return features
[docs] def plot(self, ax=None): """ Plot the result. Args: ax: Returns: """ import matplotlib.pyplot as plt if ax is None: plt.figure() else: plt.sca(ax) qf = self.q_functions h = np.nanmean(np.nanmean(qf['hq'], axis=-1), axis=-1) D = np.nanmean(np.nanmean(qf['Dq'], axis=-1), axis=-1) plt.plot(h, D) plt.xlabel('h') plt.ylabel('D(h)') plt.title('MF spectrum')
def _merge(self, other): """ See ResultBase. """ # Check if the channel labels of self and other are the same. if self.channel_labels != other.channel_labels: raise ValueError('Cannot merge results with different channel labels.') self.cp = np.concatenate((self.cp, other.cp), axis=-1) self.hmin = np.concatenate((self.hmin, other.hmin), axis=-1) self.hmax = np.concatenate((self.hmax, other.hmax), axis=-1) for key, val in self.q_functions.items(): val_other = other.q_functions[key] self.q_functions[key] = np.concatenate((val, val_other), axis=-1) @staticmethod def _read_from_csv(filepath): """ Read result from csv file into a MultifractalAnalysisResult class. Args: filepath (str): see ResultBase._read_from_csv(). Returns: result (nnsa.MultifractalAnalysisResult): instance of MultifractalAnalysisResult containing the MultifractalAnalysis result. """ # Lines 1-4: Standard csv header (use the ResultBase method). algorithm_parameters, data_info, fs = ResultBase._read_csv_header(filepath)[1:] # Re-open the file and read the rest of the file, line by line. with open(filepath, 'r') as f: reader = csv.reader(f) # Lines 1-4: Standard csv header (already read, skip). [next(reader) for i in range(4)] # Line 5: Non-array data header (skip). assert(next(reader) == ['channel_labels', 'cp.shape', 'hmin.shape', 'hmax.shape', 'q_functions[key].shape']) # Line 6: Non-array data. channel_labels, cp_shape, hmin_shape, hmax_shape, q_functions_shape = [ convert_string_auto(i) for i in next(reader)] # Line 7: Array header (skip). assert(next(reader)[0] == 'cp') # Line 8: Array data (flattened). cp_as_list = [float(i) for i in next(reader)] # Line 9: Array header (skip). assert(next(reader)[0] == 'hmin') # Line 10: Array data (flattened). hmin_as_list = [float(i) for i in next(reader)] # Line 11: Array header (skip). assert(next(reader)[0] == 'hmax') # Line 12: Array data (flattened). hmax_as_list = [float(i) for i in next(reader)] if q_functions_shape is not None: # Line 13: Array header. q_functions_keys = next(reader) # Line 14-... : Array data (flattened). Each f(q) is saved on one row. q_functions = dict() for key in q_functions_keys: array_as_list = [float(i) for i in next(reader)] # Convert to numpy array and reshape. q_functions[key] = np.reshape(array_as_list, q_functions_shape) else: q_functions = None # Convert to numpy array and reshape. cp = np.reshape(cp_as_list, cp_shape) hmin = np.reshape(hmin_as_list, hmin_shape) hmax = np.reshape(hmax_as_list, hmax_shape) # Create a result object. result = MultifractalAnalysisResult(cp=cp, hmin=hmin, hmax=hmax, q_functions=q_functions, algorithm_parameters=algorithm_parameters, channel_labels=channel_labels, data_info=data_info, fs=fs) return result @staticmethod def _read_from_hdf5(filepath): """ Read result from hdf5 file into a MultifractalAnalysisResult class. Args: filepath (str): see ResultBase._read_from_hdf5(). Returns: result (nnsa.MultifractalAnalysisResult): instance of MultifractalAnalysisResult containing the MultifractalAnalysis result. """ # Read standard hdf5 header (use the ResultBase method). algorithm_parameters, data_info, segment_start_times, segment_end_times, fs, time_offset =\ ResultBase._read_hdf5_header(filepath)[1:] # Re-open the file and read the rest of the file. with h5py.File(filepath, 'r') as f: # Read array data. cp = f['cp'][:] hmin = f['hmin'][:] hmax = f['hmax'][:] if 'q_functions' in f: q_functions_ds = f['q_functions'] q_functions = dict() for key in q_functions_ds.keys(): q_functions[key] = q_functions_ds[key][:] else: q_functions = None # Read non-array data. channel_labels = [label.decode() for label in f['cp'].attrs['channel_labels']] # Create a result object. result = MultifractalAnalysisResult(cp=cp, hmin=hmin, hmax=hmax, q_functions=q_functions, algorithm_parameters=algorithm_parameters, channel_labels=channel_labels, data_info=data_info, segment_start_times=segment_start_times, segment_end_times=segment_end_times, fs=fs) return result def _write_to_csv(self, filepath): """ Write the contents of the object to a csv file. Args: filepath (str): see ResultBase._write_to_csv(). """ # Lines 1-4: Standard csv header (use the ResultBase method). self._write_csv_header(filepath) # Append attributes to the csv file, line by line. with open(filepath, 'a', newline='') as csvfile: writer = csv.writer(csvfile) # Line 5: Non-array data header. writer.writerow(['channel_labels', 'cp.shape', 'hmin.shape', 'hmax.shape', 'q_functions[key].shape']) # Line 6: Non-array data. data_shapes = [data.shape for data in (self.cp, self.hmin, self.hmax)] q_func_shape = next((p for p in self.q_functions.values())).shape if self.q_functions is not None else None writer.writerow([self.channel_labels] + data_shapes + [q_func_shape]) # Line 7: Array header. writer.writerow(['cp']) # Line 8: Array data (flattened). writer.writerow(self.cp.reshape(-1).tolist()) # Line 9: Array header. writer.writerow(['hmin']) # Line 10: Array data (flattened). writer.writerow(self.hmin.reshape(-1).tolist()) # Line 11: Array header. writer.writerow(['hmax']) # Line 12: Array data (flattened). writer.writerow(self.hmax.reshape(-1).tolist()) if self.q_functions is not None: # Line 13: Array header. writer.writerow(self.q_functions.keys()) # Line 14-... : Array data (flattened). Each f(q) is saved on one row. for p in self.q_functions.values(): writer.writerow(p.reshape(-1).tolist()) def _write_to_hdf5(self, filepath): """ Write the contents of the object to an hdf5 file. Args: filepath (str): see ResultBase._write_to_hdf5(). """ # Write standard hdf5 header (use the ResultBase method). self._write_hdf5_header(filepath) # Append attributes to the hdf5 file. with h5py.File(filepath, 'a') as f: # Write array data. f.create_dataset('cp', data=self.cp) f.create_dataset('hmin', data=self.hmin) f.create_dataset('hmax', data=self.hmax) if self.q_functions is not None: for key, data in self.q_functions.items(): f.create_dataset('q_functions/{}'.format(key), data=data) # Write non-array data as attributes to the 'cp' group. # Convert strings to np.string_ type as recommended for compatibility. f['cp'].attrs['channel_labels'] = [np.string_(label) for label in self.channel_labels]
[docs]def buid_q_log(q_min=0.01, q_max=5, n=50): """ Build a convenient array q for multifractal analysis. The created array consist of the following values: \pm log-spaced values between q_min and q_max, 0, \pm 1 and \pm 2. Args: q_min (float, optional): minimum values for positive q. q_max (flaot, optional): maxmimum value for positive q. n (int, optional): number of log-spaced values for positive q. Returns: (np.ndarray): array with q values in sorted order. """ q = np.append(np.logspace(np.log10(q_min), np.log10(q_max), n), [1, 2]) return np.unique(np.concatenate((q, -q, [0])))
[docs]def histogram_features_lll(line_length, channel_labels=None, ignore_nan=True): """ Compute the histogram features from the log Line Length distribution. N. Koolen et al., "Data-driven metric representing the maturation of preterm EEG," 2015 37th Annual International Conference of the IEEE Engineering in Medicine and Biology Society (EMBC), Milan, 2015, pp. 1492-1495. Args: line_length (np.ndarray): array with the normalized line length with dimensions (channels, segments). channel_labels (list, optional): list of the channel labels corresponding to the rows of line_length. ignore_nan (bool, optional): if True, ignore nan values in line_length. Defaults to True. Returns: (pd.DataFrame): dataframe with the channel labels as row index and the features as columns. """ # Log-transform the line length. log_line_length = np.log10(line_length) # Histogram bins. width = 0.1 range = (-3.1, -1.2) centers = np.arange(range[0], range[1] + width, width) bin_edges = np.append(centers - width/2, range[1] + width/2) return histogram_features_per_channel(log_line_length, bins=bin_edges, channel_labels=channel_labels, ignore_nan=ignore_nan)
[docs]def wfbmesti(x, method='derivative_wavelet', axis=-1, keepdims=False): """ Compute Hurst exponent using the second order discrete derivative as in MATLAB's wfbmesti function. This function only implements the two second order discrete derivative methods from MATLAB's function, the third method (variance versus level) is not implemented here (since it is the slowest method). Args: x (np.ndarray): input signal(s). method (str): which method to use. Chose from: 'derivative': the first estimate returned by MATLAB. 'derivative_wavelet': the second estimate returned by MATLAB. axis (int): the time axis in `x`. keepdims (bool): if True, the time axis is reduced to 1, if False, the time axis is removed. Returns: H (np.ndarray): array with Hurst exponent(s). E.g., if `x` has shape (p, q, r), `axis` is -1 and `keepdims` is False, then `H` has shape (p, q). """ y = np.diff(x, axis=axis) y = np.cumsum(y, axis=axis) n = y.shape[axis] method = method.lower() if 'derivative' in method: if method == 'derivative': # Second order discrete derivative. b1 = [1, -2, 1] b2 = [1, 0, -2, 0, 1] elif 'wavelet' in method: # Second order discrete derivative (using wavelets). # The following filter coefficients are hardcoded for speed. # pywt.Wavelet('sym5').dec_hi was used to obtain the coefficients for b1. b1 = [-0.019538882735286728, -0.021101834024758855, 0.17532808990845047, 0.01660210576452232, -0.6339789634582119, 0.7234076904024206, -0.1993975339773936, -0.039134249302383094, -0.029519490925774643, 0.027333068345077982] # b2 is defined as b1 alternated with zeros. b2 = [-0.019538882735286728, 0, -0.021101834024758855, 0, 0.17532808990845047, 0, 0.01660210576452232, 0, -0.6339789634582119, 0, 0.7234076904024206, 0, -0.1993975339773936, 0, -0.039134249302383094, 0, -0.029519490925774643, 0, 0.027333068345077982, 0] else: raise ValueError('Invalid option method="{}". See code for all options.'.format(method)) # The following is ported from MATLAB's wfbmesti function. # It is extended to works for arrays with any shape. y1 = scipy.signal.lfilter(b1, 1, y, axis=axis) y1 = y1.take(indices=range(len(b1) - 1, n), axis=axis) y2 = scipy.signal.lfilter(b2, 1, y, axis=axis) y2 = y2.take(indices=range(len(b2) - 1, n), axis=axis) s1 = np.mean(y1**2, axis=axis, keepdims=keepdims) s2 = np.mean(y2**2, axis=axis, keepdims=keepdims) H = 0.5*np.log2(s2/s1) elif 'regression' in method: raise NotImplementedError('Regression method not implemented.') else: raise ValueError('Invalid option method="{}". See code for all options.'.format(method)) return H