"""
Module for general dynamic coupling results (i.e. coupling as function of time).
"""
import sys
from functools import partial
import h5py
import pyprind
import scipy.signal
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
from nnsa.utils.plotting import format_time_axis
from nnsa.parameters.parameters import ClassWithParameters, Parameters
from nnsa.feature_extraction.result import ResultBase
from nnsa.containers.time_series import TimeSeries
from nnsa.feature_extraction.correlation import delay_correlation
from nnsa.stats.surrogates import compute_surrogate_couplings_sample, compute_surrogate_fun
from nnsa.utils.arrays import apply_delay
from nnsa.utils.config import HORIZONTAL_RULE
from nnsa.utils.conversions import convert_time_scale
from nnsa.utils.segmentation import segment_generator, compute_n_segments
__all__ = [
'DynamicCoupling',
'DynamicCouplingResult',
]
[docs]class DynamicCoupling(ClassWithParameters):
"""
Class for computing the dynamic coupling between two signals.
Main mehtod: dynamic_coupling().
Args:
see nnsa.ClassWithParameters.
Examples:
>>> t = np.linspace(0, 10, 200)
>>> x = np.sin(t)
>>> y = np.sin(t)
>>> dc = DynamicCoupling(segmentation={'segment_length': 50, 'overlap': 40}, method='correlation')
>>> result = dc.dynamic_coupling(x, y, fs=1, verbose=0)
>>> print(type(result).__name__)
DynamicCouplingResult
>>> result.coupling.shape
(16,)
>>> np.mean(result.coupling)
1.0
"""
[docs] @staticmethod
def default_parameters():
"""
Return the default parameters.
Returns:
(nnsa.Parameters): a default set of parameters for the object.
"""
# Parameters for segmentation of the signals into small time segments/epochs.
segmentation = Parameters(**{
# Segment length in seconds:
'segment_length': 300,
# Overlap in segments in seconds:
'overlap': 0,
})
# The method/algorithm to use for assessment of coupling. Choose from:
# 'correlation', 'coherence', 'transfer_function'/'TF'.
# See the dedicated methods in this class for each of these methods for more information
# (e.g. self.correlation_dynamic_coupling()).
# It can also be a function that takes in two arguments (x, y) and returns a single coupling value,
# see also self._compute_dynamic_coupling().
method = 'unspecified'
# Optional additional keyword arguments/parameters for the method/function that computes the coupling.
# These keyword arguments depend on the method specified above. E.g. if method is set to 'correlation', see the
# function self.correlation_dynamic_coupling() for the optional keyword argument that you can specify here:
method_kwargs = dict()
# Parameters for surrogate coupling. Set n_surrogates to 0 if no surrogate computations are desired.
surrogates = Parameters(**{
# Number of surrogates to compute. Set to 0 to not compute any surrogate values:
'n_surrogates': 100,
# How to generate surrogates (see nnsa.stats.surrogates.compute_surrogate() for options):
'how': 'IAAFT',
# Bool to specify if a surrogates should be created per segment (True) or surrogates characteristic
# for the entire signals (False):
'per_segment': True,
# Seed for the random generator:
'seed': None,
})
pars = {
'segmentation': segmentation,
'method': method,
'method_kwargs': method_kwargs,
'surrogates': surrogates,
}
return Parameters(**pars)
[docs] def correlation_dynamic_coupling(self, x, y, fs, delays=None, abs_corr=True, verbose=1):
"""
Compute correlation between x and y in a windowed way.
Args:
x (np.ndarray): 1D signal array.
y (np.ndarray): 1D signal array.
fs (float): sample frequency of x and y.
delays (np.ndarray, optional): delays to try to get maximum correlation. If None, no delay is applied.
Defaults to None.
abs_corr (bool, optional): return the absolute value of the correlation (True) or not (False).
Defaults to True.
verbose (int, optional): verbose level.
Defaults to 1.
Returns:
coupling (np.ndarray): 1D array with coupling values.
coupling_surrogates (np.ndarray): coupling values for each surrogate.
"""
# Check if we should apply a delay.
if delays is not None:
# Find optimal delay.
corr = np.abs(delay_correlation(x, y, delays))
best_delay = delays[np.argmax(corr)]
# Apply delay.
x, y = apply_delay(x, y, best_delay)
# Create function to compute the coupling between two segments.
def compute_corr(x, y):
cov = np.nanmean((x - np.nanmean(x))*(y - np.nanmean(y)))
rho = cov / (np.nanstd(x) * np.nanstd(y))
return rho
# Compute coupling.
coupling, coupling_surrogates = self._compute_dynamic_coupling(
x, y, fs=fs, coupling_fun=compute_corr, verbose=verbose)
# Take absolute correlation if requested.
if abs_corr:
coupling = np.abs(coupling)
coupling_surrogates = np.abs(coupling_surrogates)
return coupling, coupling_surrogates
[docs] def coherence_dynamic_coupling(self, x, y, fs, f_low=0, f_high=None, verbose=1,
window='hann', nperseg=None, noverlap=None, nfft=None, detrend='constant', **kwargs):
"""
Compute coherence between x and y in a windowed way.
Args:
x (np.ndarray): 1D signal array.
y (np.ndarray): 1D signal array.
fs (float): sample frequency of x and y.
f_low (float, optional): frequency specifying the low edge frequency of the band in which
to compute the average coherence.
Defaults to 0.
f_high (float, optional): frequency specifying the high edge frequency of the band in which
to compute the average coherence.
If None, this will be the Nyqvist frequency, i.e. fs/2.
Defaults to None.
verbose (int, optional): verbose level.
Defaults to 1.
See documentation of scipy.signal.coherence function for other (optional) arguments.
**kwargs: for scipy.signal.coherence().
Returns:
coupling (np.ndarray): 1D array with coupling values.
coupling_surrogates (np.ndarray): coupling values for each surrogate.
"""
if f_high is None:
# Set to the Nyqvist frequency, i.e. the maximum frequency for which the FFT can be computed.
f_high = fs / 2
# Create function to compute the coupling between two segments.
def compute_coh(x, y):
f, Cxy = scipy.signal.coherence(x, y, fs=fs, window=window, nperseg=nperseg, noverlap=noverlap,
nfft=nfft, detrend=detrend, **kwargs)
freq_mask = np.logical_and(f >= f_low, f <= f_high)
# Depending on how many samples are used for the FFT, the discrete frequencies at which the
# coherence is computed may not be suitable for the given edge frequencies.
# Raise an error if less than 2 frequencies fall in the specified range.
# If an error occurs, nperseg and/or nfft parameter can be set to control the number of FFT points.
# The frequencies at which the FFT will be computed is np.linspace(0, fs/2, nfft).
if np.sum(freq_mask) < 2:
raise ValueError('Less than 2 frequencies fall in specified frequency band (f_low={}, f_high={}). \n'
'Computed frequencies are: range({}, {}, {}). \n'
'Adjust the band limits, or increase the nperseg and/or nfft parameter.'
.format(f_low, f_high, f[0], f[-1], f[1] - f[0]))
# Return the coherence averaged over the specified frequency range.
return np.mean(Cxy[freq_mask])
# Compute coupling.
coupling, coupling_surrogates = self._compute_dynamic_coupling(
x, y, fs=fs, coupling_fun=compute_coh, verbose=verbose)
return coupling, coupling_surrogates
[docs] def process(self, x, y, fs, label=None, verbose=2):
"""
Compute dynamic coupling between x and y.
Args:
x (array-like): 1D signal array.
y (array-like): 1D signal array.
fs (float): sample frequency of x and y (x and y must have equal fs).
label (str, optional): label for the coupling output (e.g. 'EEG-NIRS').
If None, the `method` parameter is used as label.
Defaults to None.
verbose (int, optional): verbose level.
Set to 1 for a progress bar, 2 for also printing parameters.
Returns:
(nnsa.DynamicCouplingResult): object containing the result.
"""
# Check input.
x = np.asarray(x).squeeze()
y = np.asarray(y).squeeze()
if x.ndim != 1 or y.ndim != 1:
raise ValueError('`x` and `y` should be 1-dimensional.')
if verbose > 1:
print(HORIZONTAL_RULE)
print('Running dynamic_coupling with parameters:')
print(self.parameters)
# Extract some parameters.
method = self.parameters['method']
method_kwargs = self.parameters['method_kwargs']
# Call the corresponding function to compute the masks for the bursts and inter-burst-intervals (IBIs).
if isinstance(method, str):
method = method.lower()
# Default label.
if label is None:
label = method
# Select corresponding method and compute coupling in windows.
if method in ['correlation', 'corr']:
coupling, coupling_surrogates = self.correlation_dynamic_coupling(
x, y, fs=fs, verbose=verbose, **method_kwargs)
elif method in ['coherence', 'coh']:
coupling, coupling_surrogates = self.coherence_dynamic_coupling(
x, y, fs=fs, verbose=verbose, **method_kwargs)
elif method in ['transfer_function', 'tf']:
coupling, coupling_surrogates = self.tf_dynamic_coupling(
x, y, fs=fs, verbose=verbose, **method_kwargs)
elif method in ['wavelet_based_transfer_function', 'wbtf']:
coupling, coupling_surrogates = self.wbtf_dynamic_coupling(
x, y, fs=fs, verbose=verbose, **method_kwargs)
else:
raise ValueError('Invalid method "{}". Choose from "{}".'
.format(method, ['correlation', 'coherence',
'transfer_function', 'wavelet_based_transfer_function']))
elif callable(method):
# Assume that `method` is the coupling_fun for _compute_dynamic_coupling().
# Default label.
if label is None:
label = 'Custom coupling'
# Compute coupling in windows.
coupling, coupling_surrogates = self._compute_dynamic_coupling(
x, y, fs=fs, coupling_fun=partial(method, **method_kwargs), verbose=verbose)
else:
raise ValueError('Invalid method "{}". Choose from "{}", or specify your custom coupling function.'
.format(method, ['correlation', 'coherence',
'transfer_function', 'wavelet_based_transfer_function']))
result = DynamicCouplingResult(coupling=coupling,
coupling_surrogates=coupling_surrogates,
algorithm_parameters=self.parameters,
label=label)
return result
[docs] def dynamic_coupling(self, *args, **kwargs):
"""
Alias for self.process().
"""
return self.process(*args, **kwargs)
[docs] def tf_dynamic_coupling(self, x, y, fs, f_low=0, f_high=None, verbose=1,
window='hann', nperseg=None, noverlap=None, nfft=None, detrend='constant',
**kwargs):
"""
Compute transfer function gain between x and y in a windowed way.
Args:
x (np.ndarray): 1D signal array.
y (np.ndarray): 1D signal array.
fs (float): sample frequency of x and y.
f_low (float, optional): frequency specifying the low edge frequency of the band in which
to compute the average coherence.
Defaults to 0.
f_high (float, optional): frequency specifying the high edge frequency of the band in which
to compute the average coherence.
If None, this will be the Nyqvist frequency, i.e. fs/2.
Defaults to None.
verbose (int, optional): verbose level.
Defaults to 1.
See documentation of scipy.signal.csd function for other (optional) arguments.
**kwargs: for compute_transfer_function().
Returns:
coupling (np.ndarray): 1D array with coupling values.
coupling_surrogates (np.ndarray): coupling values for each surrogate.
"""
if f_high is None:
# Set to the Nyqvist frequency, i.e. the maximum frequency for which the FFT can be computed.
f_high = fs / 2
# Create function to compute the coupling between two segments.
def compute_tf(x, y):
f, gain, _, _ = compute_transfer_function(
x, y, fs=fs, window=window, nperseg=nperseg, noverlap=noverlap,
nfft=nfft, detrend=detrend, **kwargs)
freq_mask = np.logical_and(f >= f_low, f <= f_high)
# Depending on how many samples are used for the FFT, the discrete frequencies at which the
# coherence is computed may not be suitable for the given edge frequencies.
# Raise an error if less than 2 frequencies fall in the specified range.
# If an error occurs, nperseg and/or nfft parameter can be set to control the number of FFT points.
# The frequencies at which the FFT will be computed is np.linspace(0, fs/2, nfft).
if np.sum(freq_mask) < 2:
raise ValueError('Less than 2 frequencies fall in specified frequency band (f_low={}, f_high={}). \n'
'Computed frequencies are: range({}, {}, {}). \n'
'Adjust the band limits, or increase the nperseg and/or nfft parameter.'
.format(f_low, f_high, f[0], f[-1], f[1] - f[0]))
# Return the gain averaged over the specified frequency range.
return np.mean(gain[freq_mask])
# Compute coupling.
coupling, coupling_surrogates = self._compute_dynamic_coupling(
x, y, fs=fs, coupling_fun=compute_tf, verbose=verbose)
return coupling, coupling_surrogates
[docs] def wbtf_dynamic_coupling(self, x, y, fs, verbose=1):
"""
Compute coherence between x and y in a windowed way.
Args:
x (np.ndarray): 1D signal array.
y (np.ndarray): 1D signal array.
fs (float): sample frequency of x and y.
verbose (int, optional): verbose level.
Defaults to 1.
Returns:
coupling (np.ndarray): 1D array with coupling values.
coupling_surrogates (np.ndarray): coupling values for each surrogate.
"""
# Create function to compute the coupling between two segments.
def compute_wbtf(x, y):
raise NotImplementedError
# Compute coupling.
coupling, coupling_surrogates = self._compute_dynamic_coupling(
x, y, fs=fs, coupling_fun=compute_wbtf, verbose=verbose)
return coupling, coupling_surrogates
def _compute_dynamic_coupling(self, x, y, fs, coupling_fun, verbose=1):
"""
Compute the coupling between segments of x and y using a couping function.
Args:
x (np.ndarray): 1D signal array.
y (np.ndarray): 1D signal array.
fs (float): sample frequency of x and y.
coupling_fun (function): function that takes in two arrays and returns a single coupling value.
verbose (int, optional): verbose level.
For a progress bar set to 1.
Returns:
coupling (np.ndarray): 1D array with coupling values.
coupling_surrogates (np.ndarray or None): array with coupling values for each surrogate. If per_segment,
has shape (n_surrogates, n_segments). Otherwise, has shape (n_surrogates,).
"""
# Extract some parameters.
seg_pars = self.parameters['segmentation']
segment_length = seg_pars['segment_length']
overlap = seg_pars['overlap']
sur_pars = self.parameters['surrogates']
n_surrogates = sur_pars['n_surrogates']
per_segment = sur_pars['per_segment']
how = sur_pars['how']
seed = sur_pars['seed']
# Initialize coupling surrogates. If not computed, remains None.
coupling_surrogates = None
if n_surrogates > 0 and not per_segment:
# Compute global surrogates.
nperseg = int(segment_length*fs)
coupling_surrogates = compute_surrogate_couplings_sample(
x, y, fun=coupling_fun, axis=-1, nperseg=nperseg, n_surrogates=n_surrogates, how=how, seed=seed)
coupling_surrogates
# Segment the data in the time axis (create a generator).
x_gen = segment_generator(x, segment_length=segment_length,
overlap=overlap, fs=fs, axis=-1)
y_gen = segment_generator(y, segment_length=segment_length,
overlap=overlap, fs=fs, axis=-1)
n_segments_x = compute_n_segments(x, segment_length=segment_length,
overlap=overlap, fs=fs, axis=-1)
n_segments_y = compute_n_segments(y, segment_length=segment_length,
overlap=overlap, fs=fs, axis=-1)
n_segments = min([n_segments_x, n_segments_y])
# Initialize progress bar.
bar = pyprind.ProgBar(n_segments, stream=sys.stdout)
# Init matrices that will contain the results.
coupling = np.zeros(n_segments)
if n_surrogates > 0 and per_segment:
coupling_surrogates = np.zeros((n_surrogates, n_segments))
# Loop over segments.
for i_seg, (x_seg, y_seg) in enumerate(zip(x_gen, y_gen)):
# Check if one of the signals contains NaNs.
is_nan = np.any(np.isnan(x_seg)) or np.any(np.isnan(y_seg))
# Compute coupling between segments.
if is_nan:
coupling[i_seg] = np.nan
else:
coupling[i_seg] = coupling_fun(x_seg, y_seg)
# Compute surrogate couplings if requested.
if n_surrogates > 0 and per_segment:
if is_nan:
coupling_surrogates[:, i_seg] = np.nan
else:
coupling_surrogates[:, i_seg] = compute_surrogate_fun(
(x_seg, y_seg), fun=coupling_fun,
n_surrogates=n_surrogates, how=how, seed=seed, verbose=verbose > 2)
# Update progress bar.
if verbose > 0:
bar.update()
return coupling, coupling_surrogates
[docs]class DynamicCouplingResult(ResultBase):
"""
High-level interface for processing dynamic coupling as computed by nnsa.DynamicCoupling().
Args:
coupling (np.ndarray): 1D array with coupling values with size (n_segments,).
algorithm_parameters (nnsa.Parameters): see ResultBase.
coupling_surrogates (np.ndarray, optional): optional surrogate values for the coupling.
Can be a 1D array, with shape (n_surrogates,) or a 2D array with shape (n_surrogates, n_segments).
If not specified, some methods using surrogate data will raise errors.
Defaults to None.
label (str, optional): label for the coupling output (e.g. 'EEG-NIRS').
Defaults to 'coupling'.
data_info (str, optional): see ResultBase.
segment_start_times (np.ndarray, optional): see ResultBase.
segment_end_times (np.ndarray, optional): see ResultBase.
fs (flaot, optional): see ResultBase.
"""
def __init__(self, coupling, algorithm_parameters, coupling_surrogates=None,
label='coupling', data_info=None,
segment_start_times=None, segment_end_times=None, fs=None):
super().__init__(algorithm_parameters=algorithm_parameters, data_info=data_info,
segment_start_times=segment_start_times, segment_end_times=segment_end_times, fs=fs)
# Input check.
coupling = np.asarray(coupling).squeeze()
if coupling.ndim != 1:
raise ValueError('`coupling` should be 1-dimensional.')
# Store variables that are not already stored by the parent class (ResultBase).
self.coupling = coupling
self.coupling_surrogates = coupling_surrogates
self.label = label
@property
def num_segments(self):
"""
Return the number of segments.
Returns:
(int): number of segments.
"""
return len(self.coupling)
[docs] def get_significance(self, **kwargs):
"""
Return a TimeSeries with the significance.
Significance is defined as the fraction of surrogates that have lower coupling values as the real value.
Therefore, if the significance is close to 1, the computed coupling can be deemed significant.
Args:
**kwargs (optional): keyword arguments for TimeSeries.
Returns:
ts (TimeSeries): TimeSeries object containing the significance.
"""
if self.coupling_surrogates is None:
raise ValueError('No surrogate coupling values available. Cannot determine significance.')
# Compute significance.
significance = np.nanmean(self.coupling > self.coupling_surrogates, axis=0)
# Convert to TimeSeries and plot.
ts = TimeSeries(signal=significance, fs=self.fs, label='Coupling significance',
time_offset=self.segment_times[0], check_label=False, **kwargs)
return ts
[docs] def plot(self, time_scale=None, alpha=0.05, ax=None, **kwargs):
""""
Plot the coupling (and the surrogate coupling values if available) in the current axes.
Args:
time_scale (str): see format_time_axis().
alpha (float): significance value for surrogates.
ax (plt.Axes): axes to plot in.
**kwargs (optional): keyword arguments for plt.plot() for plotting the coupling values.
"""
if ax is None:
ax = plt.gca()
# Time array.
t_seg = self.segment_times
# Plot surrogates first (if they exist).
if self.coupling_surrogates is not None:
coup_surs = self.coupling_surrogates
threshold = np.percentile(coup_surs, q=(1 - alpha) * 100, axis=0)
if coup_surs.ndim == 1:
# Not per segment.
ax.axhspan(np.min(coup_surs, axis=0), np.max(coup_surs, axis=0),
color='k', alpha=0.2, label='Surrogates range')
ax.axhline(threshold, color='k', ls='--', label='Significance threshold')
else:
# Per segment.
ax.fill_between(t_seg, np.min(coup_surs, axis=0), np.max(coup_surs, axis=0),
color='k', alpha=0.2, label='Surrogates range')
ax.plot(t_seg, threshold, color='k', ls='--', label='Significance threshold')
# Plot the real coupling values as funcion of time.
plot_kwargs = dict(dict(label=self.label), **kwargs)
ax.plot(t_seg, self.coupling, **plot_kwargs)
ax.legend()
# Format time axis.
format_time_axis(time_scale=time_scale)
[docs] def plot_significance(self, **kwargs):
"""
Plot the significance (if coupling surrogates is available).
Args:
**kwargs (optional): keyword arguments for TimeSeries.plot().
"""
# Draw an horizontal line to indicate the 0.95 significance level.
plt.axhline(0.95, linestyle='--', color='k', alpha=0.2)
# Plot significance.
self.get_significance().plot(**kwargs)
[docs] def to_time_series(self, **kwargs):
"""
Convert result to TimeSeries object.
Args:
**kwargs (optional): keyword arguments for TimeSeries.
Returns:
ts (TimeSeries): TimeSeries object containing the coupling.
"""
ts = TimeSeries(signal=self.coupling, fs=self.fs, label=self.label, check_label=False,
time_offset=self.segment_times[0], **kwargs)
return ts
def _merge(self, other):
"""
See ResultBase.
"""
self.couling = np.concatenate((self.coupling, other.coupling), axis=-1)
@staticmethod
def _read_from_hdf5(filepath):
"""
Read result from hdf5 file into a DynamicCouplingResult class.
Args:
filepath (str): see ResultBase._read_from_hdf5().
Returns:
result (nnsa.DynamicCouplingResult): instance of DynamicCouplingResult containing the
DynamicCoupling result.
"""
# Read standard hdf5 header (use the ResultBase method).
algorithm_parameters, data_info, segment_start_times, segment_end_times, fs, time_offset = \
ResultBase._read_hdf5_header(filepath)[1:]
# Re-open the file and read the rest of the file.
with h5py.File(filepath, 'r') as f:
# Read array data.
coupling = f['coupling'][:]
if 'coupling_surrogates' in f:
coupling_surrogates = f['coupling_surrogates'][:]
else:
coupling_surrogates = None
# Read non-array data.
label = f['coupling'].attrs['label'].decode()
# Create a result object.
result = DynamicCouplingResult(coupling=coupling, algorithm_parameters=algorithm_parameters,
coupling_surrogates=coupling_surrogates,
label=label, data_info=data_info,
segment_start_times=segment_start_times,
segment_end_times=segment_end_times,
fs=fs)
return result
def _write_to_hdf5(self, filepath):
"""
Write the contents of the object to an hdf5 file.
Args:
filepath (str): see ResultBase._write_to_hdf5().
"""
# Write standard hdf5 header (use the ResultBase method).
self._write_hdf5_header(filepath)
# Append attributes to the hdf5 file.
with h5py.File(filepath, 'a') as f:
# Write array data.
f.create_dataset('coupling', data=self.coupling)
if self.coupling_surrogates is not None:
f.create_dataset('coupling_surrogates', data=self.coupling_surrogates)
# Write non-array data as attributes to the 'coupling' group.
# Convert strings to np.string_ type as recommended for compatibility.
f['coupling'].attrs['label'] = np.string_(self.label)
def bprsa(x, y, L, anchor_mode='increase', T=1, normalize=True, center_anchor=True,
verbose=1, **kwargs):
"""
Do bivariate phase rectified signal averaging without the averaging step (just returns all segments).
Args:
x: 1D trigger signal.
y: target signal(s) with same length as x.
L (int or tuple): int or tuple specifying the window length(s) around the anchor point.
If int, i-L:i+L is the window around anchor i.
If tuples, then i-L[0]:i+L[1] is the window around anchor i.
anchor_mode (str): choose from
- 'increase'
- 'decrease'
- 'peaks'
T (int): number of samples to average when looking for anchor points (after resampling).
normalize (bool): normalize the signals (zero mean, unit SD).
center_anchor (bool): move anchor at zero for each segment.
verbose (int): verbosity level.
**kwargs: for find_peaks if anchor_mode is "peaks".
Returns:
x_all (np.ndarray): trigger segments with shape (n_segments, L[0]+L[1]).
y_all (np.ndarray): target segments with shape (n_segments, L[0]+L[1], ...).
idx_anchors (np.ndarray): 1D array with indices of the anchor points.
"""
x = np.asarray(x)
y = np.asarray(y)
# Check shapes.
if x.ndim != 1:
raise ValueError('`x` should be 1D. Got shape {}.'
.format(x.shape))
if len(x) != len(y):
raise ValueError('`x` and `y` should have lengths. Got {} and {}.'
.format(len(x), len(y)))
# Normalize (zero mean, unit std).
if normalize:
x = (x - np.nanmean(x, axis=0, keepdims=True)) / np.nanstd(x, axis=0, keepdims=True)
y = (y - np.nanmean(y, axis=0, keepdims=True)) / np.nanstd(y, axis=0, keepdims=True)
# Find anchors.
if anchor_mode in ['increase', 'decrease']:
# Determine convolutional kernels. Note that the kernels are reversed when computing convolution.
kernel_current = np.concatenate((np.ones(T), np.zeros(T)))
kernel_previous = np.concatenate((np.zeros(T), np.ones(T)))
# Convolve to get the current sample estimate (a) and previous sample estimate (b).
current = np.convolve(x, kernel_current, 'same')
previous = np.convolve(x, kernel_previous, 'same')
if anchor_mode == 'increase':
idx_anchors = np.where(current > previous)[0]
elif anchor_mode == 'decrease':
idx_anchors = np.where(current < previous)[0]
else:
raise AssertionError
elif anchor_mode in ['peak', 'peaks']:
# Peaks in x are anchor points.
x_smooth = np.convolve(x, np.ones(T) / T, 'same')
idx_anchors, _ = find_peaks(x=x_smooth, **kwargs)
else:
raise ValueError('Invalid choice for anchor_mode="{}".'.format(anchor_mode))
# Remove points too close to the edges,
idx_anchors = idx_anchors[((idx_anchors - L[0]) >= 0) & ((idx_anchors + L[1]) <= len(x))]
if verbose:
print('{} anchors identified.'.format(len(idx_anchors)))
# Loop over anchor points and collect phase rectified segments.
x_all = []
y_all = []
bar = pyprind.ProgBar(len(idx_anchors), stream=sys.stdout)
for idx in idx_anchors:
idx_start = idx - L[0]
idx_stop = idx + L[1]
x_i = x[idx_start: idx_stop]
y_i = y[idx_start: idx_stop]
# Center around anchor (move anchor point to zero).
if center_anchor:
x_i = x_i - x[idx:idx + 1]
y_i = y_i - y[idx:idx + 1]
x_all.append(x_i)
y_all.append(y_i)
if verbose:
bar.update()
# To arrays.
x_all = np.asarray(x_all)
y_all = np.asarray(y_all)
return x_all, y_all, idx_anchors
def compute_transfer_function(
x, y, fs=1.0, window='hann', nperseg=None, noverlap=None, nfft=None, detrend='constant',
return_onesided=True, scaling='density', axis=-1, average='mean'):
"""
Compute the transfer function (TF) gain and phase between two signals.
Uses the Welch method to estimate the power spectral densities.
Args:
x (np.ndarray): input signal.
y (np.ndarray): output signal.
See documentation of scipy.signal.csd function for other (optional) arguments.
Returns:
f (np.ndarray): frequency vector corresponding to `TF`.
gain (float): TF gain, averaged over all frequencies.
phase (float): TF phase, averaged over all frequencies.
H (np.ndarray): complex tranfer function values as function of frequency.
Examples:
>>> np.random.seed(43)
>>> t = np.arange(1000)
>>> x = np.sin(t/10) + np.random.rand(len(t))
>>> y = 2*np.sin(t/10 + 2) + np.random.rand(len(t))
>>> f, gain, phase, H = compute_transfer_function(x, y)
"""
csd_kwargs = dict(window=window, nperseg=nperseg, noverlap=noverlap,
nfft=nfft, detrend=detrend, return_onesided=return_onesided,
scaling=scaling, axis=axis, average=average)
# Compute input-output cross power spectral density.
f, Pxy = scipy.signal.csd(x, y, fs=fs, **csd_kwargs)
# Compute input power spectral density.
f, Pxx = scipy.signal.csd(x, x, fs=fs, **csd_kwargs)
# Compute transfer function.
H = Pxy/Pxx
# Compute gain and phase per frequency.
gain = np.abs(H)
phase = np.unwrap(np.angle(H))
return f, gain, phase, H