Source code for nnsa.feature_extraction.entropy

"""
Module containing entropy-related functions and classes.
"""
import copy
import csv
import os
import sys
import warnings
from bisect import bisect_right
from collections import deque

import h5py
import numpy as np
import pyprind
from numba import njit
from sklearn.preprocessing import OneHotEncoder

from nnsa.artefacts.artefact_detection import detect_artefact_signals, default_eeg_signal_quality_criteria
from nnsa.feature_extraction.common import check_multichannel_data_matrix, get_channel_index, \
    preprocess_segment, local_to_global_features
from nnsa.feature_extraction.result import ResultBase
from nnsa.parameters.parameters import ClassWithParameters, Parameters
from nnsa.utils.segmentation import compute_n_segments, get_all_segments, segment_generator, get_segment_times
from nnsa.utils.code_performance import print_efficiency
from nnsa.utils.config import HORIZONTAL_RULE
from nnsa.utils.other import enumerate_label, convert_string_auto
from nnsa.utils.plotting import maximize_figure
from nnsa.utils.testing import assert_equal

__all__ = [
    'MultiScaleEntropy',
    'MultiScaleEntropyResult',

    'sample_entropy',
]


[docs]class MultiScaleEntropy(ClassWithParameters): """ Class for performing a multi-scale entropy analysis. Reference: this code was ported from Mario Lavanga and Ofelie de Wel's MATLAB code. Main method: multi_scale_entropy() Args: see nnsa.ClassWithParameters. Examples: >>> np.random.seed(0) >>> x = np.random.rand(2, 2500) >>> mse = MultiScaleEntropy() >>> assert_equal(mse.parameters, MultiScaleEntropy.default_parameters()) >>> print(type(mse.parameters).__name__) Parameters >>> mse_result = mse.multi_scale_entropy(x, fs=25, verbose=0) >>> print(type(mse_result).__name__) MultiScaleEntropyResult >>> mse_result.mse[10, 1, 0] 1.0185695809945732 """
[docs] @staticmethod def default_parameters(): """ Return the default parameters as a dictionary. Returns: (nnsa.Parameters): a default set of parameters for the object. """ # Parameters for segmentation of EEG recording into small time segments/epochs. segmentation = Parameters(**{ # Segment length in seconds: 'segment_length': 50, # Overlap in segments in seconds: 'overlap': 0, }) # Parameters for artefact detection/exclusion, see e.g. # nnsa.artefacts.artefact_detection.default_eeg_signal_quality_criteria() artefact_criteria = None # Specify a filter for filtering the segments, see nnsa.preprocessing.filter.filter_signal(). # Specify None to not filter the segments. segment_filter_specification = None # Parameters for multi-scale entropy. mse = Parameters(**{ # Maximum scale: 'max_scale': 20, # Template length m: 'm': 2, # Scaling coefficient for determining r from std (r = r_coeff*std(x)): 'r_coef': 0.2 }) pars = { 'segmentation': segmentation, 'artefact_criteria': artefact_criteria, 'segment_filter_specification': segment_filter_specification, 'mse': mse, } return Parameters(**pars)
[docs] def multi_scale_entropy(self, data_matrix, fs, channel_labels=None, verbose=1): """ Perform the multi-scale entropy (MSE) analysis on multichannel data. Analysis pipeline ported from MATLAB code designed by Ofelie De Wel and Mario Lavanga. Pipeline constsist of 4 steps: 1) Segmentation 2) Optional filtering 3) Artefact exclusion 4) MSE computation Args: data_matrix (np.ndarray): see check_multichannel_data_matrix(). fs (float): sample frequency of the EEG signals. channel_labels (list of str, optional): see check_multichannel_data_matrix(). verbose (int, optional): verbose level. Defaults to 1. Returns: (nnsa.MultiScaleEntropyResult): object containing the MSE result per segment and per channel. """ # Check input. data_matrix, channel_labels = check_multichannel_data_matrix(data_matrix, channel_labels) if verbose > 0: print(HORIZONTAL_RULE) print('Running multi_scale_entropy with parameters:') print(self.parameters) # Extract some parameters. seg_pars = self.parameters['segmentation'] filter_specification = self.parameters['segment_filter_specification'] artefact_criteria = self.parameters['artefact_criteria'] max_scale = self.parameters['mse']['max_scale'] m_se = self.parameters['mse']['m'] r_coef = self.parameters['mse']['r_coef'] n_channels = data_matrix.shape[0] dtype = data_matrix.dtype # Segment the data in the time axis (create a generator). seg_generator = segment_generator(data_matrix, segment_length=seg_pars['segment_length'], overlap=seg_pars['overlap'], fs=fs, axis=-1) n_segments = compute_n_segments(data_matrix, segment_length=seg_pars['segment_length'], overlap=seg_pars['overlap'], fs=fs, axis=-1) # Initialize progress bar. bar = pyprind.ProgBar(n_segments, stream=sys.stdout) # Loop over segments. mse_tensor = np.empty((max_scale, n_channels, n_segments), dtype=dtype) # (scales, channels, segments) for i_seg, seg in enumerate(seg_generator): # Update progress bar. if verbose > 0: bar.update() # Preprocess segment (demean, optionally filter, find channels to exclude). seg, exclude_mask = preprocess_segment(seg, fs, filter_specification=filter_specification, artefact_criteria=artefact_criteria, demean=True) # Loop over channels. for j_channel, excl, signal in zip(range(n_channels), exclude_mask, seg): # If channels is to be excluded, use NaN to indicate artefacted channel. if excl: mse_tensor[:, j_channel, i_seg] = np.nan else: # Determine matching tolerance for sample entropy. r_se = r_coef*np.std(signal, ddof=1) # Use ddof=1 to do be equivalent to MATLAB. # Compute mse for segment channel. mse_tensor[:, j_channel, i_seg] = multi_scale_entropy(signal, max_scale=max_scale, m=m_se, r=r_se, compute_mse_var=False)[0] # Return as a MultiScaleEntropyResult object. return MultiScaleEntropyResult(mse_tensor, channel_labels=channel_labels, algorithm_parameters=self.parameters)
@print_efficiency def _multi_scale_entropy_parallel(self, data_matrix, fs, channel_labels=None, verbose=1): """ Attempt to create a (faster) parallelized version of multi_scale_entropy(). It is a little bit faster, but not significant (less than 2 times speed up). Analysis pipeline ported from MATLAB code designed by Ofelie De Wel and Mario Lavanga. Pipeline constsist of 4 steps: 1) Segmentation 2) Optional filtering 3) Artefact exclusion 4) MSE computation Args: data_matrix (np.ndarray): see check_multichannel_data_matrix(). fs (float): sample frequency of the EEG signals. channel_labels (list of str, optional): see check_multichannel_data_matrix(). verbose (int, optional): verbose level. Defaults to 1. Returns: (nnsa.MultiScaleEntropyResult): object containing the MSE result per segment and per channel. """ from joblib import Parallel, delayed # Check input. data_matrix, channel_labels = check_multichannel_data_matrix(data_matrix, channel_labels) if verbose > 0: print(HORIZONTAL_RULE) print('Running multi_scale_entropy with parameters:') print(self.parameters) # Extract some parameters. seg_pars = self.parameters['segmentation'] filter_specification = self.parameters['segment_filter_specification'] artefact_criteria = self.parameters['artefact_criteria'] max_scale = self.parameters['mse']['max_scale'] m_se = self.parameters['mse']['m'] r_coef = self.parameters['mse']['r_coef'] n_channels = data_matrix.shape[0] dtype = data_matrix.dtype def _compute_mse_channel(excl, signal): # If channels is to be excluded, use NaN to indicate artefacted channel. if excl: mse = np.empty((max_scale, 1)) mse.fill(np.nan) else: # Determine matching tolerance for sample entropy. r_se = r_coef * np.std(signal, ddof=1) # Use ddof=1 to do be equivalent to MATLAB. # Compute mse for segment channel. mse = multi_scale_entropy(signal, max_scale=max_scale, m=m_se, r=r_se, compute_mse_var=False)[0] mse.shape = (max_scale, 1) return mse # Segment the data in the time axis (create a generator). seg_generator = segment_generator(data_matrix, segment_length=seg_pars['segment_length'], overlap=seg_pars['overlap'], fs=fs, axis=-1) n_segments = compute_n_segments(data_matrix, segment_length=seg_pars['segment_length'], overlap=seg_pars['overlap'], fs=fs, axis=-1) # Initialize progress bar. bar = pyprind.ProgBar(n_segments, stream=sys.stdout) # Loop over segments. mse_tensor = np.empty((max_scale, n_channels, n_segments), dtype=dtype) # (scales, channels, segments) for i_seg, seg in enumerate(seg_generator): # Update progress bar. if verbose > 0: bar.update() # Preprocess segment (demean, optionally filter, find channels to exclude). seg, exclude_mask = preprocess_segment(seg, fs, filter_specification=filter_specification, artefact_criteria=artefact_criteria) # Loop over channels in a parallelized fashion. mse_per_channel = Parallel(n_jobs=os.cpu_count())(delayed(_compute_mse_channel)(excl, signal) for excl, signal in zip(exclude_mask, seg)) mse_channel = np.hstack(mse_per_channel) mse_tensor[:, :, i_seg] = mse_channel # Return as a MultiScaleEntropyResult object. return MultiScaleEntropyResult(mse_tensor, channel_labels=channel_labels, algorithm_parameters=self.parameters)
[docs]class MultiScaleEntropyResult(ResultBase): """ High-level interface for processing the results of multi-scale entropy analysis as created by nnsa.MultiScaleEntropy(). Args: mse (np.ndarray): array with multi-scale entropy values with dimensions (scales, channels, segments). mse[i] contains the result of scale i+1. algorithm_parameters (nnsa.Parameters): see ResultBase. channel_labels (list of str, optional): labels of the channels corresponding to the second axis in `mse`. If None, default labels will be created. Default is None. data_info (str, optional): see ResultBase. segment_start_times (np.ndarray, optional): see ResultBase. segment_end_times (np.ndarray, optional): see ResultBase. fs (flaot, optional): see ResultBase. """ def __init__(self, mse, algorithm_parameters, channel_labels=None, data_info=None, segment_start_times=None, segment_end_times=None, fs=None): super().__init__(algorithm_parameters=algorithm_parameters, data_info=data_info, segment_start_times=segment_start_times, segment_end_times=segment_end_times, fs=fs) # Input check. data_shape = mse.shape if channel_labels is None: channel_labels = enumerate_label(data_shape[1], label='Channel') elif len(channel_labels) != data_shape[1]: raise ValueError('Length of channel_labels ({}) does not correspond to the shape of the data {}.' .format(len(channel_labels), data_shape)) # Store variables that are not already stored by the parent class (ResultBase). self.mse = mse self.channel_labels = channel_labels @property def average_mse(self): """ Return the mse averaged over all segments. Returns: self._average_mse (np.ndarray): array with dimensions (scales, channels) containing the average sample entropy values. """ return np.nanmean(self.mse, axis=-1) @property def num_segments(self): """ Return the number of segments. Returns: (int): number of segments. """ return self.mse.shape[2] @property def scales(self): """ Return the scales of the mse analysis. Returns: (np.ndarray): array with scales corresponding to first axis of self.mse. """ return np.arange(1, self.mse.shape[0] + 1)
[docs] def compute_average_slopes(self, split=5): """ Compute the average slope of the mse curve in the small and large scales per channel. Args: split (int, optional): the scale that divides the scale into small scales (<=split) and large scales (>split). Defaults to 5. Returns: slope_small (np.ndarray): array with dimensions (channels, segments) containing the average slope of the mse curve of scales 1 until split. slope_large (np.ndarray): array with dimensions (channels, segments) containing the average slope of the mse curve of scales split until the maximum available scale. """ # Create a helper function to compute the average slope. def _compute_average_slope(x): return np.mean(np.diff(x, axis=0), axis=0) slope_small = _compute_average_slope(self.mse[:split]) slope_large = _compute_average_slope(self.mse[split:]) return slope_small, slope_large
[docs] def compute_complexity_index(self): """ Compute the complexity index, i.e. the area under the mse curve, for each segment and each channel. Returns: (np.ndarray): array with dimensions (channels, segments) with the complexity indices. """ return np.trapz(self.mse, axis=0)
[docs] def extract_channels(self, channels, inplace=False): if not inplace: mse = copy.deepcopy(self) else: mse = self if channels is not None: idx_channels = get_channel_index(mse.channel_labels, channels) mse.mse = mse.mse[:, idx_channels, :] mse.channel_labels = [mse.channel_labels[idx] for idx in idx_channels] if not inplace: return mse
[docs] def extract_global_features(self, **kwargs): """ Extract features that characterize the entire recording. Args: **kwargs (optional): optional keyword arguments for local_to_global_features(). Returns: global_features (dict): see local_to_global_features(). """ # Compute (local) features. features = dict() features['MSE_slope_small'], features['MSE_slope_large'] = self.compute_average_slopes() features['MSE_ci'] = self.compute_complexity_index() features['MSE_max'] = self.maximum_mse() # TODO Compute coefficients of 2nd order polynomial fit. for i, mse_i in enumerate(self.mse): features['MSE_scale_{}'.format(i+1)] = mse_i # Compute global features. global_features = local_to_global_features(features, self.segment_start_times, self.segment_end_times, channel_labels=self.channel_labels, **kwargs) return global_features
[docs] def maximum_mse(self): """ Return the maximum value of the multiscale entropy curve. Returns: (np.ndarray): array with dimensions (channels, segments) with the maxima. """ return np.max(self.mse, axis=0)
[docs] def plot(self, channels=None, segments=None, plot_fit=False, ax=None): """ Average the mse values for all segments and plot the entropy against scale for the specified channels. Args: channels (list of str, optional): a list of labels that specify which channels to plot. Each channel gets its own color in the plot. If None, all channels are plotted. Defaults to None. segments (list, optional): iterable with indices of the segments to plot. Each segment is plotted as a separate solid line of a color corresponding to the channel. If None, the average sample entropy values over all segments are plotted per scale. Defaults to None. plot_fit (bool, optional): if True, plot the polynomial approximation. Defaults to False. ax (plt.Axes, optional): axes to plot in. If None, plots in a new figure. Defaults to None. """ from matplotlib import pyplot as plt # Default arguments. if channels is None: # Plot all channels. channels = self.channel_labels # Find the indices of the channels in the data arrays. idx_channels = get_channel_index(self.channel_labels, channels) # The scales are plotted on the x-axis. scales = self.scales if ax is None: plt.figure() else: plt.sca(ax) # Loop over channels. for i_channel in idx_channels: label = self.channel_labels[i_channel] color = 'C{}'.format(i_channel) x = scales if segments is None: y = self.average_mse[:, i_channel] plt.plot(x, y, label=label, color=color) if plot_fit: fit = np.poly1d(np.polyfit(x, y, deg=2)) plt.plot(x, fit(x), label='{}_fit'.format(label), color=color, linestyle='--') else: for ii, i_seg in enumerate(segments): y = self.mse[:, i_channel, i_seg] label = label if ii == 0 else None plt.plot(x, y, label=label, color=color) if plot_fit: fit = np.poly1d(np.polyfit(x, y, deg=2)) plt.plot(x, fit(x), label='{}_fit'.format(label) if ii == 0 else None, color=color, linestyle='--') # Figure make-up. plt.title('MSE curve') plt.ylabel('Sample entropy') plt.xlabel('Scale') plt.legend()
[docs] def remove_channel(self, channels, inplace=False): """ Remove one or more channels. Args: channels (str or list): (list of) channel name(s) to remove. inplace (bool, optional): bool tos pecify whether it happens in place or not. Returns: new MultiScaleEntropyResult object if inplace is False. """ if inplace: mse = self else: mse = copy.deepcopy(self) idx_channels = get_channel_index(mse.channel_labels, channels) mse.mse = np.delete(mse.mse, idx_channels, axis=-1) mse.channel_labels = [lab for lab in mse.channel_labels if lab not in channels] if not inplace: return mse
def _extract_epoch(self, mask): """ See ResultBase. """ self.mse = self.mse[:, :, mask] def _merge(self, other, index=None): """ See ResultBase. """ # Check if the channel labels of self and other are the same. if self.channel_labels != other.channel_labels: raise ValueError('Cannot merge results with different channel labels.') if index is not None: n_scales, n_channels, n_samples = self.mse.shape if index < n_samples: # Cut piece off. msg = 'Overwriting data while merging.' warnings.warn(msg) self.mse = self.mse[:, :, :index] else: # Add nans. self.mse = np.concatenate([self.mse, np.full((n_scales, n_channels, index-n_samples), fill_value=np.nan)], axis=-1) self.mse = np.concatenate((self.mse, other.mse), axis=-1) @staticmethod def _read_from_csv(filepath): """ Read result from csv file into a MultiScaleEntropyResult class. Args: filepath (str): see ResultBase._read_from_csv(). Returns: result (nnsa.MultiScaleEntropyResult): instance of MultiScaleEntropyResult containing the MultiScaleEntropy result. """ # Lines 1-4: Standard csv header (use the ResultBase method). algorithm_parameters, data_info, fs = ResultBase._read_csv_header(filepath)[1:] # Re-open the file and read the rest of the file, line by line. with open(filepath, 'r') as f: reader = csv.reader(f) # Lines 1-4: Standard csv header (already read, skip). [next(reader) for i in range(4)] # Line 5: Non-array data header (skip). next(reader) # Line 6: Non-array data. channel_labels, data_shape = [convert_string_auto(i) for i in next(reader)] # Line 7: Array header (skip). next(reader) # Line 8: Array data (flattened). array_as_list = [float(i) for i in next(reader)] # Convert to numpy array and reshape. mse = np.reshape(array_as_list, data_shape) # Create a result object. result = MultiScaleEntropyResult(mse=mse, algorithm_parameters=algorithm_parameters, channel_labels=channel_labels, data_info=data_info, fs=fs) return result @staticmethod def _read_from_hdf5(filepath): """ Read result from hdf5 file into a MultiScaleEntropyResult class. Args: filepath (str): see ResultBase._read_from_hdf5(). Returns: result (nnsa.MultiScaleEntropyResult): instance of MultiScaleEntropyResult containing the MultiScaleEntropy result. """ # Read standard csv header (use the ResultBase method). algorithm_parameters, data_info, segment_start_times, segment_end_times, fs, time_offset =\ ResultBase._read_hdf5_header(filepath)[1:] # Re-open the file and read the rest of the file. with h5py.File(filepath, 'r') as f: # Read array data. mse_ds = f['mse'] mse = mse_ds[:] # Read non-array data. channel_labels = [label.decode() for label in mse_ds.attrs['channel_labels']] # Create a result object. result = MultiScaleEntropyResult(mse=mse, algorithm_parameters=algorithm_parameters, channel_labels=channel_labels, data_info=data_info, segment_start_times=segment_start_times, segment_end_times=segment_end_times, fs=fs) return result def _write_to_csv(self, filepath): """ Write the contents of the object to a csv file. Args: filepath (str): see ResultBase._write_to_csv(). """ # Lines 1-4: Standard csv header (use the ResultBase method). self._write_csv_header(filepath) # Append attributes to the csv file, line by line. with open(filepath, 'a', newline='') as csvfile: writer = csv.writer(csvfile) # Line 5: Non-array data header. writer.writerow(['channel_labels', 'mse.shape']) # Line 6: Non-array data. data_shape = self.mse.shape writer.writerow([self.channel_labels, data_shape]) # Line 7: Array header. writer.writerow(['mse']) # Line 8: Array data (flattened). writer.writerow(self.mse.reshape(-1).tolist()) def _write_to_hdf5(self, filepath): """ Write the contents of the object to an hdf5 file. Args: filepath (str): see ResultBase._write_to_hdf5(). """ # Write standard hdf5 header (use the ResultBase method). self._write_hdf5_header(filepath) # Append attributes to the hdf5 file. with h5py.File(filepath, 'a') as f: # Write array data. f.create_dataset('mse', data=self.mse) # Write non-array data as attributes to the 'mse' group. # Convert strings to np.string_ type as recommended for compatibility. f['mse'].attrs['channel_labels'] = [np.string_(label) for label in self.channel_labels]
def multi_scale_entropy(x, max_scale, m, r, compute_mse_var=False): """ Compute the sample entropy of multiple downsampled versions of signal x for scales 1, ..., max_scale. Downsampling is done by dividing the signal up in non-overlapping segments, where segments have length equal to the scale. Subsequently, the means per segment are taken to downsample the signal. Additionally, the varaince in the segments is taken, of which also the mse is computed. Args: x (np.ndarray): see sample_entropy(). max_scale (int): maximum scale to examine. m (int): see sample_entropy(). r (float): see sample_entropy(). compute_mse_var (bool, optional): if True, compute the mse of the variance within the segments of size scale. If False, do not compute. Returned mse_var array will be nans. Defaults to False. Returns: mse_mean (np.ndarray): 1D array of size (max_scale,) containing the sample entropy of the mean of the segments for all scales. mse_var (np.ndarray): 1D arrar of size (max_scale,) containing the sample entropy of the variance of the segments for all scales (only if compute_mse_var is True). """ # Input check. if x.ndim != 1: raise ValueError('Input array x must have 1 dimension. Got x.ndim = {}.'.format(x.ndim)) # Initialize output arrays. mse_mean = np.empty(max_scale) mse_var = np.empty(max_scale) mse_var[:] = np.nan # Initialize to nan, since we may not fill this array. # Loop over scales. for i, scale in enumerate(range(1, max_scale + 1)): # Segment. segmented_x = get_all_segments(x, scale, overlap=0, fs=1, axis=0) # Compute mean and variance per segment to create a downscaled version of the signal. mean_per_segment = np.mean(segmented_x, axis=-1) # Compute sample entropy. mse_mean[i] = sample_entropy(mean_per_segment, m=m, r=r) if compute_mse_var: mse_var[i] = sample_entropy(np.var(segmented_x, axis=-1, ddof=1), m=m, r=r) if scale > 1 else mse_mean[i] return mse_mean, mse_var
[docs]def sample_entropy(x, m, r): """ Compute the sample entropy of signal x, given a sequence length m and match tolerance r. Sample entropy is the negative natural logarithm of the probability that two sequences of length m that are similar, are still similar if the length of the sequences is increased by one sample. Reference: https://www.physiology.org/doi/full/10.1152/ajpheart.2000.278.6.H2039?url_ver=Z39.88-2003&rfr_id=ori:rid:crossref.org&rfr_dat=cr_pub%3dpubmed Richman, J. S. & Moorman, J. R. Physiological time-series analysis using approximate entropy and sample entropy. American Journal of Physiology-Heart and Circulatory Physiology 278, H2039-H2049 (2000). Args: x (np.ndarray): 1D array containing the signal to compute the sample entropy of. m (int): sequence length. r (float): matching tolerance. Seuquences are considered similar if their Chebyshev distance is below r. This is typically 0.1*std(x) or 0.2*std(x). Returns: (flaot): sample entropy of x. """ # Check input. if x.ndim != 1: raise ValueError('x should be 1 dimensional. Got array with {} dimensions.'.format(x.ndim)) # Run one of the sampen algorithms. return _sampen_fast(x, m, r)
def _sampen_fastest(x, m, r): """ Compute the sample entropy of degree m of a time series x, with matching tolerance r. Adapted version of _sampen_fast(), which precomputes some distance vectors to avoid double computations and array lookups. This algorithm is the fastest implementation of sampen in this package (especially for large len(x)). Args: x (np.ndarray): 1D array containing the signal to compute the sample entropy of. m (int): sequence length. r (float): matching tolerance. Sequences are considered similar if their Chebyshev distance is below r. Returns: sampen (np.ndarray): sample entropy, i.e. "#templates of length m+1" / "#templates of length m". References: [1] http://en.wikipedia.org/wiki/Sample_Entropy [2] http://physionet.incor.usp.br/physiotools/sampen/ [3] Madalena Costa, Ary Goldberger, CK Peng. Multiscale entropy analysis of biological signals """ n = len(x) # count is a 1d array that holds the number of matches. count[k] holds the number of matches for templates of # length k. count = np.zeros(m + 2, dtype=int) # Templates of length 0 matches by definition: #count[0] = n * (n - 1) / 2 # Pre-compute pairwise differences. some_hitlists = deque() # Use a deque increase efficiency of removing the first item. x_iter = iter(x) for i, xi in zip(range(m), x_iter): some_hitlists.append(np.abs(x[i + 1:] - xi) < r) # Iterate over all samples. for i, xi in zip(range(m, n), x_iter): # Add new item with similarity of last element of current template sequence and all other samples. some_hitlists.append(np.abs(x[i + 1:] - xi) < r) # Find similarities of first element of template sequence. hitlist = some_hitlists[0][: -m] searchlist = np.nonzero(hitlist)[0] num_hits = len(searchlist) # The found similarities are the number of sequences with length 1 that match with the first sample in the # current template. length = 1 count[length] += num_hits # While we find matching samples, we increase the sequence length and check the new sample in the sequences for # similarity with the template. go = num_hits > 0 while go: # Increment the sequence length. length += 1 # Find the samples that match, those are counted and represent the candidates for the next iteration. hitlist = some_hitlists[length - 1][searchlist] searchlist = searchlist[hitlist] # Add the number of matches to the count. num_hits = len(searchlist) if length >= m: count[length] += num_hits # Only leave the while loop if length == m + 1 or no candidates left. go = length < m + 1 and num_hits # Remove first item. some_hitlists.popleft() # Compute sample entropy. # sampen = np.log(count[:-1]/count[1:]) sampen = np.log(count[-2]/count[-1]) return sampen def _sampen_precompute_all_diff(x, m, r): """ Compute the sample entropy of degree m of a time series x, with matching tolerance r. This algorithm precomputes the pairwise differences beforehand and then determines the counts. In general this algorithm is a bit slower than _sampen_fast() and _sampen_fastest(), especially for large len(x). Args: x (np.ndarray): 1D array containing the signal to compute the sample entropy of. m (int): sequence length. r (float): matching tolerance. Sequences are considered similar if their Chebyshev distance is below r. Returns: sampen (np.ndarray): sample entropy, i.e. "#templates of length m+1" / "#templates of length m". References: [1] http://en.wikipedia.org/wiki/Sample_Entropy [2] http://physionet.incor.usp.br/physiotools/sampen/ [3] Madalena Costa, Ary Goldberger, CK Peng. Multiscale entropy analysis of biological signals """ n = len(x) # Pre-compute pairwise differences. hitlists = [] for i, xi in enumerate(x): hitlists.append(np.abs(x[i + 1:] - xi) < r) # Remove last, empty list. hitlists.pop() # count is a 1d array that holds the number of matches. count[k] holds the number of matches for templates of # length k. count = np.zeros(m + 2, dtype=np.int64) # Templates of length 0 matches by definition: #count[0] = n * (n - 1) / 2 length = 1 count[length] = np.sum([i[:-m].sum() for i in hitlists]) while length < m + 1: new_hitlist = [] # Skip one list. iter_hitlists = iter(hitlists) next(iter_hitlists) for cur_row, next_row in zip(hitlists, iter_hitlists): # Multiply and save. new_hitlist.append(cur_row[: - 1] * next_row) hitlists = new_hitlist length += 1 count[length] = np.sum([i[: len(i) - m + length - 1].sum() for i in hitlists]) # sampen = np.log(count[:-1]/count[1:]) sampen = np.log(count[-2]/count[-1]) return sampen @njit def _sampen_fast(x, m, r): """ Compute the sample entropy of degree m of a time series x, with matching tolerance r. The njit from numba makes this the fastest implementation. Adapted from https://github.com/nikdon/pyEntropy/blob/master/pyentrp/entropy.py with some improvements and bug fix. This algortihm is a little slower than _sampen_faster(). Args: x (np.ndarray): 1D array containing the signal to compute the sample entropy of. m (int): sequence length. r (float): matching tolerance. Sequences are considered similar if their Chebyshev distance is below r. Returns: sampen (np.ndarray): sample entropy, i.e. "#templates of length m+1" / "#templates of length m". References: [1] http://en.wikipedia.org/wiki/Sample_Entropy [2] http://physionet.incor.usp.br/physiotools/sampen/ [3] Madalena Costa, Ary Goldberger, CK Peng. Multiscale entropy analysis of biological signals """ n = len(x) # count is a 1d array that holds the number of matches. count[k] holds the number of matches for templates of # length k. count = np.zeros(m + 2, dtype=np.int64) # Templates of length 0 matches by definition: #count[0] = n*(n - 1) / 2 # Iterate over all samples. for i in range(n - m): # Extract a template of length m + 1. template = x[i:(i + m + 1)] # Extract the array with remaining samples that potentially contain matching sequences (do not include the # current sample to exclude self-matches). Only consider the right part of the remaining samples (to prevent # doing double work). rem_time_series = x[i + 1:] # Compute a list with potential candidates for matching sequences: only consider the first point and return # the indices of the samples that are within the specified tolerance. Note we also ignore the samples that are # too close to the end to form a sequence of length m. searchlist = np.nonzero(np.abs(rem_time_series[: - m] - template[0]) < r)[0] num_hits = len(searchlist) # The found samples within the tolerance are the number of sequences with length 1 that match with the first # sample in the current template. length = 1 count[length] += num_hits # While we find matching samples, we increase the sequence length and check the new sample in the sequences for # similarity with the template. go = num_hits > 0 while go: # Increment the sequence length and compute the indices of the new samples in the candidate sequences. length += 1 nextindxlist = searchlist + 1 # Extract value of the samples and find the samples that are within the tolerance. nextcandidates = rem_time_series[nextindxlist] hitlist = np.abs(nextcandidates - template[length-1]) < r # Update the searchlist for the next iteration (we can exclude the sequences that were not a match), searchlist = nextindxlist[hitlist] # Add the number of matches to the count. num_hits = len(searchlist) count[length] += num_hits # Only leave the while loop if length == m + 1 or no candidates left. go = length < m + 1 and num_hits # Compute sample entropy. # sampen = np.log(count[:-1]/count[1:]) sampen = np.log(count[-2]/count[-1]) return sampen def _sampen_lightweigth(x, m, r): """ Compute sample entropy using a relatively fast algorithm that includes sorting of values in x. Faster than sampen_simple and sampenc, but slower than sampen_fast. Implementation of an algortihm proposed by Manis et al. Low Computational Cost for Sample Entropy (2018) https://pdfs.semanticscholar.org/9bc5/343a117f4355542d14a7aea366cd4920029c.pdf Args: x (np.ndarray): 1D array containing the signal to compute the sample entropy of. m (int): sequence length. r (float): matching tolerance. Sequences are considered similar if their Chebyshev distance is below r. Returns: (flaot): sample entropy of x. """ # Initialize. N = len(x) A = 0 B = 0 # Sort array x and remember original positions (by default numpy sorts in O(n log n). idx_sorted = np.argsort(x) x_sorted = x[idx_sorted] # Go over all samples in sorted array. for ii, x_i in enumerate(x_sorted): # The current sample is the start of the current template. We can already exclude the sequences of which # the starting point lies too far from the current template's starting point, i.e. for x_j where x_j > x_i + r # (we ignore the values at the left of x_i, since this would just increase our total count by a factor 2). # We find the index of the first value to the right of x_i that is too far away to be a potential match by # doing a binary search in O(log n) (since the array is sorted). first_too_far = bisect_right(x_sorted, x_i + r) # Extract the position of the starting point of the current template in the original (unsorted) array. i = idx_sorted[ii] if i >= N - m: # Skip if the template does not fit. continue # Loop over starting point of a potential matching template (candidate), which in between the current sample # and the first-too-far sample in the sorted array). for jj in range(ii + 1, first_too_far): # Extract the position of the starting point of the candidate template in the original (unsorted) array. j = idx_sorted[jj] if j >= N - m: # Skip if the template does not fit. continue # Check if the current template and the current candidate are a match. k = 1 # we do not need to check the first point again. while k < m: if abs(x[i + k] - x[j + k]) > r: break k += 1 if k == m: B += 1 # Check if they are still a match, when the template length is increased by one. if abs(x[i + m] - x[j + m]) < r: A += 1 if A == 0: return np.inf else: return np.log(B/A) def _sampen_simple(x, m, r): """ Compute sample entropy using a simple, intuitive algorithm (and faster than sampenc, but still slow). Implementation of an algortihm proposed by Manis et al. Low Computational Cost for Sample Entropy (2018) https://pdfs.semanticscholar.org/9bc5/343a117f4355542d14a7aea366cd4920029c.pdf Args: x (np.ndarray): 1D array containing the signal to compute the sample entropy of. m (int): sequence length. r (float): matching tolerance. Sequences are considered similar if their Chebyshev distance is below r. Returns: (flaot): sample entropy of x. """ N = len(x) A = 0 B = 0 for i in range(N - m): for j in range(i + 1, N - m): # Consider each possible sequence to the right of the current template. k = 0 while k < m: if abs(x[i + k] - x[j + k]) > r: # Sequence is similar until a large difference between two components is found, in which case we # ignore the current sequence. break k += 1 if k == m: # Only reach here if we finished the while-loop (without break), so that means we found a # sequence similar to te template. B += 1 if abs(x[i + m] - x[j + m]) < r: # See if there would still be a match if we increase the template length with 1. A += 1 # Compute sample entropy. if A == 0: return np.inf else: return np.log(B/A) def _sampenc(y, M, r): """ Compute the sample entropy for values of m up to M-1. Code was ported from the following MATLAB code: https://www.physionet.org/physiotools/sampen/matlab/1.1-1/sampenc.m This code is very slow in Python (also compared to MATLAB). Suggest to keep this implementation as a golden standard, as it is a widely used implementation, but there are faster algortihms in this module. Args: y (np.ndarray): input data. M (int): maximum template length. r (float): matching tolerance. Returns: e (np.ndarray): sample entropy estimates for m=0,1,...,M-1. A (np.ndarray): number of matches for m=1,...,M. B (np.ndarray): number of matches for m=0,...,M-1 excluding last point. """ M = int(M) n = len(y) lastrun = np.zeros(n, dtype=int) run = np.zeros(n, dtype=int) A = np.zeros(M) B = np.zeros(M) for i in range(n - 1): nj = n - i - 1 y1 = y[i] for jj in range(nj): j = jj + i + 1 if abs(y[j] - y1) < r: run[jj] = lastrun[jj] + 1 M1 = min(M, run[jj]) for m in range(M1): A[m] = A[m] + 1 if j < (n - 1): B[m] = B[m] + 1 else: run[jj] = 0 for j in range(nj): lastrun[j] = run[j] N = n*(n - 1)/2 B = np.concatenate([[N], B[0: (M - 1)]], axis=0) p = B/A e = np.log(p) return e, A, B def walsh_spectral_entropy(x): """ Compute Walsh Spectral Entropy for a categorical sequence `x`. References: Kirsch MR, Monahan K, Weng J, Redline S, Loparo KA. Entropy-based measures for quantifying sleep-stage transition dynamics: relationship to sleep fragmentation and daytime sleepiness. IEEE Trans Biomed Eng. 2012 Mar;59(3):787-96. doi: 10.1109/TBME.2011.2179032. Args: x (np.ndarray): 1D array with sequence of categories. Categories must be specified by a unique integer. Returns: entropy (float): the Walsh Spectral Entropy. """ from sympy import fwht # Check input. x = np.asarray(x).squeeze().copy() if x.ndim != 1: raise ValueError(f'Can only be applied to 1D arrays. Got array with shape {x.shape}.') # Truncate to power of 2. x = x[:2 ** int(np.floor(np.log2(len(x))))] n = len(x) # To shape (n, 1). x = x.reshape(-1, 1) # One-hot encode classes. enc = OneHotEncoder() x = enc.fit_transform(x).toarray() x -= np.mean(x, axis=-1, keepdims=True) if x.shape[1] == 2: # We only need one array (the second one is redundant. x = x[:, :1] # Walsh transform (column-wise). Ht = 1 / n * np.asarray(fwht(x)).astype(float) # Compute periodogram. P = Ht ** 2 # Add small constant for numerical stability. P = P + 1e-10 # Normalize. Pn = P / np.sum(P) # Compute entropy. entropy = -(1 / np.log(Pn.size)) * np.sum(Pn * np.log(Pn)) return entropy