Source code for nnsa.feature_extraction.frequency_analysis

"""
Algorithms for extraction of frequency features.
"""
import copy
import csv
import sys
from collections import defaultdict

import h5py
import pyprind
import scipy.integrate
import scipy.signal
import numpy as np

from nnsa.feature_extraction.common import check_multichannel_data_matrix, get_channel_index, \
    preprocess_segment, local_to_global_features
from nnsa.feature_extraction.result import ResultBase
from nnsa.parameters.parameters import ClassWithParameters, Parameters
from nnsa.stats.surrogates import compute_IAAFT_surrogate
from nnsa.utils.arrays import count_nans, \
    histogram_features_per_channel
from nnsa.utils.segmentation import compute_n_segments, segment_generator
from nnsa.utils.config import HORIZONTAL_RULE
from nnsa.utils.mathematics import nextpow2
from nnsa.utils.other import enumerate_label, convert_string_auto
from nnsa.utils.plotting import subplot_rows_columns, maximize_figure, remove_ticks
from nnsa.utils.testing import assert_equal

__all__ = [
    'PowerAnalysis',
    'PowerAnalysisResult',
    'Psd',
    'PsdResult',
    '_determine_welch_kwargs'
]


[docs]class PowerAnalysis(ClassWithParameters): """ Class for performing a power analysis of EEG signal. References: this code was ported from Mario Lavanga and Ofelie de Wel's MATLAB code. Main method: power_analysis() Args: see nnsa.ClassWithParameters. Examples: >>> np.random.seed(0) >>> x = np.random.rand(10, 10000) >>> power_analysis = PowerAnalysis() >>> assert_equal(power_analysis.parameters, PowerAnalysis.default_parameters()) >>> print(type(power_analysis.parameters).__name__) Parameters >>> power_analysis_result = power_analysis.power_analysis(x, fs=25, verbose=0) >>> print(type(power_analysis_result).__name__) PowerAnalysisResult >>> power_analysis_result.power['delta_1'][2, 3] 0.01603280237682399 """
[docs] @staticmethod def default_parameters(): """ Return the default parameters. Returns: (nnsa.Parameters): a default set of parameters for the object. """ # Parameters for segmentation of EEG recording into small time segments/epochs. segmentation = Parameters(**{ # Segment length in seconds: 'segment_length': 30, # Overlap in segments in seconds: 'overlap': 0, }) # Parameters for artefact detection/exclusion, see e.g. # nnsa.artefacts.artefact_detection.default_eeg_signal_quality_criteria() artefact_criteria = None # Frequency bands for which to compute the power. frequency_bands = dict() # Parameters for power spectral density. See nnsa.Psd() psd = Parameters(**{ # Window to use. MATLAB's default is 'hamming' (note that scipy's default is 'hanning'): 'window': 'hamming', # Length of window for Welch's method in seconds: 'window_length': 4, # Fraction of overlap between windows: 'overlap_fraction': 0.7, # Maximum accepted/desired frequency resolution in Hz (will determine the amount of zero-padding for DFT): 'df_max': 0.015625, # Detrend (see scipy.signal.welch()). 'detrend': 'constant', }) pars = { 'segmentation': segmentation, 'artefact_criteria': artefact_criteria, 'frequency_bands': frequency_bands, 'psd': psd, } return Parameters(**pars)
[docs] def power_analysis(self, data_matrix, fs, channel_labels=None, unit=None, verbose=1): """ Perform the power analysis on multichannel data as designed by Mario Lavanga and Ofelie De Wel. Code was ported from MATLAB and constsist of 5 steps: 1) Segmentation 2) Optional filtering 3) Artefact exclusion 4) PSD estimation 5) Computation of frequency power in specified frequency bands per segment Args: data_matrix (np.ndarray): see check_multichannel_data_matrix(). fs (float): sample frequency of the EEG signals. channel_labels (list of str, optional): see check_multichannel_data_matrix(). unit (str, optional): the physical unit of the signals in data_matrix (e.g. 'uV'). If None no unit is passed to the returned object (PowerAnalysisResult). Defaults to None. verbose (int, optional): verbose level. Defaults to 1. Returns: (nnsa.PowerAnalysisResult): object containing the powers of the frequency bands per segment and per channel. """ # Check input. data_matrix, channel_labels = check_multichannel_data_matrix(data_matrix, channel_labels) if verbose > 0: print(HORIZONTAL_RULE) print('Running power_analysis with parameters:') print(self.parameters) # Extract some parameters. seg_pars = self.parameters['segmentation'] artefact_criteria = self.parameters['artefact_criteria'] frequency_bands = self.parameters['frequency_bands'] n_channels = data_matrix.shape[0] n_bands = len(frequency_bands) dtype = data_matrix.dtype # Initialize Psd object for spectral power density estimation. psd = Psd(**self.parameters['psd']) # Segment the data in the time axis (create a generator). seg_generator = segment_generator(data_matrix, segment_length=seg_pars['segment_length'], overlap=seg_pars['overlap'], fs=fs, axis=-1) n_segments = compute_n_segments(data_matrix, segment_length=seg_pars['segment_length'], overlap=seg_pars['overlap'], fs=fs, axis=-1) # Initialize progress bar. bar = pyprind.ProgBar(n_segments, stream=sys.stdout) # Loop over segments. power_tensor = np.empty((n_bands, n_channels, n_segments), dtype=dtype) for i_seg, seg in enumerate(seg_generator): # Update progress bar. if verbose > 0: bar.update() # Preprocess segment (demean, optionally filter, find channels to exclude). seg, exclude_mask = preprocess_segment(seg, fs, artefact_criteria=artefact_criteria) # Loop over channels. power_channel = np.empty((n_bands, n_channels), dtype=dtype) for j_channel, excl, signal in zip(range(n_channels), exclude_mask, seg): # If channels is to be excluded, skip and use NaN to indicate artefacted channel. if excl: power_channel[:, j_channel] = np.nan else: # Estimate power spectral density. psd_result = psd.psd(signal, fs=fs) # Compute power in frequency bands. for i_band, band_limits in enumerate(frequency_bands.values()): power_channel[i_band, j_channel] = psd_result.compute_power(f_low=band_limits[0], f_high=band_limits[1]) power_tensor[:, :, i_seg] = power_channel # Create dictionary with frequency band labels as keys and (channels, segments) arrays as values. power = dict(zip(frequency_bands.keys(), (p for p in power_tensor))) # Return as a PowerAnalysisResult object. return PowerAnalysisResult(power, channel_labels=channel_labels, algorithm_parameters=self.parameters, unit='{}^2'.format(unit) if unit is not None else None)
[docs]class PowerAnalysisResult(ResultBase): """ High-level interface for processing the results of a power analysis as created by nnsa.PowerAnalysis(). Args: power (dict): dictionary with frequency band labels as keys and (channels, segments) arrays with power as values, e.g. as returned by nnsa.PowerAnalysis.power_analysis() algorithm_parameters (nnsa.Parameters): see ResultBase. channel_labels (list of str, optional): labels of the channels corresponding to the rows of the values in power. If None, default labels will be created. Default is None. data_info (str, optional): see ResultBase. unit (str, optional): unit of the values in power. segment_start_times (np.ndarray, optional): see ResultBase. segment_end_times (np.ndarray, optional): see ResultBase. fs (flaot, optional): see ResultBase. """ def __init__(self, power, algorithm_parameters, channel_labels=None, data_info=None, unit=None, segment_start_times=None, segment_end_times=None, fs=None): super().__init__(algorithm_parameters=algorithm_parameters, data_info=data_info, segment_start_times=segment_start_times, segment_end_times=segment_end_times, fs=fs) # Input check. data_shape = next((p for p in power.values())).shape if channel_labels is None: channel_labels = enumerate_label(data_shape[0], label='Channel') elif len(channel_labels) != data_shape[0]: raise ValueError('Length of channel_labels ({}) does not correspond to the shape of the data {}.' .format(len(channel_labels), data_shape)) # Store variables that are not already stored by the parent class (ResultBase). self.power = power self.channel_labels = channel_labels self.unit = unit @property def num_segments(self): """ Return the number of segments. Returns: (int): number of segments. """ return next((p.shape[-1] for p in self.power.values()))
[docs] def compute_relative_power(self): """ Compute the relative power per channel per segment for each band. Returns: rel_power (dict): relative power per band. """ # Compute total power per channel per segment. data_shape = next((p for p in self.power.values())).shape tot_power = np.zeros(data_shape) for k, v in self.power.items(): tot_power += v # Compute relative power. rel_power = dict() for k, v in self.power.items(): rel_power['{}_rel'.format(k)] = v/tot_power return rel_power
[docs] def extract_channels(self, channels, inplace=False): if not inplace: res = copy.deepcopy(self) else: res = self if channels is not None: idx_channels = get_channel_index(res.channel_labels, channels) for key in res.power.keys(): res.power[key] = res.power[key][idx_channels, :] res.channel_labels = [res.channel_labels[idx] for idx in idx_channels] if not inplace: return res
[docs] def extract_global_features(self, **kwargs): """ Extract features that characterize the entire recording. Args: **kwargs (optional): optional keyword arguments for local_to_global_features(). Returns: global_features (dict): see local_to_global_features(). """ # Compute (local) features. features = dict() features.update(self.power) features.update(self.compute_relative_power()) # Change the keys and indicate the type of feature. feature_signature = 'POW' for key in list(features.keys()): # Note the use of a list instead of iterator. features['{}_{}'.format(feature_signature, key)] = features.pop(key) # Compute global features. global_features = local_to_global_features(features, self.segment_start_times, self.segment_end_times, channel_labels=self.channel_labels, **kwargs) return global_features
[docs] def extract_histogram_features(self, ignore_nan=True): """ Compute the histogram features of each feature attribute. Args: ignore_nan (bool, optional): if True, ignore nan values. Defaults to True. Returns: out (dict): dict with feature names as keys and feature values as values. """ # TODO Use quantiles instead of histrogram bins. raise NotImplementedError('Use quantiles, deprecate the bins.') # Create bin edges for histogram per band. nbins = 10 band_bins = { # Absolute power. 'delta_1': np.linspace(0, 1200, nbins+1), 'delta_2': np.linspace(0, 200, nbins+1), 'theta': np.linspace(0, 100, nbins+1), 'alpha': np.linspace(0, 22, nbins+1), 'beta': np.linspace(0, 15, nbins+1), # Relative power. 'delta_1_rel': np.linspace(0.4, 1, nbins+1), 'delta_2_rel': np.linspace(0.0, 0.4, nbins + 1), 'theta_rel': np.linspace(0.0, 0.25, nbins + 1), 'alpha_rel': np.linspace(0.0, 0.1, nbins + 1), 'beta_rel': np.linspace(0.0, 0.05, nbins + 1), } # Absolute power. out = {} for band, data in self.power.items(): # Take median across channels. data = np.nanmedian(data, axis=0, keepdims=True) hist_features = histogram_features_per_channel(data, bins=band_bins[band], ignore_nan=ignore_nan).iloc[0, :].to_dict() # Change the keys and indicate the type of feature. feature_signature = 'POW_{}'.format(band) for key in list(hist_features.keys()): # Note the use of a list instead of iterator. hist_features['{}_{}'.format(feature_signature, key)] = hist_features.pop(key) out.update(hist_features) # Relative power. for band, data in self.compute_relative_power().items(): # Take median across channels. data = np.nanmedian(data, axis=0, keepdims=True) hist_features = histogram_features_per_channel(data, bins=band_bins[band], ignore_nan=ignore_nan).iloc[0, :].to_dict() # Change the keys and indicate the type of feature. feature_signature = 'POW_{}'.format(band) for key in list(hist_features.keys()): # Note the use of a list instead of iterator. hist_features['{}_{}'.format(feature_signature, key)] = hist_features.pop(key) out.update(hist_features) return out
[docs] def plot(self, channels=None, frequency_bands=None, legend=True, ax=None): """ Plot the power analysis. Creates a subplot for each frequency band and plots the corresponding power of all (specified) channels. Args: channels (list of str, optional): a list of labels that specify which channels to plot. If None, all channels are plotted. Defaults to None. frequency_bands (list of str, optional): a list of labels that specify which frequency bands to plot. If None, all bands are plotted. Defaults to None. legend (bool, optional): whether to plot the legend or not. Defaults to True. Returns: fig: Figure handle. ax (list): list with axes handles for each subplot. """ from matplotlib import pyplot as plt if ax is not None: plt.sca(ax) # Default arguments. if channels is None: # Plot all channels. channels = self.channel_labels if frequency_bands is None: # Plot all frequency bands. frequency_bands = self.power.keys() # Find the indices of the channels in the power arrays. idx_channels = get_channel_index(self.channel_labels, channels) # Number of rows and columns. n_plots = len(frequency_bands) n_rows, n_cols = subplot_rows_columns(n_plots, minimize='columns') # Extra formatting for the unit on the y axis. if self.unit == 'uV^2': # Special formatting for the unit. unit = 'uV$^2$' else: unit = self.unit # Time array in minutes. time_array = self.segment_times/60 fig = plt.figure() maximize_figure() ax = [] for i_band, band in enumerate(frequency_bands): loc = i_band + 1 ax.append(plt.subplot(n_rows, n_cols, loc, sharex=ax[0] if len(ax) > 0 else None)) power_band = self.power[band] for j_channel in idx_channels: plt.plot(time_array, power_band[j_channel], label=channels[j_channel]) # Figure make-up. plt.title(band) plt.ylabel('Power ({})'.format(unit) if unit is not None else 'Power') # Remove x axis ticks if not on the lowest subplot in its column. if loc <= n_plots - n_cols: remove_ticks('x') else: plt.xlabel('Time (min)') # Put legend only in first plot. if loc == 1 and legend: plt.legend() return fig, ax
[docs] def summary(self, cell_width=10, n_decimals=2, frequency_bands=None): """ Print a summary of the power analysis. Args: cell_width (int, optional): width of a cell in the table that is printed. n_decimals(int, optional): number of decimals to print. frequency_bands (list, optional): a list with labels for the frequency bands to print the summary of. """ unit = 'unspecified unit' if self.unit is None else self.unit # Print mean. title = 'Mean power over segments ({})'.format(unit) func = np.nanmean print(self._create_text_block(title, func, cell_width=cell_width, n_decimals=n_decimals, frequency_bands=frequency_bands, axis=-1)) # Print std. title = 'Std power over segments ({})'.format(unit) func = np.nanstd print(self._create_text_block(title, func, cell_width=cell_width, n_decimals=n_decimals, frequency_bands=frequency_bands, axis=-1)) # Print number of nans. title = 'Number of artefact segments (of total {} segments)'.format(len(self.segment_times)) func = count_nans first_band = next((band for band in self.power.keys())) print(self._create_text_block(title, func, cell_width=cell_width, n_decimals=0, frequency_bands=first_band, axis=-1))
def _create_text_block(self, title, func, *args, cell_width=10, n_decimals=4, frequency_bands=None, **kwargs): """ Create a table-like text block with displaying one value per frequency band and channel. The value that is computed per frequency band and channel is determined by func. Args: title (str): title string is the first line of the text block. func (function): a function that takes in as first argument a data array from self.power. *args (optional): optional argument to pass on to func. cell_width (int, optional): width of a cell in the table that is printed. n_decimals(int, optional): number of decimals to print. frequency_bands (list, optional): a list with labels for the frequency bands to include. **kwargs (optional): optional keyword arguments for func. Returns: (str): a text block containing the computed values for each frequency band and channel. """ # Input check. if frequency_bands is None: # Use all frequency band by default. frequency_bands = self.power.keys() elif type(frequency_bands) is str: # Convert to list. frequency_bands = [frequency_bands] # Channel header. n_channels = len(self.channel_labels) title_row = '{{:-^{}}}'.format((n_channels+1)*cell_width).format(title) channel_header = (' ' * cell_width + '{{:{}}}'.format(cell_width) * n_channels).format(*self.channel_labels) # Collect numbers per frequency band. block = ['', title_row, channel_header] for i_band in frequency_bands: numbers = func(self.power[i_band], *args, **kwargs) row_header = '{{:{}}}'.format(cell_width).format(i_band) row_str = ('{{:<{}.{}f}}'.format(cell_width, n_decimals) * n_channels).format(*numbers) row = ''.join([row_header, row_str]) block.append(row) return '\n'.join(block) def _extract_epoch(self, mask): for key in self.power.keys(): self.power[key] = self.power[key][:, mask] def _merge(self, other): """ See ResultBase. """ # Check if the channel labels of self and other are the same. if self.channel_labels != other.channel_labels: raise ValueError('Cannot merge results with different channel labels.') for key, val in self.power.items(): val_other = other.power[key] self.power[key] = np.concatenate((val, val_other), axis=-1) @staticmethod def _read_from_csv(filepath): """ Read result from csv file into a PowerAnalysisResult class. Args: filepath (str): see ResultBase._read_from_csv(). Returns: result (nnsa.PowerAnalysisResult): instance of PowerAnalysisResult containing the PowerAnalysis result. """ # Lines 1-4: Standard csv header (use the ResultBase method). algorithm_parameters, data_info, fs = ResultBase._read_csv_header(filepath)[1:] # Re-open the file and read the rest of the file, line by line. with open(filepath, 'r') as f: reader = csv.reader(f) # Lines 1-4: Standard csv header (already read, skip). [next(reader) for i in range(4)] # Line 5: Non-array data header (skip). next(reader) # Line 6: Non-array data. channel_labels, unit, data_shape = [convert_string_auto(i) for i in next(reader)] # Line 7: Array header. power_bands = next(reader) # Line 8-... : Array data (flattened). Each power band is saved on one row. power = dict() for band in power_bands: array_as_list = [float(i) for i in next(reader)] # Convert to numpy array and reshape. power[band] = np.reshape(array_as_list, data_shape) # Create a result object. result = PowerAnalysisResult(power=power, algorithm_parameters=algorithm_parameters, channel_labels=channel_labels, unit=unit, data_info=data_info, fs=fs) return result @staticmethod def _read_from_hdf5(filepath): """ Read result from hdf5 file into a PowerAnalysisResult class. Args: filepath (str): see ResultBase._read_from_hdf5(). Returns: result (nnsa.PowerAnalysisResult): instance of PowerAnalysisResult containing the PowerAnalysis result. """ # Read standard hdf5 header (use the ResultBase method). algorithm_parameters, data_info, segment_start_times, segment_end_times, fs, time_offset =\ ResultBase._read_hdf5_header(filepath)[1:] # Re-open the file and read the rest of the file. with h5py.File(filepath, 'r') as f: # Read array data. power = dict() p_ds = f['power'] for band in p_ds.keys(): power[band] = p_ds[band][:] # Read non-array data. channel_labels = [label.decode() for label in p_ds.attrs['channel_labels']] unit_ = p_ds.attrs['unit'].decode() unit = unit_ if unit_ != 'None' else None # Create a result object. result = PowerAnalysisResult(power=power, algorithm_parameters=algorithm_parameters, channel_labels=channel_labels, unit=unit, data_info=data_info, segment_start_times=segment_start_times, segment_end_times=segment_end_times, fs=fs) return result def _write_to_csv(self, filepath): """ Write the contents of the object to a csv file. Args: filepath (str): see ResultBase._write_to_csv(). """ # Lines 1-4: Standard csv header (use the ResultBase method). self._write_csv_header(filepath) # Append attributes to the csv file, line by line. with open(filepath, 'a', newline='') as csvfile: writer = csv.writer(csvfile) # Line 5: Non-array data header. writer.writerow(['channel_labels', 'unit', 'power[key].shape']) # Line 6: Non-array data. data_shape = next((p for p in self.power.values())).shape writer.writerow([self.channel_labels, self.unit, data_shape]) # Line 7: Array header. writer.writerow(self.power.keys()) # Line 8-... : Array data (flattened). Each power band is saved on one row. for p in self.power.values(): writer.writerow(p.reshape(-1).tolist()) def _write_to_hdf5(self, filepath): """ Write the contents of the object to an hdf5 file. Args: filepath (str): see ResultBase._write_to_hdf5(). """ # Write standard hdf5 header (use the ResultBase method). self._write_hdf5_header(filepath) # Append attributes to the hdf5 file. with h5py.File(filepath, 'a') as f: # Write array data. for band, data in self.power.items(): f.create_dataset('power/{}'.format(band), data=data) # Write non-array data as attributes to the 'power' group. # Convert strings to np.string_ type as recommended for compatibility. f['power'].attrs['channel_labels'] = [np.string_(label) for label in self.channel_labels] f['power'].attrs['unit'] = np.string_(self.unit)
[docs]class Psd(ClassWithParameters): """ Class for estimating the power spectral density of a signal using Welch's method. Main method: psd() Args: see nnsa.ClassWithParameters. Examples: >>> np.random.seed(0) >>> x = np.random.rand(10000) >>> psd = Psd() >>> assert_equal(psd.parameters, Psd.default_parameters()) >>> print(type(psd.parameters).__name__) Parameters >>> psd_result = psd.psd(x, fs=250) >>> print(type(psd_result).__name__) PsdResult >>> print(psd_result.power_density) [7.20648796e-01 1.43280205e+00 1.40759024e+00 ... 2.83669937e-04 2.81437262e-04 1.40343941e-04] """
[docs] @staticmethod def default_parameters(): """ Return the default parameters as a dictionary. Returns: (nnsa.Parameters): a default set of parameters for the object. """ pars = { # Window to use. MATLAB's default is 'hamming' (note that scipy's default is 'hanning'): 'window': 'hamming', # Length of window for Welch's method in seconds. If None, uses 256 samples: 'window_length': None, # Fraction of overlap between windows: 'overlap_fraction': 0.7, # Maximum accepted/desired frequency resolution in Hz (will determine the amount of zero-padding for DFT): 'df_max': 0.015625, # Detrend (see scipy.signal.welch()). Note that in MATLAB there is no detrending. 'detrend': 'constant', } return Parameters(**pars)
[docs] def psd(self, x, fs, unit='a.u.', axis=-1, verbose=0, **kwargs): """ Estimate the power spectral density using Welch's method. Args: x (np.ndarray): signal data array. fs (float): sample frequency of the signal in Hz. unit (str, optional): the unit of the signal values. Defaults to 'a.u.' axis (int, optional): axis along which the periodogram is computed. Defaults to -1. verbose (int, optional): verbose level. Defaults to 0. **kwargs (optional): for scipy.signal.welch(). Returns: (nnsa.feature_extraction.PsdResult): object containing the power spectral density estimate. """ # Extract and print parameters. pars = self.parameters if verbose > 0: print(HORIZONTAL_RULE) print('Running psd with parameters:') print(pars) # Determine the Welch kwargs from the parameters. welch_kwargs = _determine_welch_kwargs(pars, fs) # Update welch kwargs with **kwargs. welch_kwargs.update(kwargs) # Welch's method for estimation of power spectral density. x = np.nan_to_num(x) # Replace nans with zeros. frequencies, power_density = scipy.signal.welch(x, fs=fs, axis=axis, **welch_kwargs) # Determine frequency resolution. df = np.diff(frequencies[:2])[0] # Return the output as a PsdResult object. return PsdResult(power_density=power_density, df=df, algorithm_parameters=pars, unit='{}^2/Hz'.format(unit) if unit else None)
[docs] def process(self, *args, **kwargs): """ Alias for self.psd(). """ return self.psd(*args, **kwargs)
[docs]class PsdResult(ResultBase): """ High-level interface for power spectral density results as created by nnsa.Psd().psd(). Args: power_density (np.ndarray): power spectral density for frequencies 0, df, df*2, ..., df*(len(power_density)-1). df (float): frequency resolution of the power spectral density. algorithm_parameters (nnsa.Parameters): see ResultBase. data_info (str, optional): see ResultBase. unit (str, optional): the unit corresponding to power. """ def __init__(self, power_density, df, algorithm_parameters, data_info=None, unit=None): super().__init__(algorithm_parameters=algorithm_parameters, data_info=data_info) # Store variables that are not already stored by the parent class (ResultBase). self.power_density = power_density self.df = df self.unit = unit @property def frequencies(self): """ Return the frequencies corresponding to the powers in power_density. Returns: (np.ndarray): array of same length as self.power_denisty containing the corresponding frequencies. """ return np.arange(0, len(self.power_density)) * self.df @property def num_segments(self): """ Return the number of segments/samples. Returns: (int): number of segments/samples. """ # No segments for PSD. return None
[docs] def compute_power(self, f_low, f_high): """ Compute power of frequency band spanned by f_low and f_high by numerically integrating the psd over this domain. Args: f_low (float): lower limit of the frequency band. f_high (float): upper limit of the frequency band. Returns: power (float): the frequency power spanned by f_low and f_high. """ # Frequency resolution. df = self.df # Find indices in self.frequencies that corresponds to f_low and f_high. idx_low = int(np.ceil(f_low/df)) idx_high = int(np.floor(f_high/df)) # Integrate over the specified frequency domain using trapezium rule. power_segment = self.power_density[idx_low: idx_high + 1] # +1 to include the highest frequency in the segment. power = scipy.integrate.trapz(power_segment, dx=df) return power
[docs] def normalize(self, how='max', inplace=False): """ Normalize the PSD result. Args: how (str): how to normalize. Choose from: - "max": noramlize such that maximum power is 1. inplace (bool): whether to apply inplace or not. Returns: psd (PsdResult): normalized Psd (only returend if inplace is False). """ if inplace: psd = self else: psd = copy.deepcopy(self) # Compute normalized power. power = psd.power_density if how == "max": power = power/np.nanmax(power) else: raise ValueError('Invalid option how="{}". Choose from {}.' .format(how, ['max'])) # Set normalized power. psd.power_density = power if not inplace: return psd
[docs] def plot(self, *args, in_db=True, cumulative=False, title_postfix=None, ax=None, **kwargs): """ Plot the power spectrum density. Args: *args (optional): optional positional arguments for plt.semilogy function. in_db (bool): whether to plot the PSD in decibels (True) or just power (False). title_postfix (str, optional): if not None, this will be added to the plot title. Defaults to None. cumulative (bool): if True plots the cumulative power by integrating over frequency. ax (plt.Axes, optional): axes object to plot in. If None, plots in a new figure. Defaults to None. **kwargs (optional): optional keyword arguments for plt.semilogy function. """ from matplotlib import pyplot as plt # Set current axes. if ax is not None: plt.sca(ax) else: # Plot in current axes. ax = plt.gca() # Extract power and frequency array. power = self.power_density frequencies = self.frequencies if in_db: # Convert power to dB. power = 10*np.log10(power) if cumulative: # Integrate cumulatively. power = np.cumsum(power)*self.df title = 'Cumulative power' else: title = 'Power spectral density' # Plot. plt.plot(frequencies, power, *args, **kwargs) plt.title('{} {}'.format(title, title_postfix if title_postfix is not None else '')) plt.grid() plt.xlabel('Frequency (Hz)') # Extra formatting for the unit on the y axis. if in_db: unit = 'dB/Hz' elif self.unit == 'uV^2/Hz': # Special formatting for the unit. unit = 'uV$^2$/Hz' else: unit = self.unit if cumulative: # Remove the /Hz. unit = unit.replace('/Hz', '') plt.ylabel('PSD ({})'.format(unit))
@staticmethod def _read_from_csv(filepath): """ Read result from csv file into a PsdResult class. Args: filepath (str): see ResultBase._read_from_csv(). Returns: result (nnsa.PsdResult): instance of PsdResult containing the Psd result. """ # Lines 1-4: Standard csv header (use the ResultBase method). algorithm_parameters, data_info = ResultBase._read_csv_header(filepath)[1:3] # Re-open the file and read the rest of the file, line by line. with open(filepath, 'r') as f: reader = csv.reader(f) # Lines 1-4: Standard csv header (already read, skip). [next(reader) for i in range(4)] # Line 5: Non-array data header (skip). next(reader) # Line 6: Non-array data. df, unit, data_shape = [convert_string_auto(i) for i in next(reader)] # Line 7: Array header (skip). next(reader) # Line 8: Array data (flattened). array_as_list = [float(i) for i in next(reader)] # Convert to numpy array and reshape. power_density = np.reshape(array_as_list, data_shape) # Create a result object. result = PsdResult(power_density=power_density, df=df, algorithm_parameters=algorithm_parameters, unit=unit, data_info=data_info) return result @staticmethod def _read_from_hdf5(filepath): """ Read result from hdf5 file into a PsdResult class. Args: filepath (str): see ResultBase._read_from_hdf5(). Returns: result (nnsa.PsdResult): instance of PsdResult containing the Psd result. """ # Read standard csv header (use the ResultBase method). algorithm_parameters, data_info = ResultBase._read_hdf5_header(filepath)[1:3] # Re-open the file and read the rest of the file. with h5py.File(filepath, 'r') as f: # Read array data. pd_ds = f['power_density'] power_density = pd_ds[:] # Read non-array data. df = pd_ds.attrs['df'] unit_ = pd_ds.attrs['unit'].decode() unit = unit_ if unit_ != 'None' else None # Create a result object. result = PsdResult(power_density=power_density, df=df, algorithm_parameters=algorithm_parameters, unit=unit, data_info=data_info) return result def _write_to_csv(self, filepath): """ Write the contents of the object to a csv file. Args: filepath (str): see ResultBase._write_to_csv(). """ # Lines 1-4: Standard csv header (use the ResultBase method). self._write_csv_header(filepath) # Append attributes to the csv file, line by line. with open(filepath, 'a', newline='') as csvfile: writer = csv.writer(csvfile) # Line 5: Non-array data header. writer.writerow(['df', 'unit', 'power_density.shape']) # Line 6: Non-array data. writer.writerow([self.df, self.unit, self.power_density.shape]) # Line 7: Array header. writer.writerow(['power_density']) # Line 8: Array data (flattened). writer.writerow(self.power_density.reshape(-1).tolist()) def _write_to_hdf5(self, filepath): """ Write the contents of the object to an hdf5 file. Args: filepath (str): see ResultBase._write_to_hdf5(). """ # Write standard hdf5 header (use the ResultBase method). self._write_hdf5_header(filepath) # Append attributes to the hdf5 file. with h5py.File(filepath, 'a') as f: # Write array data. pd_ds = f.create_dataset('power_density', data=self.power_density) # Write non-array data as attributes to the 'power-density' dataset. # Convert strings to np.string_ type as recommended for compatibility. pd_ds.attrs['df'] = self.df pd_ds.attrs['unit'] = np.string_(self.unit)
def compute_bandpower(x, fs, fmin, fmax, **kwargs): # Compute bandpower. f, Pxx = scipy.signal.periodogram(x, fs=fs, **kwargs) ind_min = np.argmax(f > fmin) - 1 ind_max = np.argmax(f > fmax) - 1 return np.trapz(Pxx[ind_min: ind_max], f[ind_min: ind_max]) def compute_coherence_pvalue(x, y, f_low=0, f_high=None, fs=1.0, n_surrogates=200, seed=None, window='hann', nperseg=None, noverlap=None, nfft=None, detrend='constant'): """ Compute the (average) maginitude squared coherence and a p-value for signals `x` and `y`. Computes the maginitude squared coherence between x and y, averaged over frequencies and a corresponding p-value. The p-value indicates the chance that the computed coherence between `x` and `y` is a result of chance. More specifically, it is the empiric chance that the computed coherence value can be found between 2 unrelated, random signals, which have similar statistical properties as `x` and `y`. E.g., if p-value < 0.05, the coherence may be assumed significant, i.e. not likely to find by chance. Args: x (np.ndarray): 1D signal array. y (np.ndarray): 1D signal array. f_low (float, optional): frequency specifying the low edge frequency of the band in which to compute the average coherence. Defaults to 0. f_high (float, optional): frequency specifying the high edge frequency of the band in which to compute the average coherence. If None, this will be the Nyqvist frequency, i.e. fs/2. Defaults to None. fs (float, optional): the sample frequency of the signal in Hz. Specify this when passing f_low and f_high in Hz. Defaults to 1. n_surrogates (int, optional): number of surrogates to use for estimation of the p-value. Defaults to 200. seed (int, optional): seed for the random generator (for generation of surrogates). Defaults to None See documentation of scipy.signal.coherence function for other (optional) arguments. Returns: coh_xy (float): magnitude sqaured coherence between `x` and `y`, averaged over frequencies. pvalue (float): estimated pvalue of the coherence significance. Examples: >>> np.random.seed(43) >>> t = np.arange(1000) >>> x = np.sin(t/10) + np.tan(t/5 + 4) + np.random.rand(len(t)) >>> y = 2*np.sin(t/10) + 0.5*np.tan(t/5) + np.random.rand(len(t)) >>> coh_xy, pvalue =compute_coherence_pvalue(x, y, fs=1, f_low=0.003, f_high=0.02, seed=43, n_surrogates=500) >>> print('Coherence = {}'.format(coh_xy)) Coherence = 0.8097924933097895 >>> print('P-value = {}'.format(pvalue)) P-value = 0.002 """ if f_high is None: # Set to the Nyqvist frequency, i.e. the maximum frequency for which the FFT can be computed. f_high = fs/2 def compute_average_coherence(x, y): f, Cxy = scipy.signal.coherence(x, y, fs=fs, window=window, nperseg=nperseg, noverlap=noverlap, nfft=nfft, detrend=detrend) freq_mask = np.logical_and(f >= f_low, f <= f_high) # Depending on how many samples are used for the FFT, the discrete frequencies at which the # coherence is computed may not be suitable for the given edge frequencies. # Raise an error if less than 2 frequencies fall in the specified range. # If an error occurs, nperseg and/or nfft parameter can be set to control the number of FFT points. # The frequencies at which the FFT will be computed is np.linspace(0, fs/2, nfft). if np.sum(freq_mask) < 2: raise ValueError('Less than 2 frequencies fall in specified frequency band (f_low={}, f_high={}). \n' 'Computed frequencies are: range({}, {}, {}). \n' 'Adjust the band limits, or increase the nperseg and/or nfft parameter.' .format(f_low, f_high, f[0], f[-1], f[1]-f[0])) return np.mean(Cxy[freq_mask]) # Compute (average) coherence between x and y. coh_xy = compute_average_coherence(x, y) # Compute coherence for surrogates. coh_surrogates = [] for i_sur in range(n_surrogates): # Create surrogate. x_sur = compute_IAAFT_surrogate(x, seed=seed+i_sur) # Seed should not be equal for each surrogate. y_sur = compute_IAAFT_surrogate(y, seed=seed+i_sur+n_surrogates) # Seed of y surrogate not be equal to x surrogate. coh = compute_average_coherence(x_sur, y_sur) coh_surrogates.append(coh) # P-value is the chance that we find a coherence value between random, unrelated signals which is at least as high # as the coherence between the original signals x and y. pvalue = np.mean(np.array(coh_surrogates) >= coh_xy) return coh_xy, pvalue def compute_coherence_threshold(x, y, f_low=0, f_high=None, fs=1.0, alpha=0.95, n_surrogates=200, seed=None, window='hann', nperseg=None, noverlap=None, nfft=None, detrend='constant'): """ Compute a significance threshold for the (average) maginitude squared coherence given signals `x` and `y`. To come up with a threshold, a Monte-Carlo simulation is done using surrogate signals. Args: x (np.ndarray): 1D signal array. y (np.ndarray): 1D signal array. f_low (float, optional): frequency specifying the low edge frequency of the band in which to compute the average coherence. Defaults to 0. f_high (float, optional): frequency specifying the high edge frequency of the band in which to compute the average coherence. If None, this will be the Nyqvist frequency, i.e. fs/2. Defaults to None. fs (float, optional): the sample frequency of the signal in Hz. Specify this when passing f_low and f_high in Hz. Defaults to 1. alpha (float): the confidence level between 0 and 1. The returned threshold is chosen such that the number of surrogates with coherence values below the threshold equals n_surrogates*alpha. n_surrogates (int, optional): number of surrogates to use. Defaults to 200. seed (int, optional): seed for the random generator (for generation of surrogates). Defaults to None See documentation of scipy.signal.coherence function for other (optional) arguments. Returns: coh_thres (float): significance threshold for the coherence. Examples: >>> np.random.seed(43) >>> t = np.arange(1000) >>> x = np.sin(t/10) + np.tan(t/5 + 4) + np.random.rand(len(t)) >>> y = 2*np.sin(t/10) + 0.5*np.tan(t/5) + np.random.rand(len(t)) >>> coh_thres =compute_coherence_threshold(x, y, fs=1, f_low=0.003, f_high=0.02, seed=43) >>> print('Coherence threshold = {}'.format(coh_thres)) Coherence threshold = 0.603020037329525 """ if f_high is None: # Set to the Nyqvist frequency, i.e. the maximum frequency for which the FFT can be computed. f_high = fs/2 def compute_average_coherence(x, y): f, Cxy = scipy.signal.coherence(x, y, fs=fs, window=window, nperseg=nperseg, noverlap=noverlap, nfft=nfft, detrend=detrend) freq_mask = np.logical_and(f >= f_low, f <= f_high) # Depending on how many samples are used for the FFT, the discrete frequencies at which the # coherence is computed may not be suitable for the given edge frequencies. # Raise an error if less than 2 frequencies fall in the specified range. # If an error occurs, nperseg and/or nfft parameter can be set to control the number of FFT points. # The frequencies at which the FFT will be computed is np.linspace(0, fs/2, nfft). if np.sum(freq_mask) < 2: raise ValueError('Less than 2 frequencies fall in specified frequency band (f_low={}, f_high={}). \n' 'Computed frequencies are: range({}, {}, {}). \n' 'Adjust the band limits, or increase the nperseg and/or nfft parameter.' .format(f_low, f_high, f[0], f[-1], f[1]-f[0])) return np.mean(Cxy[freq_mask]) # Create surrogates and compute coherence. coh_surrogates = [] for i_sur in range(n_surrogates): # Create surrogate. x_sur = compute_IAAFT_surrogate(x, seed=seed+i_sur) # Seed should not be equal for each surrogate. y_sur = compute_IAAFT_surrogate(y, seed=seed+i_sur+n_surrogates) # Seed of y surrogate not be equal to x surrogate. # Compute coherence. coh = compute_average_coherence(x_sur, y_sur) coh_surrogates.append(coh) # Threshold is chosen such that the number of surrogates with coherence values below the threshold equals # n_surrogates*alpha. coh_thres = np.percentile(coh_surrogates, q=alpha*100) return coh_thres def _determine_welch_kwargs(pars, fs): # Prepare parameters for scipy.signal.welch function. if pars['window_length'] is not None: nperseg = pars['window_length'] * fs else: # Scipy's default. nperseg = 256 if pars['overlap_fraction'] is not None: noverlap = np.int(np.floor(nperseg * pars['overlap_fraction'])) else: # Scipy's default. noverlap = None if pars['df_max'] is not None: min_nfft = fs / pars['df_max'] nfft = 2 ** (nextpow2(max(min_nfft, nperseg))) else: # Scipy's default. nfft = None welch_kwargs = { 'window': pars['window'], 'nperseg': nperseg, 'noverlap': noverlap, 'nfft': nfft, 'detrend': pars['detrend'], 'scaling': 'density', } return welch_kwargs