Source code for nnsa.containers.time_series

"""
Module for creating an object that holds the data of a time series.
"""
import copy
import pickle
import warnings
import os

import h5py
import numpy as np
import scipy.io

from nnsa.edfreadpy.io.utils import check_unit, standardize_and_check_label
from scipy.stats import zscore

from nnsa.artefacts.artefact_detection import signal_quality, detect_artefact_samples, remove_flatlines, \
    remove_outliers, remove_values, remove_neighborhood_artefacts
from nnsa.cwt.utils import compute_power_cwt
from nnsa.feature_extraction.aeeg import AmplitudeEeg
from nnsa.feature_extraction.entropy import MultiScaleEntropy
from nnsa.feature_extraction.envelope import Envelope
from nnsa.feature_extraction.fractality import MultifractalAnalysis, LineLength
from nnsa.feature_extraction.frequency_analysis import PowerAnalysis, Psd
from nnsa.io.hdf5 import write_dict_to_hdf5
from nnsa.parameters.parameters import ClassWithParameters, Parameters
from nnsa.preprocessing.detrending import detrend_poly
from nnsa.preprocessing.resample import resample_by_interpolation, resample_by_filtering
from nnsa.preprocessing.saved_filters import filter_saved_filter
from nnsa.utils.paths import check_directory_exists
from nnsa.utils.arrays import interp_nan, stepwise_reduce, get_range_mask, moving_max, nanfun, \
    moving_mean, moving_median, clamp
from nnsa.utils.config import NUMERIC_TYPES
from nnsa.utils.conversions import convert_time_scale
from nnsa.utils.segmentation import segment_generator
from nnsa.utils.testing import assert_equal

__all__ = [
    'TimeSeries'
]


[docs]class TimeSeries(ClassWithParameters): """ High-level interface for processing of any type of time series. Args: signal (np.ndarray): 1D array containing the signal. fs (float): sampling frequency of the signal array. label (str, optional): the label of the signal (if `check_label` is True, should be a valid standardized name, see check_label()). unit (str, optional): the unit of the signal (should be a valid standardized unit, see check_unit()). Defaults to 'a.u'. info (dict, optional): dict with info about the signal. Dict can contain 'source', specifying the path to the source file of the signal. It may have an optional additional field 'processing', which is a list of strings specifying several processing steps that were applied to the original signal loaded from 'source'. If None, the 'source' field in the dict is set to 'UNKNOWN'. Defaults to None. time_offset (float, optional): optional time offset in seconds. Time array starts at this offset. Defaults to 0. check_label (bool, optional): if True, the passed label is checked whether it is valid. If False, no check is performed and any label may be given. Defaults to True. check_unit (bool, optional): if True, the passed unit is checked whether it is valid. If False, no check is performed and any unit may be given. Defaults to True. **kwargs (optional): optional keyword arguments for setting parameters. """ def __init__(self, signal, fs, label='no label', unit='a.u.', info=None, time_offset=0, check_label=False, check_unit=False, **kwargs): # Call parent's __init__. super().__init__(**kwargs) self.signal = signal self._fs = fs self.check_label = check_label self.label = label self.check_unit = check_unit self.unit = unit self.info = info if info is not None else {} self.time_offset = time_offset def __repr__(self): """ Return a comprehensive info string about this object. Returns: (str): a comprehensive info string about this object. """ return '{} with label: {}, unit: {}, fs: {} Hz, number of samples: {}, info: {}'\ .format(self.__class__.__name__, self.label, self.unit, self.fs, len(self.signal), self.info) def __add__(self, other): """ Add an single value or array values to the signal. Args: other (TimeSeries, np.ndarray, int, float): thing to add. Returns: ts_out (TimeSeries): copy of the TimeSeries signal with the new signal values. """ # Other should be a TimeSeries or an array. if isinstance(other, TimeSeries): other_signal = other.signal elif isinstance(other, np.ndarray): other_signal = other elif isinstance(other, NUMERIC_TYPES): other_signal = other else: raise ValueError('Cannot add {} to TimeSeries.'.format(type(other))) # Check that the reference signal has equal length. if not isinstance(other, NUMERIC_TYPES) and other_signal.shape != self.signal.shape: raise ValueError('Expected signal with shape {}. Instead got shape {}.' .format(self.signal.shape, other_signal.shape)) # Add the other signal. signal_new = self.signal + other_signal # Make a copy of the current object and reset the signal. ts_out = copy.deepcopy(self) ts_out._signal = signal_new.astype(self.parameters['dtype']) # Return a new TimeSeries object with the new signal. return ts_out def __array__(self): """ We can convert to a numpy array using numpy.array or numpy.asarray, which will call this __array__ method. Returns: a copy of the 1D signal array. """ return self.signal.copy() def __len__(self): return len(self.signal) def __mul__(self, other): # Other should be a TimeSeries or an array. if isinstance(other, TimeSeries): other_signal = other.signal elif isinstance(other, np.ndarray): other_signal = other elif isinstance(other, NUMERIC_TYPES): other_signal = other else: raise ValueError('Cannot multiple TimeSeries with {}.'.format(type(other))) # Check that the reference signal has equal length. if not isinstance(other, NUMERIC_TYPES) and other_signal.shape != self.signal.shape: raise ValueError('Expected signal with shape {}. Instead got shape {}.' .format(self.signal.shape, other_signal.shape)) # Multiply with the other signal. signal_new = self.signal * other_signal # Make a copy of the current object and reset the signal. ts_out = copy.deepcopy(self) ts_out._signal = signal_new.astype(self.parameters['dtype']) # Return a new TimeSeries object with the new signal. return ts_out def __sub__(self, other): # Other should be a TimeSeries or an array. if isinstance(other, TimeSeries): other_signal = other.signal elif isinstance(other, np.ndarray): other_signal = other elif isinstance(other, NUMERIC_TYPES): other_signal = other else: raise ValueError('Cannot subtract {} from TimeSeries.'.format(type(other))) # Check that the reference signal has equal length. if not isinstance(other, NUMERIC_TYPES) and other_signal.shape != self.signal.shape: raise ValueError('Expected signal with shape {}. Instead got shape {}.' .format(self.signal.shape, other_signal.shape)) # Subtract the other signal. signal_new = self.signal - other_signal # Make a copy of the current object and reset the signal. ts_out = copy.deepcopy(self) ts_out._signal = signal_new.astype(self.parameters['dtype']) # Return a new TimeSeries object with the new signal. return ts_out def __truediv__(self, other): # Other should be a TimeSeries or an array. if isinstance(other, TimeSeries): other_signal = other.signal elif isinstance(other, np.ndarray): other_signal = other elif isinstance(other, NUMERIC_TYPES): other_signal = other else: raise ValueError('Cannot divide TimeSeries by {}.'.format(type(other))) # Check that the reference signal has equal length. if not isinstance(other, NUMERIC_TYPES) and other_signal.shape != self.signal.shape: raise ValueError('Expected signal with shape {}. Instead got shape {}.' .format(self.signal.shape, other_signal.shape)) # Divide by the other signal. signal_new = self.signal / other_signal # Make a copy of the current object and reset the signal. ts_out = copy.deepcopy(self) ts_out._signal = signal_new.astype(self.parameters['dtype']) # Return a new TimeSeries object with the new signal. return ts_out
[docs] @staticmethod def default_parameters(): """ Return the default parameters as a dictionary. Returns: (dict): a default set of parameters. """ pars = { # Precision (use lower precision for less memory usage). # If None, takes the same data type as the input. 'dtype': None, } return Parameters(**pars)
@property def dtype(self): """ Return the effective dtype (should be consistent with the dtype in the parameters). """ return self.signal.dtype @property def fs(self): """ Return the sampling frequency of the signal. Returns: (float): the sampling frequency of the signal. """ return self._fs @property def info(self): """ Return the info dictionary. Returns: (dict): info dictionary. """ return self._info @info.setter def info(self, value): """ Set the info dictionary. Args: value (dict): dictionary optionally a key 'source' which specifies the source file of the signal. Additionally, the key 'processing' may specify a list of strings that specify the processing that has been done to the original source signal. """ if 'source' not in value.keys(): value['source'] = 'UNKNOWN' if 'processing' not in value.keys(): value['processing'] = [] self._info = value @property def label(self): """ Return the label of the signal. Returns: (str): the label of the signal. """ return self._label @label.setter def label(self, value): """ Set the label of the signal. Args: value (str): the label of the signal (should be a valid standardized name, see check_label()). """ # Check if valid value. if self.check_label: label, valid = standardize_and_check_label(value) if not valid: raise ValueError('Invalid label "{}". ' 'See nnsa.containers.time_series.standardize_and_check_label().'.format(value)) else: label = '{}'.format(value) self._label = label @property def signal(self): """ Return the signal array. Returns: (np.ndarray): the (1D) signal array. """ return self._signal @signal.setter def signal(self, value): """ Set the signal array. Args: value (np.ndarray): the (1D) signal array. """ x = np.asarray(value).squeeze() if x.ndim != 1: raise ValueError('Signal for TimeSeries must be 1-dimensional. Got array with shape {}.'.format(x.shape)) if self.parameters['dtype'] is not None: x = x.astype(self.parameters['dtype']) self._signal = x @property def time(self): """ Shortcut to time_array (return the time array corresponding to self.signal). Returns: (np.ndarray): time array (in seconds). """ # Shortcut to time_array.. return self.time_array @property def time_array(self): """ Return the time array corresponding to self.signal. Returns: (np.ndarray): time array (in seconds). """ # Time array based on fs, starting at self.offset. return np.arange(len(self.signal)) / self.fs + self.time_offset @property def unit(self): """ Return the unit of the signal. Returns: (str): the unit of the signal. """ return self._unit @unit.setter def unit(self, value): """ Set the unit of the signal. Args: value (str): the unit of the signal (should be a valid standardized unit, see check_unit()). """ if self.check_unit: # Check if valid unit. valid = check_unit(value) if not valid: raise ValueError('Invalid unit "{}".'.format(value)) self._unit = value
[docs] def amplitude_eeg(self, verbose=1, **kwargs): """ Compute amplitude-integrated EEG (aEEG) from single channel continuous EEG. This is a wrapper that prepares the input for AmplitudeEeg.wct() and returns the result. Args: verbose (int, optional): verbose level. Defaults to 1. **kwargs (optional): optional keyword arguments for the AmplitudeEeg class. Returns: result (nnsa.AmplitudeEegResult): object containing the aEEG. """ if verbose > 0: print('Emulating aEEG of {}...'.format(self.label)) # Initialize AmplitudeEeg object (pass user specified keyword arguments). aeeg = AmplitudeEeg(**kwargs) # Process. result = aeeg.process(self.signal, self.fs, label=self.label.replace('EEG', 'aEEG'), verbose=verbose) # Add info string about data to result. self._postprocess_result(result) return result
[docs] def as_dict(self): """ Return the TimeSeries data in a dictionary. """ data = { 'signal': self.signal, 'fs': float(self.fs), 'label': self.label, 'unit': self.unit, 'info': self.info, 'time_offset': float(self.time_offset), } return data
[docs] def astype(self, dtype, inplace=False): if inplace: ts = self else: ts = copy.deepcopy(self) # Set type. ts.parameters['dtype'] = dtype def astype(x): # Type is changed when assigning array to ts.signal due to ts.parameters['dtype'], # so we don't need to do it here explicitly. return x ts.transform(astype, inplace=True) if not inplace: return ts
[docs] def burst_detection(self, verbose=1, **kwargs): """ Perform burst detection. Note: this should be used on unfiltered data. This is a wrapper that prepares the input for BurstDetection.burst_detection() and returns the result. Args: verbose (int, optional): verbose level. Defaults to 1. **kwargs (optional): optional keyword arguments for the BurstDetection class. Returns: result (nnsa.BurstDetectionResult): the result of the burst detection. """ from nnsa.feature_extraction.discontinuity import BurstDetection if any('Filtered' in log for log in self.info['processing']): msg = "\nWARNING: Signal seems to be filtered: self.info['processing'] = {}.\n" \ "The burst detection should be called on unfiltered data.".format(self.info['processing']) warnings.warn(msg) if verbose > 0: print('Burst detection of {}...'.format(self.label)) # Initialize BurstDetection object (pass user specified keyword arguments). bd = BurstDetection(**kwargs) # Process. result = bd.burst_detection(self.signal, self.fs, channel_labels=self.label, verbose=verbose) # Add info string about data to result. self._postprocess_result(result) return result
[docs] def clamp(self, threshold, inplace=False): """ Clamp values by applying a clamping function to reduce dynamic range. Args: threshold (float): threshold for the clamping function. inplace (bool, optional): whether to apply inplace (True) or not. Returns: ts (TimeSeries): new Dataset object (only returned if inplace is False). """ if inplace: ts = self else: ts = copy.deepcopy(self) # Process. ts._signal = clamp(ts._signal, neg_thres=-threshold, pos_thres=threshold) # Update the info object of the object to indicate the operation. ts.info['processing'].append('Clamped at "{}"'.format(threshold)) if not inplace: return ts
[docs] def compute_power_cwt(self, freq_low=None, freq_high=None, inplace=False, **kwargs): """ Compute power in specified band. Args: freq_low (float, optional): lower frequency cutoff for bandpower. If None, takes lowest possible. freq_high (float, optional): upper frequency cutoff for bandpower. If None, takes highest possible. inplace (bool, optional): if True, replaces the signal in place by its power. If False, returns a new object. **kwargs (optional): optional keyword arguments for compute_power_cwt(). Returns: ts (TimeSeries): TimeSeries with power of the signal (if inplace is False). """ if inplace: ts = self else: ts = copy.deepcopy(self) # Subtract. ts.signal = compute_power_cwt(x=ts.signal, dt=1/ts.fs, freq_low=freq_low, freq_high=freq_high, **kwargs) if not inplace: return ts
[docs] def cwt(self, verbose=1, **kwargs): """ Apply a continuous wavelet transform on the signal. Args: verbose (int, optional): verbosity level. **kwargs (optional): keyword arguments for Cwt(). Returns: CwtResult object containing the result. """ if verbose > 0: print('Computing continuous wavelet transform of {}...'.format(self.label)) # Initialize CWT object (pass user specified keyword arguments). from nnsa.feature_extraction.wavelets import CWT cwt = CWT(**kwargs) # Process. result = cwt.process(self.signal, self.fs, label=self.label, verbose=verbose) # Add info string about data to result. self._postprocess_result(result) return result
[docs] def demean(self, mean_fun=np.nanmean, inplace=False): """ Demean the time series. Args: mean_fun (function): function that takes in a 1D array and retruns the mean that will be subtracted. Defaults to np.nanmean. Other possibilities could be np.nanmedian, np.mean, ... inplace (bool, optional): whether to apply the operation inplace or return a new object. Returns: ts (nnsa.TimeSeries): object with demeaned signal (returned if inplace is False). """ if inplace: ts = self else: ts = copy.deepcopy(self) # Subtract. ts._signal -= mean_fun(ts.signal) if not inplace: return ts
[docs] def detrend(self, *args, **kwargs): return self.detrend_poly(*args, **kwargs)
[docs] def detrend_poly(self, order, inplace=False): """ Detrend the time series using a polynomial of a specific order. Args: order (int): degree of the polynomial. inplace (bool, optional): whether to apply the operation inplace or return a new object. Returns: ts (nnsa.TimeSeries): object with detrended signal (returned if inplace is False). """ if inplace: ts = self else: ts = copy.deepcopy(self) # Subtract. ts._signal = detrend_poly(ts.signal, order=order) if not inplace: return ts
[docs] def envelope(self, verbose=1, **kwargs): """ Compute envelope. This is a wrapper that prepares the input for Envelope.envelope() and returns the result. Args: verbose (int, optional): verbose level. Defaults to 1. **kwargs (optional): optional keyword arguments for the Envelope class. Returns: result (nnsa.EnvelopeResult): the result of the envelope computation. Examples: To get the envelope data as a new TimeSeries object: >>> signal = 4 * np.sin(np.arange(10000)) >>> ts = TimeSeries(signal=signal, fs=100, label='EEG Cz', unit='uV', info={'source': 'random'}) >>> envelope_result = ts.envelope(verbose=0) >>> ts_envelope = envelope_result.to_time_series() >>> print(type(ts_envelope).__name__) TimeSeries >>> print(ts_envelope.signal.mean()) 3.9998155216320725 """ if verbose > 0: print('Computing envelope of {}...'.format(self.label)) # Initialize Envelope object (pass user specified keyword arguments). envelope = Envelope(**kwargs) # Process. result = envelope.envelope(data_matrix=self.signal, fs=self.fs, channel_labels=self.label, verbose=verbose) # Add info string about data to result. self._postprocess_result(result) return result
[docs] def extract_epoch(self, begin=None, end=None, error_mode='raise'): """ Extract a piece of data in the specified interval. Args: begin (float, optional): start time in seconds (including time_offset) of the epoch to extract. If negative, begin is duration + begin (python-like indexing). If None, the beginning of the signal is taken. Defaults to None. end (float, optional): end time in seconds (including time_offset) of the epoch to extract. If negative, end is duration + end (python-like indexing). If None, the end of the signal is taken. Must point to a time greater than `begin`. Defaults to None. error_mode (str): whether to 'raise' an error when no time overlap is found, or to just 'warn'. Returns: ts_epoch (nnsa.TimeSeries): a new TimeSeries object with only the data of the specified epoch. """ # Create a copy, cut off the correct piece of the array and update the time_offset. ts = copy.deepcopy(self) # Get time and epoch mask. time = self.time time_mask = get_range_mask(time, x_min=begin, x_max=end) if not np.any(time_mask): msg = 'Specified epoch (begin={}, end={}) out of time range ({} - {} s).'\ .format(begin, end, time[0], time[-1]) if error_mode.lower() == 'raise': raise ValueError(msg) elif error_mode.lower() == 'warn': warnings.warn('\n{}'.format(msg)) elif error_mode.lower() == 'ignore': pass else: raise ValueError('Invalid option error_mode="{}"'.format(error_mode)) time_offset = begin else: time = time[time_mask] time_offset = time[0] # Extract data and update time offset. ts._signal = ts.signal[time_mask] ts.time_offset = time_offset return ts
[docs] def filter(self, filter_obj, remove_mean=False, inplace=False, verbose=1, **kwargs): """ Filter the signal with the given 1-D digital filter. Note: this introduces a delay in the filter signal. For zero-phase filtering use filtfilt. Args: filter_obj (FilterBase-derived): a child class from the nnsa.FilterBase class. The fs property does not need to be set correctly, as it will be automatically (re)set in this function. inplace (bool, optional): if True, filters inplace, else a new object is returned. Defaults to False. remove_mean (bool, optional): remove the mean value after filtering (useful for EEG data). verbose (int, optional): verbose level. Defaults to 1. **kwargs (optional): optional keyword arguments passed on to the filter() function of filter_obj. Returns: (TimeSeries): new TimeSeries object containing the filtered signal (if inplace is False). """ if verbose > 0: print('Filtering {} with {}...'.format(self.label, filter_obj)) if inplace: ts = self else: ts = copy.deepcopy(self) # Set the sample frequency (the FilterBase class resets any precomputed coefficients if necessary). filter_obj.fs = ts.fs # Do the filtering. signal_filtered = filter_obj.filter(ts.signal, **kwargs) # Replace the signal array. ts._signal = signal_filtered.astype(self.parameters['dtype']) # Remove mean if requested. if remove_mean: signal_filtered -= np.nanmean(signal_filtered) # Update the info object of the object to indicate the operation. ts.info['processing'].append('Filtered: "{}"'.format(filter_obj)) if not inplace: # Return the new TimeSeries object with the new signal. return ts
[docs] def filtfilt(self, filter_obj, remove_mean=False, inplace=False, verbose=1, **kwargs): """ Filter the signal with the given filter using zero-phase filtering (filtfilt). Args: filter_obj (FilterBase-derived): a child class from the nnsa.FilterBase class. The fs property does not need to be set correctly, as it will be automatically (re)set in this function. inplace (bool, optional): if True, filters inplace, else a new object is returned. Defaults to False. remove_mean (bool, optional): remove the mean value after filtering (useful for EEG data). verbose (int, optional): verbose level. Defaults to 1. **kwargs (optional): optional keyword arguments passed on to the filtfilt() function of filter_obj. Returns: (TimeSeries): new TimeSeries object containing the filtered signal (if inplace is False). """ if verbose > 0: print('Zero-phase filtering {} with {}...'.format(self.label, filter_obj)) if inplace: ts = self else: ts = copy.deepcopy(self) # Set the sample frequency (the FilterBase class resets any precomputed coefficients if necessary). filter_obj.fs = ts.fs # Remove mean if requested. if remove_mean: ts.signal -= np.nanmean(ts.signal) # Do the filtering. signal_filtered = filter_obj.filtfilt(ts.signal, **kwargs) # Replace the signal array. ts._signal = signal_filtered.astype(self.parameters['dtype']) # Update the info object of the object to indicate the operation. ts.info['processing'].append('Zero-phase filtered: "{}"'.format(filter_obj)) if not inplace: # Return the new TimeSeries object with the new signal. return ts
[docs] def fill_nan(self, value, inplace=False, verbose=0): """ Replace nans in the signal by a constant value. Args: value (float): the value to fill with. inplace (bool, optional): if True, resamples inplace, else a new object is returned. Defaults to False. verbose (int, optional): verbose level. Defaults to 1. Returns: (TimeSeries): new TimeSeries object containing the new signal (if inplace is False). """ if verbose > 0: print('Replacing nans with {}...'.format(self.label, value)) if inplace: ts = self else: ts = copy.deepcopy(self) # Replace the nans in the signal array. ts._signal[np.isnan(ts._signal)] = value # Update the info object of the object to indicate the operation. ts.info['processing'].append('Filled nans: filled with "{}"'.format(value)) if not inplace: # Return the new TimeSeries object with the new signal. return ts
[docs] def filter_saved_filter(self, filter_name, remove_mean=False, inplace=False, verbose=1, **kwargs): """ Filter the signal with a specified saved filter. This is just a wrapper of the nnsa.preprocessing.filter_saved_filter() function. Args: filter_name (str): see nnsa.preprocessing.filter_saved_filter(). remove_mean (bool, optional): remove the mean value after filtering (useful for EEG data). inplace (bool, optional): if True, resamples inplace, else a new object is returned. Defaults to False. verbose (int, optional): verbose level. Defaults to 1. **kwargs (optional): optional keyword arguments passed on to nnsa.preprocessing.filter_saved_filter(). Returns: (TimeSeries): new TimeSeries object containing the filtered signal (if inplace is False). """ if verbose > 0: print('Filtering {} with saved filter "{}"...'.format(self.label, filter_name)) if inplace: ts = self else: ts = copy.deepcopy(self) # Do the filtering. signal_filtered = filter_saved_filter(x=ts.signal, filter_name=filter_name, fs=ts.fs, **kwargs) # Remove mean if requested. if remove_mean: signal_filtered -= np.nanmean(signal_filtered) # Replace the signal array. ts._signal = signal_filtered.astype(self.parameters['dtype']) # Update the info object of the object to indicate the operation. ts.info['processing'].append('Filtered: saved filter "{}"'.format(filter_name)) if not inplace: # Return the new TimeSeries object with the new signal. return ts
[docs] def interp(self, t, **kwargs): """ Wrapper for np.interp, performing linear interpolation to get the signal values at times `t`. Args: t (np.ndarray): time instances at which to evaluate the signal. **kwargs (optional): optional keyword arguments for np.interp. Returns: y (np.ndarray): 1D array with interpolated values. """ interp_kwargs = dict({ 'left': np.nan, 'right': np.nan, }, **kwargs) y = np.interp(t, self.time_array, self.signal, **interp_kwargs) return y
[docs] def interp_nan(self, *args, **kwargs): return self.interpolate_nan(*args, **kwargs)
[docs] def interpolate_nan(self, inplace=False, max_nan_length=None): """ Linearly interpolate nan values. Args: inplace (bool, optional): interpolate inplace (True) or return a copy (False). Defaults to False. max_nan_length (int or str, optional): number of maximum allowable length of a nan segment that we interpolate (in seconds). If 'auto', the average duration of a constant value in the signal is taken. If None, no limit is applied, i.e. all nans are interpolated. Defaults to None. Returns: ts (TimeSeries): time series with nans interpolated (if inplace is False). """ if inplace: ts = self else: ts = copy.deepcopy(self) if max_nan_length is not None: if isinstance(max_nan_length, str): if max_nan_length == 'auto': x = ts.signal idx_transitions = np.where(np.abs(np.diff(x)) > 1e-12)[0] idx_transitions = np.concatenate([[0], idx_transitions, [len(x) - 1]]) line_lengths = np.diff(idx_transitions) max_nan_length = np.mean(line_lengths) else: raise ValueError('Invalid max_nan_length argument "{}".'.format(max_nan_length)) else: # Covert from seconds to samples. max_nan_length = int(round(max_nan_length * self.fs)) # Interpolate nans. signal_new = interp_nan(ts.signal, max_nan_length=max_nan_length) # Replace the signal array. ts._signal = signal_new.astype(self.parameters['dtype']) # Update the info object of the object to indicate the operation. ts.info['processing'].append('Nans interpolated with max_nan_length = {}' .format(max_nan_length)) if not inplace: # Return the new TimeSeries object with the new signal. return ts
[docs] def line_length(self, verbose=1, **kwargs): """ Compute line length feature. This is a wrapper that prepares the input for LineLength.line_length() and returns the result. Args: verbose (int, optional): verbose level. Defaults to 1. **kwargs (optional): optional keyword arguments for the LineLength class. Returns: result (nnsa.LineLengthResult): the result of the line length computation. """ if verbose > 0: print('Computing Line Length of {}...'.format(self.label)) # Initialize LineLength object (pass user specified keyword arguments). line_length = LineLength(**kwargs) # Run line_length. result = line_length.line_length(self.signal, self.fs, channel_labels=self.label, verbose=verbose) # Add info string about data to result. self._postprocess_result(result) return result
[docs] def mean(self, max_nan_frac=1): """ Return the mean. """ y = nanfun(np.nanmean, self.signal, max_nan_frac=max_nan_frac) return y
[docs] def median(self, max_nan_frac=1): """ Return the median. """ y = nanfun(np.nanmedian, self.signal, max_nan_frac=max_nan_frac) return y
[docs] def merge(self, other, inplace=False): """ Merge other TimeSeries object(s) into one object based on their time arrays. Fills gaps in between the time arrays with nan. Args: other (TimeSeries or list): list with (compatible) TimeSeries objects. inplace (bool, optional): if True, merges the data inplace by adding it to the current object. If False, a new object is returned, leaving the original ones unchanged. Defaults to False. Returns: out (TimeSeries): new TimeSeries object containing the merged time series (if inplace is False). """ if not isinstance(other, (list, tuple)): other = [other] if inplace: out = self else: out = copy.deepcopy(self) for item in other: # Check class type. if not isinstance(item, out.__class__): raise ValueError('Object of type "{}" cannot be merged with object of type "{}".' .format(type(item), type(out))) # Check sample rate. if out.fs != item.fs: raise ValueError('Cannot merge time series with different sample frequencies. Consider resampling first.') # Check unit. if out.unit != item.unit: msg = f'Different units when merging ({out.unit} and {item.unit})!' warnings.warn(msg) # Determine the index in the signal array that the first sample of other should be placed. fs = out.fs time_offset = out.time_offset time_offset_other = item.time_offset index = int(round((time_offset_other - time_offset)*fs)) if index < 0: if inplace: raise ValueError('`other` should occur after `self` in time to append inplace.') else: out = item.merge(out, inplace=False) else: # Merge signal array. n_samples = len(out.signal) if index < n_samples: # Cut piece off. msg = 'Overwriting data while merging.' warnings.warn(msg) out._signal = out.signal[:index] else: # Add nans. out._signal = np.concatenate( [out.signal, np.full((index - n_samples), fill_value=np.nan, dtype=out.signal.dtype)]) out._signal = np.concatenate((out.signal, item.signal), axis=-1) # Change label if necessary. if out.label != item.label: out.label = f'Merged {out.label} + {item.label}' out.info['processing'].append(f'Merged with {item.info}.') if not inplace: return out
[docs] def moving_mean(self, window, inplace=False, **kwargs): """ Helper-function to apply a moving average. Args: window (float): window of the moving average (in seconds). inplace (bool): whether to apply the filter inplace or not. **kwargs: for nnsa.moving_average() Returns: (TimeSeries): filtered data (if inplace is True). """ if inplace: out = self else: out = copy.deepcopy(self) n = int(round(self.fs*window)) out.transform(lambda x: moving_mean(x, n=n, **kwargs), inplace=True) if not inplace: return out
[docs] def moving_median(self, window, inplace=False, **kwargs): """ Helper-function to apply a moving average. Args: window (float): window of the moving average (in seconds). inplace (bool): whether to apply the filter inplace or not. **kwargs: for nnsa.moving_average() Returns: (TimeSeries): filtered data (if inplace is True). """ if inplace: out = self else: out = copy.deepcopy(self) n = int(round(self.fs*window)) out.transform(lambda x: moving_median(x, n=n, **kwargs), inplace=True) if not inplace: return out
[docs] def moving_average(self, *args, **kwargs): """ Helper-function to apply a moving average. Args: window (float): window of the moving average (in seconds). inplace (bool): whether to apply the filter inplace or not. **kwargs: for nnsa.moving_average() Returns: (TimeSeries): filtered data (if inplace is True). """ return self.moving_mean(*args, **kwargs)
[docs] def moving_max(self, window, inplace=False, **kwargs): """ Helper-function to apply a moving maximum. Args: window (float): window of the moving maximum (in seconds). inplace (bool): whether to apply the filter inplace or not. **kwargs: for nnsa.moving_max() Returns: (TimeSeries): filtered data (if inplace is True). """ if inplace: out = self else: out = copy.deepcopy(self) n = int(round(self.fs*window)) out.transform(lambda x: moving_max(x, n=n, **kwargs), inplace=True) if not inplace: return out
[docs] def multi_scale_entropy(self, verbose=1, **kwargs): """ Perform a multi-scale entropy (mse) analysis. This is a wrapper that prepares the input for MultiScaleEntropy.multi_scale_entropy() and returns the result. Args: verbose (int, optional): verbose level. Defaults to 1. **kwargs (optional): optional keyword arguments to overrule default parameters of the MultiScaleEntropy class. Returns: result (nnsa.MultiScaleEntropyResult): the result of the mse analysis. """ if verbose > 0: print('Multi-scale entropy analysis of {}...'.format(self.label)) # Initialize MultiScaleEntropy object (updates default parameters with user specified keyword arguments). mse = MultiScaleEntropy(**kwargs) # Run multi_scale_entropy. result = mse.multi_scale_entropy(self.signal, self.fs, channel_labels=self.label, verbose=verbose) # Add info string about data to result. self._postprocess_result(result) return result
[docs] def multifractal_analysis(self, verbose=1, **kwargs): """ Perform a multifractal analysis (mfa). This is a wrapper that prepares the input for MultifractalAnalysis.multifractal_analysis() and returns the result. Args: verbose (int, optional): verbose level. Defaults to 1. **kwargs (optional): optional keyword arguments for the MultifractalAnalysis class. Returns: result (nnsa.MultifractalAnalysisResult): the result of the mfa analysis. """ if verbose > 0: print('Multifractal analysis of {}...'.format(self.label)) # Initialize MultifractalAnalysis object (pass user specified keyword arguments). mfa = MultifractalAnalysis(**kwargs) # Run multifractal_analysis. result = mfa.multifractal_analysis(self.signal, self.fs, channel_labels=self.label, verbose=verbose) # Add info string about data to result. self._postprocess_result(result) return result
[docs] def normalize(self, how='zscore', inplace=False, **kwargs): """ Normalize the signal. Args: how (str): how to normalize. Choose from: - "zscore" inplace (bool, optional): whether to normalize inplace (True) or not. **kwargs (dict): for normalization functions. Depends on `how` (see code). Returns: (TimeSeries): normalized signal (only returned if inplace is False). """ if inplace: # We will apply the changes to the current object. ts = self else: # We will apply the changes to a copy of the current object and return this changed copy. ts = copy.deepcopy(self) # Transform signal. x = ts.signal if how == 'zscore': opts = dict({'nan_policy': 'omit'}, **kwargs) x_norm = zscore(x, **opts) else: raise ValueError('Invalid choice how="{}". Choose from {}.' .format(how, ['zscore'])) # Replace the signal array. ts._signal = x_norm.astype(ts.parameters['dtype']) # Update the signal label. ts._label += ' normalized' # Update the info object of the object to indicate the operation. ts.info['processing'].append('Normalized with {}' .format(how)) # Only return if not in place removing. if not inplace: return ts
[docs] def notch_filt(self, f0=50, verbose=1, inplace=False, **kwargs): """ Helper-function to apply a notch filter to remove e.g. power line interference. Args: f0 (float or list): frequencie(s) to suppress. verbose (int): verbosity level. inplace (bool): whether to apply the filter inplace or not. **kwargs (optional): for self.filtfilt(). Returns: (TimeSeries): filtered data (if inplace is True). """ from nnsa.preprocessing.filter import NotchIIR if inplace: out = self else: out = copy.deepcopy(self) if not isinstance(f0, (list, tuple)): f0 = [f0] for f in f0: out.filtfilt(NotchIIR(f0=f), verbose=verbose, inplace=True, **kwargs) if not inplace: return out
[docs] def plot(self, *args, begin=None, end=None, time_scale='seconds', ax=None, **kwargs): """ Plot (part of) the signal. Args: *args (optional): optional arguments for the plt.plot() function. begin (float, optional): begin time in seconds. If None, plots from the first sample. Default is None. end (float, optional): end time in seconds or None to specify the end of the entire signal. Default is None. time_scale (str, optional): the time scale to use. Choose from 'seconds', 'minutes', 'hours'. Defaults to 'seconds'. ax (plt.Axes, optional): axes to plot is. If None, plots in a new figure. Defaults to None. **kwargs (optional): optional keyword arguments for the plt.plot() function. """ # Local import due to long loading. import matplotlib.pyplot as plt if ax is not None: # Set current axes. plt.sca(ax) else: # Create new figure. ax = plt.gca() # Get time. time = self.time time_mask = get_range_mask(time, x_min=begin, x_max=end) time = time[time_mask] # Get part of the signal to plot. sig = self.signal[time_mask] # Define default plot keyword arguments. plot_kwargs = {'label': self.label} # Overrule the default keyword arguments with user specified arguments. plot_kwargs.update(kwargs) # Convert time scale to requested scale. time = convert_time_scale(time, time_scale) # Plot. plt.plot(time, sig, *args, **plot_kwargs) plt.title(self.label) plt.xlabel('Time ({})'.format(time_scale)) plt.ylabel('{} ({})'.format(self.label, self.unit))
[docs] def power_analysis(self, verbose=1, **kwargs): """ Perform a Fourier based power analysis. This is a wrapper that prepares the input for PowerAnalysis.power_analysis() and returns the result. Args: verbose (int, optional): verbose level. Defaults to 1. **kwargs (optional): optional keyword arguments to overrule default parameters of the PowerAnalysis class. Returns: result (nnsa.PowerAnalysisResult): the result of the power analysis. """ if verbose > 0: print('Power analysis of {}...'.format(self.label)) # Initialize PowerAnalysis object (updates default parameters with user specified keyword arguments). power_analysis = PowerAnalysis(**kwargs) # Run power_analysis. result = power_analysis.power_analysis(self.signal, self.fs, channel_labels=self.label, unit=self.unit, verbose=verbose) # Add info string about data to result. self._postprocess_result(result) return result
[docs] def psd(self, verbose=1, **kwargs): """ Estimate power spectral density. This is a wrapper that prepares the input for Psd.psd() and returns the results. Args: verbose (int, optional): verbose level. Defaults to 0. **kwargs (optional): optional keyword arguments to overrule default parameters of the Psd class. Returns: result (nnsa.PsdResult): object containing the result of the psd. """ if verbose > 0: print('Computing power spectral density of {}...'.format(self.label)) # Initialize Psd object. psd = Psd(**kwargs) # Estimate psd. result = psd.psd(self.signal, fs=self.fs, unit=self.unit, axis=-1, verbose=verbose) # Add info string about data to result. self._postprocess_result(result) return result
[docs] def reference(self, reference_signal, reference_label, inplace=False, verbose=1): """ Reference the signal to reference_channel by subtracting reference_signal from the signal. Args: reference_signal (TimeSeries or np.ndarray): 1D array containing the reference signal to subtract. reference_label (str): label of the reference signal. inplace (bool, optional): if True, referenced inplace, else a new object is returned. Defaults to False. verbose (int, optional): verbose level. Defaults to 1. Returns: (TimeSeries): new TimeSeries object containing the referenced signal. """ if verbose > 0: print('Referencing {} with {}...'.format(self.label, reference_label)) if inplace: ts_referenced = self else: ts_referenced = copy.deepcopy(self) # Subtract the reference signal. ts_referenced -= reference_signal # Update the info object indicating the referencing operation. ts_referenced.info['processing'].append('Referenced: reference channel "{}"'.format(reference_label)) if not inplace: # Return the new TimeSeries object with the referenced signal. return ts_referenced
[docs] def remove_artefacts(self, inplace=False, **kwargs): """ Replace samples that are artefacts by np.nan Args: inplace (bool, optional): If True, the samples are replaced directly from the TimeSeries object itself. If False, the function returns a copy of the original TimeSeries object in which the artefacted samples are replaced, leaving the data in the current TimeSeries unchanged. Default is False. **kwargs (optional): optional keyword arguments for overruling default sample quality criteria (see nnsa.artefacts.artefact_detection.detect_artefact_samples()). Returns: (TimeSeries): new TimeSeries object containing the same signal, but with artefacted samples changed to np.nan (only returned if inplace is False). """ if inplace: ts = self else: ts = copy.deepcopy(self) # Identify artefact samples. artefact_mask = detect_artefact_samples(ts.signal, **kwargs) # Replace artefact samples with np.nan. ts.signal[artefact_mask] = np.nan # Update the info object of the object to indicate the operation. ts.info['processing'].append('Removed artefacts with non-default criteria {}'.format(kwargs)) if not inplace: # Return the new TimeSeries object with the new signal. return ts
[docs] def remove_flatlines(self, n_flatline, inplace=False, **kwargs): """ Replace flatlines by np.nan Args: n_flatline (float): minimal number of seconds for a non-changing segment to be a flatline. inplace (bool, optional): If True, the samples are replaced directly from the TimeSeries object itself. If False, the function returns a copy of the original TimeSeries object in which the flatline samples are replaced, leaving the data in the current TimeSeries unchanged. Default is False. **kwargs (optional): optional keyword arguments for remove_flatlines. Returns: (TimeSeries): new TimeSeries object containing the same signal, but with flatline samples changed to np.nan (only returned if inplace is False). """ if inplace: ts = self else: ts = copy.deepcopy(self) # Replace artefact samples with np.nan. remove_flatlines(x=ts.signal, n_flatline=n_flatline, fs=ts.fs, inplace=True, **kwargs) # Update the info object of the object to indicate the operation. ts.info['processing'].append('Removed flatlines with non-default criteria {}'.format(kwargs)) if not inplace: # Return the new TimeSeries object with the new signal. return ts
[docs] def remove_neighborhood_artefacts(self, size, inplace=False, **kwargs): """ Replace values by np.nan if sample sin their neighborhood are nan. Args: size (int): size of the neighborhood (in seconds). E.g., if the neighborhood is 3, sample at t=t is an artefact if there is an artefact in the signal between t-1:t+1. inplace (bool, optional): If True, the samples are replaced directly from the TimeSeries object itself. If False, the function returns a copy of the original TimeSeries object in which the flatline samples are replaced, leaving the data in the current TimeSeries unchanged. Default is False. **kwargs (optional): optional keyword arguments for remove_neighborhood_artefacts(). Returns: (TimeSeries): new TimeSeries object containing the same signal, but with specific samples changed to np.nan (only returned if inplace is False). """ if inplace: ts = self else: ts = copy.deepcopy(self) # Replace artefact samples with np.nan. remove_neighborhood_artefacts(x=ts.signal, size=size, fs=ts.fs, inplace=True, **kwargs) # Update the info object of the object to indicate the operation. ts.info['processing'].append('Removed neighborhood artefacts with non-default criteria {}.' .format(kwargs)) if not inplace: # Return the new TimeSeries object with the new signal. return ts
[docs] def remove_outliers(self, *args, inplace=False, **kwargs): """ Replace outliers by np.nan Args: *args (float): arguments for remove_outliers(). inplace (bool, optional): If True, the samples are replaced directly from the TimeSeries object itself. If False, the function returns a copy of the original TimeSeries object in which the flatline samples are replaced, leaving the data in the current TimeSeries unchanged. Default is False. **kwargs (optional): optional keyword arguments for remove_outliers(). Returns: (TimeSeries): new TimeSeries object containing the same signal, but with outlier samples changed to np.nan (only returned if inplace is False). """ if inplace: ts = self else: ts = copy.deepcopy(self) # Replace artefact samples with np.nan. remove_outliers(*args, x=ts.signal, fs=ts.fs, inplace=True, **kwargs) # Update the info object of the object to indicate the operation. ts.info['processing'].append('Removed outliers with non-default criteria {} and {}'.format(args, kwargs)) if not inplace: # Return the new TimeSeries object with the new signal. return ts
[docs] def remove_values(self, *args, inplace=False, **kwargs): """ Replace values by np.nan Args: *args (float): arguments for remove_values(). inplace (bool, optional): If True, the samples are replaced directly from the TimeSeries object itself. If False, the function returns a copy of the original TimeSeries object in which the flatline samples are replaced, leaving the data in the current TimeSeries unchanged. Default is False. **kwargs (optional): optional keyword arguments for remove_values(). Returns: (TimeSeries): new TimeSeries object containing the same signal, but with specific samples changed to np.nan (only returned if inplace is False). """ if inplace: ts = self else: ts = copy.deepcopy(self) # Replace artefact samples with np.nan. remove_values(*args, x=ts.signal, fs=ts.fs, inplace=True, **kwargs) # Update the info object of the object to indicate the operation. ts.info['processing'].append('Removed values with non-default criteria {} and {}'.format(args, kwargs)) if not inplace: # Return the new TimeSeries object with the new signal. return ts
[docs] def resample(self, fs_new, method='polyphase_filtering', inplace=False, verbose=1, **kwargs): """ Resample the signal data to new sampling frequency fs_new. Deals with nans by interpolating them before resampling and putting nans back at locations near the artefacts. Args: fs_new (float): new sampling frequency to resample to. method (str, optional): the resampling method: If 'polyphase_filtering', the data will be resampled using polyphase filtering (see scipy.signal.resample_poly). If 'interpolation', the data will be resampled using interpolation (see scipy.interpolate.interp1d). Default is 'polyphase_filtering'. inplace (bool, optional): if True, resamples inplace, else a new object is returned. Defaults to False. verbose (int, optional): verbose level. Defaults to 1. **kwargs (optional): optional keyword arguments for scipy's resample/interpolation function that is applied (which depends on the method argument). Returns: (nnsa.TimeSeries): TimeSeries object with the resampled signal. """ if verbose > 0: print('Resampling {} using {}...'.format(self.label, method)) if inplace: ts = self else: ts = copy.deepcopy(self) # Extract signal and interpolate nan. signal = ts.signal # Resample the signal with specified method. if method == 'polyphase_filtering': signal_resampled = resample_by_filtering(x=signal, fs=self.fs, fs_new=fs_new, **kwargs) elif method == 'interpolation': # Construct time array corresponding to current fs. time = ts.time_array signal_resampled = resample_by_interpolation(x=signal, t=time, fs_new=fs_new, **kwargs) else: raise ValueError('Invalid method "{}" for resampling. Choose from "polyphase_filtering" (default), ' '"interpolation".'.format(method)) # Replace the signal array and update the sample frequency. ts._signal = signal_resampled.astype(ts.signal.dtype) ts._fs = fs_new # Update the info object of the object to indicate the operation. ts.info['processing'].append('Resampled using {}: {} Hz -> {} Hz'.format(method, self.fs, fs_new)) if not inplace: # Return the new TimeSeries object with the new signal. return ts
[docs] @staticmethod def read_hdf5(filepath, label=None, begin=None, end=None, **kwargs): """ Read a TimeSeries object from a .hdf5 file. Args: filepath (str or opened file): filepath to read or an open HDF5 file. label (str, optional): label of the hdf5 dataset to read. If None, reads the first TimeSeries that it can find in the file. Defaults to None. begin (float, optional): start second (wrt time offset). end (float, optional): end second (wrt time offset). **kwargs (optional): keyword arguments for TimeSeries(). Returns: ts (nnsa.TimeSeries): TimeSeries object with data read from the file. Examples: >>> signal = np.random.rand(1000) >>> ts = TimeSeries(signal=signal, fs=10, label='EEG Cz') >>> ts.save_hdf5('testfile.hdf5') >>> ts_read = TimeSeries.read_hdf5('testfile.hdf5') >>> ts_read_epoch = TimeSeries.read_hdf5('testfile.hdf5', begin=10, end=20) >>> os.remove('testfile.hdf5') >>> assert_equal(ts.signal, ts_read.signal) >>> assert_equal(ts.extract_epoch(begin=10, end=20).signal, ts_read_epoch.signal) """ def _read(f, label, **kwargs): if label is None: # Look for a timeseries saved in the hdf5. for lab in f: sig = f[lab] if 'type' in sig.attrs: if sig.attrs['type'].decode() == 'TimeSeries': # Found the time series. label = lab break elif 'fs' in sig.attrs: # Found time series. label = lab break if label is None: raise ValueError('No TimeSeries found in file {}.'.format(filepath)) # Read data. sig = f[label] fs = sig.attrs['fs'] time_offset = sig.attrs['time_offset'] unit = sig.attrs['unit'].decode() # Determine start and end index. end_idx = None if end is None else max([0, min([int((end - time_offset)*fs), sig.shape[-1]])]) begin_idx = 0 if begin is None else max([int((begin - time_offset)*fs), 0]) # Extract requested part. signal = sig[begin_idx:end_idx] # Update time_offset (depending on `begin`). time_offset = time_offset + begin_idx/fs # Do not read info, just put the current filepath. info = {'source': f.filename} # Create a TimeSeries object. ts = TimeSeries(signal=signal, fs=fs, label=label, unit=unit, info=info, time_offset=time_offset, **kwargs) return ts if isinstance(filepath, str): # Open the file and read. with h5py.File(filepath, 'r') as f: ts = _read(f, label=label, **kwargs) else: # Assume that `filepath` is an opened HDF5 file. ts = _read(filepath, label=label, **kwargs) return ts
[docs] @staticmethod def read_mat(filepath, **kwargs): """ Read a TimeSeries object from a .mat file. Args: filepath (str): filepath to read. **kwargs (optional): keyword arguments for TimeSeries(). Returns: ts (nnsa.TimeSeries): TimeSeries object with data read from the file. Examples: >>> signal = np.random.rand(1000) >>> ts = TimeSeries(signal=signal, fs=10, label='EEG Cz') >>> ts.save_mat('testfile.mat') >>> ts_read = TimeSeries.read_mat('testfile.mat') >>> os.remove('testfile.mat') >>> assert_equal(ts.signal, ts_read.signal) """ # Read the file. data = scipy.io.loadmat(filepath, squeeze_me=True, chars_as_strings=True, struct_as_record=True) signal = data['signal'] fs = data['fs'] label = data['label'] unit = data['unit'] info = {'source': filepath} time_offset = data['time_offset'] # Create a TimeSeries object. ts = TimeSeries(signal=signal, fs=fs, label=label, unit=unit, info=info, time_offset=time_offset, **kwargs) return ts
[docs] @staticmethod def read_pickle(filepath, **kwargs): """ Read a TimeSeries object from a pickle file. Args: filepath (str): filepath to read. **kwargs (optional): keyword arguments for TimeSeries(). Returns: ts (nnsa.TimeSeries): TimeSeries object with data read from the file. Examples: >>> signal = np.random.rand(1000) >>> ts = TimeSeries(signal=signal, fs=10, label='EEG Cz') >>> ts.save_pickle('testfile.pkl') >>> ts_read = TimeSeries.read_pickle('testfile.pkl') >>> os.remove('testfile.pkl') >>> assert_equal(ts.signal, ts_read.signal) """ # Read the file. with open(filepath, 'rb') as f: data = pickle.load(f) signal = data['signal'] fs = data['fs'] label = data['label'] unit = data['unit'] info = {'source': filepath} time_offset = data['time_offset'] # Create a TimeSeries object. ts = TimeSeries(signal=signal, fs=fs, label=label, unit=unit, info=info, time_offset=time_offset, **kwargs) return ts
[docs] def save_hdf5(self, filepath, mode='w', overwrite=False): """ Save the TimeSeries data to a .hdf5 file. Args: filepath (str): filepath to save to. mode (str, optional): 'w' for write mode or 'a' for append mode. Defaults to 'w'. overwrite (bool, optional): if True, overwrites existing files. If False, raises an error when `filepath` already exists and mode is 'w'. Defaults to False. """ _, file_extension = os.path.splitext(filepath) if not file_extension: # Add file extension. filepath = '{}.hdf5'.format(filepath) elif file_extension.lower() not in ['.hdf5', '.h5']: raise ValueError('Invalid file extension "{}". Use one of {}.' .format(file_extension, ['.hdf5', '.h5'])) if mode == 'w' and not overwrite: # Check if filepath already exists. if os.path.exists(filepath): raise ValueError('File "{}" already exists. Overwriting can be enabled by setting overwrite=True.') # Check the directory and create if it does not exist. check_directory_exists(filepath=filepath) # Write hdf5 file. with h5py.File(filepath, mode=mode) as f: # Write array data. sig = f.create_dataset(self.label, data=self.signal) sig.attrs['type'] = np.string_('TimeSeries') # Write non-array data as attributes to the signal array. sig.attrs['fs'] = float(self.fs) sig.attrs['time_offset'] = float(self.time_offset) # Convert strings to np.string_ type as recommended for compatibility. sig.attrs['unit'] = np.string_(self.unit) # Write dict as a separate dataset. write_dict_to_hdf5(f, self.info, 'info/{}'.format(self.label))
[docs] def save_mat(self, filepath, overwrite=False): """ Save the TimeSeries data to a MATLAB-style .mat file. Args: filepath (str): filepath to save to. overwrite (bool, optional): if True, overwrites existing files. If False, raises an error when `filepath` already exists. Defaults to False. """ _, file_extension = os.path.splitext(filepath) if not file_extension: # Add file extension. filepath = '{}.mat'.format(filepath) else: if file_extension.lower() not in ['.mat']: raise ValueError('Invalid file extension "{}". Use one of {}.' .format(file_extension, ['.mat'])) if not overwrite: # Check if filepath already exists. if os.path.exists(filepath): raise ValueError('File "{}" already exists. Overwriting can be enabled by setting overwrite=True.') # Create data dict to save. data = self.as_dict() # Save. scipy.io.savemat(filepath, data)
[docs] def save_pickle(self, filepath, overwrite=False): """ Save the TimeSeries data to a pickle file. Args: filepath (str): filepath to save to. overwrite (bool, optional): if True, overwrites existing files. If False, raises an error when `filepath` already exists. Defaults to False. """ _, file_extension = os.path.splitext(filepath) if not file_extension: # Add file extension. filepath = '{}.pkl'.format(filepath) else: if file_extension.lower() not in ['.pkl', '.pickle', '.p', '.pk']: raise ValueError('Invalid file extension "{}". Use one of {}.' .format(file_extension, ['.pkl', '.pickle', '.p', '.pk'])) if not overwrite: # Check if filepath already exists. if os.path.exists(filepath): raise ValueError('File "{}" already exists. Overwriting can be enabled by setting overwrite=True.') # Create data dict to save. data = self.as_dict() # Save. with open(filepath, 'wb') as f: pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)
[docs] def segment(self, segment_length, overlap=0): """ Segment the signal (into smaller time segments). First segment starts at the start of the signal. Any left over samples at the end of the signal that cannot make up a segment of the specified length are ignored. Args: segment_length (float): length of a segment in seconds. overlap (float, optional): overlap between succesive segments in seconds. Defaults to 0. Yields: (TimeSeries): TimeSeries object holding the data of the next segment. """ # Create segment generator. seg_generator = segment_generator(self.signal, segment_length=segment_length, overlap=overlap, fs=self.fs, axis=0) # Loop over segments and collect them as TimeSeries objects. for i, seg in enumerate(seg_generator): # Update the info object of the current object for the return object with a description of the segment. # Make a copy so that we do not change the info of the current object. info_updated = copy.deepcopy(self.info) info_updated['processing'].append('Segment: {} s : {} s'.format(i * segment_length - i * overlap, (i + 1) * segment_length - i * overlap)) ts = TimeSeries(signal=np.copy(seg), # It is safest to save a copy of the segment (instead of a view). fs=self.fs, label=self.label, unit=self.unit, info=info_updated, **self.parameters) yield ts
[docs] def signal_quality(self): """ Collect some characteristics of the signal related to signal quality. This is a wrapper of nnsa.artefacts.artefact_detection.signal_quality() Returns: (dict): see nnsa.artefacts.artefact_detection.signal_quality() """ return signal_quality(self.signal)
[docs] def stepwise_moving_average(self, *args, avg_fun=np.nanmean, **kwargs): return self.stepwise_reduce(*args, reduce_fun=avg_fun, **kwargs)
[docs] def stepwise_reduce(self, window, stepsize=None, reduce_fun=np.nanmean, max_nan=0.5, inplace=False, verbose=1): """ Compute stepwise value. Args: window (float): length of the segments /moving window to compute the average in (in seconds). stepsize (float): stepsize of the moving average window in seconds. The output will have a sample frequency of 1/stepsize Hz. If None, stepsize will be eqaul to window. reduce_fun (function, optional): function to reduce a block a data. Must handle nan and accept an axis keyword. Defaults to np.nanmean. max_nan (float, optional): maximum fraction (0-1) of nan values in a window. If the fraction of nan samples exceeds `max_nan`, the mean is nan for that window. Defaults to 0.5. inplace (bool, optional): if True, operates inplace, else a new object is returned. Defaults to False. verbose (int, optional): verbose level. Defaults to 1. Returns: (nnsa.TimeSeries): TimeSeries object with the new signal. """ if verbose > 0: print('Applying stepwise reduce on {}...'.format(self.label)) if inplace: ts = self else: ts = copy.deepcopy(self) # Extract variables. fs = ts.fs x = ts.signal if 1/fs == window and window == stepsize: return ts x_avg, fs_avg = stepwise_reduce(x, window=window, stepsize=stepsize, reduce_fun=reduce_fun, fs=fs, max_nan=max_nan) # Replace the signal array and update the sample frequency. ts._signal = x_avg.astype(self.parameters['dtype']) ts._fs = fs_avg ts.time_offset += window/2 # center the moving average window. # Update the info object of the object to indicate the operation. ts.info['processing'].append('Stepwise moving average in windows of {} with stepsize {} s.' .format(window, stepsize)) if not inplace: # Return the new TimeSeries object with the new signal. return ts
[docs] def transform(self, fun, fs=None, label=None, unit=None, maintain_dtype=True, inplace=False): """ Apply a custom function to the signal. Args: fun (function): function that transform the data. Takes in a 1D array and returns a 1D array. fs (float): sampling frequency of the transformed signal. If not specified, assumes that the transformed signal has the same sampling frequency as the original and an extra check will be performed to verify that the signal length (number of samples) did not change. label (str): optional new label for the transformed signal. If not provided, will keep the original label. unit (str): optional new unit for the transformed signal. If not provided, will keep the original unit. maintain_dtype (bool): if True, the new signal is forced to have the same dtype as self. inplace (bool): transform the data inplace (True) or return a new TimeSeries instance (False). Returns: ts (TimeSeries): TimeSeries with the transformed data (if inplace is False). """ if inplace: # We will apply the changes to the current object. ts = self else: # We will apply the changes to a copy of the current object and return this changed copy. ts = copy.deepcopy(self) # Transform signal. original_len = len(ts.signal) signal_new = fun(ts.signal) # If the fs did not change, then the original and transformed signal # should have the same length. if fs is None or fs == ts.fs: # Check new length. if len(signal_new) != original_len: raise AssertionError( f'Transformed signal has different length ({len(signal_new)}) than original signal ({original_len}).' 'This is not allowed if the sampling frequency does not change.') else: # Change the fs of the output. ts._fs = fs if maintain_dtype: # Force the same dtype as original signal. signal_new = signal_new.astype(ts.parameters['dtype']) # Replace the signal array. ts._signal = signal_new if label is not None: # Replace the signal label. ts._label = label if unit is not None: # Replace the signal label. ts._unit = unit # Update the info object of the object to indicate the operation. ts.info['processing'].append('Transformed with {}' .format(fun)) # Only return if not in place removing. if not inplace: return ts
[docs] def zscore(self, ddof=0, nan_policy='omit', inplace=False): """ Normalize the signal by centering and unit standard deviation. Args: ddof (int, optional): degrees of freedom correction in the calculation of the standard deviation. nan_policy (str, optional): {'propagate', 'raise', 'omit'} defines how to handle when input contains nan. 'propagate' returns nan, 'raise' throws an error, 'omit' performs the calculations ignoring nan values. inplace (bool): transform the data inplace (True) or return a new TimeSeries instance (False). Returns: ts (TimeSeries): TimeSeries with the transformed data (if inplace is False). """ if inplace: # We will apply the changes to the current object. ts = self else: # We will apply the changes to a copy of the current object and return this changed copy. ts = copy.deepcopy(self) # Transform signal. signal_new = zscore(ts.signal, ddof=ddof, nan_policy=nan_policy) # Replace the signal array. ts._signal = signal_new.astype(ts.parameters['dtype']) # Update the info object of the object to indicate the operation. ts.info['processing'].append('Normalized with zscore') # Only return if not in place removing. if not inplace: return ts
def _add_info_to_result(self, result): """ Add info string about data to result. Args: result (nnsa.ResultBase-derived): ResultBase-derived object. """ result.data_info = str(self.info) def _postprocess_result(self, result): """ Postprocess the result class common to all feature extractions. Add info string about data to result. Update the segment time vectors of the result if time_offset of the TimeSeries was not 0. Args: result (nnsa.ResultBase-derived): ResultBase-derived object. """ self._add_info_to_result(result) # Time offset. time_offset = self.time_offset # Add time offset to result. result.time_offset = time_offset