import sys
import warnings
import numpy as np
import pyprind
from scipy.signal import lfilter
import pandas as pd
__all__ = [
'compute_surrogate_fun',
'compute_surrogate_couplings_sample',
'compute_surrogate',
'compute_AAFT_surrogate',
'compute_IAAFT_surrogate',
'compute_FT_surrogate',
'compute_RS_surrogate',
'compute_AR_surrogate',
'compute_ar1',
]
from nnsa.utils.arrays import interp_nan
[docs]def compute_surrogate_fun(inputs, fun, n_surrogates=300, how='IAAFT', seed=None,
return_af_errors=False, verbose=1, **kwargs):
"""
Evaluate fun(*inputs) `n_surrogates` times, where each iteration new surrogates are used of the original inputs.
Args:
inputs (iterable): list or tuple of 1D arrays which are the original inputs for `fun`.
fun (function): function that takes as input len(inputs) 1D arrays and returns a value or array.
n_surrogates (int, optional): number of surrogates to use.
Defaults to 300.
how (str, optional): method how to generate the surrogates. See compute_surrogate in nnsa.stats.surrogates.
Defaults to 'IAAFT'.
seed (int, optional): seed for the random generator.
Defaults to None.
return_af_errors (bool, optional): if True, returns an additional array which contains the errors between
the coherence of clean vs artefacted surrogates. To create the artefact surrogates, artefacts are inserted
at locations where there are artefacts in the `inputs`.
verbose (int, optional): verbosity level.
Defaults to 1.
**kwargs: optional keyword arguments for `fun`.
Returns:
surrogate_values (np.ndarray): array with length `n_surrogates` containing the results for each iteration.
errors (np.ndarray): only returned if return_af_errors: array with length `n_surrogates` containing the
difference of the fun value between clean vs artefact surrogates.
Examples:
>>> np.random.seed(43)
>>> x = np.random.rand(512)
>>> y = np.random.rand(512)
>>> fun = lambda x, y: np.abs(np.corrcoef(x, y)[0, 1])
# Compute coupling for all surrogates.
>>> surrogate_values = compute_surrogate_fun((x, y), fun, n_surrogates=300, verbose=0)
>>> surrogate_values.shape
(300,)
# Determine significance threshold for correlation between x and y.
>>> sig_threshold = np.percentile(surrogate_values, 95)
>>> print(sig_threshold)
0.07910408001084722
"""
if n_surrogates is None or n_surrogates < 1:
return np.array([])
if isinstance(inputs, np.ndarray) and inputs.ndim == 1:
inputs = (inputs,)
# To arrays.
inputs = [np.asarray(x) for x in inputs]
if seed is not None:
# Seed random generator.
np.random.seed(seed)
# Precompute things if possible.
sur_kwargs = [precompute_surrogate_data(x=x, how=how) for x in inputs]
nan_masks = [np.isnan(x) for x in inputs]
any_nans = np.any([np.any(m) for m in nan_masks])
# Initialize progress bar.
if verbose > 0:
print('Computing surrogate function values...')
bar = pyprind.ProgBar(n_surrogates, stream=sys.stdout)
# Generate surrogates and compute coupling and optionally the errors introduces by artefacts.
surrogate_values = []
af_errors = []
for _ in range(n_surrogates):
# Create surrogate data for each input.
surrogates = []
surrogates_af = []
for x, sur_kwargs_x, nan_mask_x in zip(inputs, sur_kwargs, nan_masks):
# Surrogate.
x_sur = compute_surrogate(x, how=how, seed=None, **sur_kwargs_x) # Already seeded.
surrogates.append(x_sur)
if return_af_errors and any_nans:
if np.any(np.isnan(x_sur)):
# If we need to determine the MOI, there should not be any nans in the surrogate.
raise AssertionError('Nans in surrogate. This should not be the case when `determine_moi` is True.'
' Try another surrogate method.')
# Insert nans to simulate real case with missing data.
x_sur_af = x_sur.copy()
x_sur_af[nan_mask_x] = np.nan
surrogates_af.append(x_sur_af)
# Compute coupling.
coupling = fun(*surrogates, **kwargs)
surrogate_values.append(coupling)
# Warn in case of nans in the coupling result.
if np.any(np.isnan(coupling)):
msg = 'Nans in surrogate function values.'
warnings.warn(msg)
if return_af_errors:
if any_nans:
# Compute coupling in surrogates with artefacts.
coupling_af = fun(*surrogates_af)
else:
# No nans, so coupling does not change.
coupling_af = coupling
af_errors.append(coupling_af - coupling)
# Update progress bar.
if verbose > 0:
bar.update()
# Convert list to array.
surrogate_values = np.array(surrogate_values)
af_errors = np.array(af_errors)
if return_af_errors:
return surrogate_values, af_errors
else:
return surrogate_values
[docs]def compute_surrogate_couplings_sample(x, y, fun, axis=-1, nperseg=None, n_surrogates=300,
how='IAAFT', seed=None):
"""
Compute surrogate values for the coupling/interaction between random segments of `x` and `y` as computed by `fun`.
Create n_surrogate surrogates of segments of `x` and `y` by randomly selecting a channel and a starting index.
Computes the coupling between the surrogates and returns these.
Args:
x (np.ndarray): N-D array containing time series.
y (np.ndarray): N-D array containing time series. Should have the same shape as `x`.
fun (function): function that takes in two 1D arrays and returns a coupling value.
axis (int, optional): axis in `x` and `y` corresponding to the time dimension.
Defaults to -1.
nperseg (int, optional): number of samples in a randomly selected segment for computation of the coupling.
If None, the entire length of the signal is used to compute the coupling.
n_surrogates (int, optional): number of surrogates to use.
Defaults to 300.
how (str, optional): method how to generate the surrogates. See compute_surrogate in nnsa.stats.surrogates.
Defaults to 'IAAFT'.
seed (int, optional): seed for the random generator.
Defaults to None.
Returns:
surrogate_values (np.ndarray): `n_surrogates` coupling values of the surrogates.
Examples:
>>> np.random.seed(43)
>>> x = np.random.rand(10, 512*5, 20)
>>> y = np.random.rand(10, 512*5, 20)
>>> fun = lambda x, y: np.abs(np.corrcoef(x, y)[0, 1])
# Compute coupling for surrogates of random segments from random channels.
>>> surrogate_values = compute_surrogate_couplings_sample(x, y, axis=1, fun=fun, n_surrogates=300, nperseg=512)
>>> surrogate_values.shape
(300,)
# Determine significance threshold for correlation between x and y.
>>> sig_threshold = np.percentile(surrogate_values, 95)
>>> print(sig_threshold)
0.0820297882707145
"""
# Seed random generator.
if seed is not None:
np.random.seed(seed)
# Check inputs.
if x.shape != y.shape:
raise ValueError('`x` and `y` should have the same shape. Got shapes {} and {}.'.format(x.shape, y.shape))
n_time = x.shape[axis]
if nperseg is None:
nperseg = n_time
else:
nperseg = int(nperseg)
if nperseg > n_time:
raise ValueError('nperseg ({}) cannot be larger than the numebr of samples in the time dimension ({})'
.format(nperseg, n_time))
# Swap axis and reshape, so that we have a 2D array with the `axis`` dimension as the last dimension.
xn = x.swapaxes(axis, -1)
xn = xn.reshape(-1, n_time)
yn = y.swapaxes(axis, -1)
yn = yn.reshape(-1, n_time)
n_channels = xn.shape[0]
# Create surrogates and compute coupling.
surrogate_values = np.zeros(n_surrogates)
for i_sur in range(n_surrogates):
# Choose a random channel.
idx_chan = np.random.randint(0, n_channels, 1)[0]
# Choose a random start index.
idx_start = np.random.randint(0, n_time - nperseg + 1, 1)[0]
idx_stop = idx_start + nperseg
# Extract segment.
x_seg = xn[idx_chan, idx_start: idx_stop]
y_seg = yn[idx_chan, idx_start: idx_stop]
# Compute surrogates.
x_sur = compute_surrogate(x_seg, how=how, seed=None) # Already seeded.
y_sur = compute_surrogate(y_seg, how=how, seed=None) # Already seeded.
# Compute coupling.
coupling = fun(x_sur, y_sur)
surrogate_values[i_sur] = coupling
return surrogate_values
[docs]def compute_surrogate(x, how, seed=None, **kwargs):
"""
Compute a surrogate signal for `x` using the method specified by `how`.
Args:
x (np.ndarray): 1D time series.
how (str, optional): method how to generate the surrogates. Choose from:
'RS'/'random_shuffle',
'FT'/'forier_transform'/'RP'/'random_phase',
'AAFT',
'IAAFT',
'AR'.
See the codes for each of these methods for more information about them (nnsa.stats.surrogates).
seed (int, optional): seed for the random generator.
Defaults to None.
**kwargs (optional): optional keyword arguments for the surrogate function that is called.
Returns:
x_surrogate (np.ndarray): surrogate signal for `x`.
"""
# Input check.
x = np.asarray(x)
if x.ndim != 1:
raise ValueError('x should be 1-dimensional. Got shape {}.'.format(x.shape))
if np.any(np.isnan(x)):
msg = '\n`x` contains nan.'
warnings.warn(msg)
# Make how case-insensitive.
how = how.lower()
if how == 'aaft':
x_surrogate = compute_AAFT_surrogate(x, seed=seed, **kwargs)
elif how == 'iaaft':
x_surrogate = compute_IAAFT_surrogate(x, seed=seed, **kwargs)
elif how in ['rp', 'random_phase', 'ft', 'fourier_transform']:
x_surrogate = compute_FT_surrogate(x, seed=seed, **kwargs)
elif how in ['rs', 'random_shuffle']:
x_surrogate = compute_RS_surrogate(x, seed=seed, **kwargs)
elif how in ['ar', 'ar1']:
x_surrogate = compute_AR_surrogate(x, seed=seed, **kwargs)
else:
raise ValueError('Invalid parameter how="{}". Choose from {}.'.format(
how, ['aaft', 'iaaft', 'ft', 'random_shuffling', 'ar']
))
return x_surrogate
[docs]def compute_AAFT_surrogate(x, seed=None):
"""
Compute the Amplitude Adjusted Fourier Transform surrogate.
This method tries to preserve both the linear structure and the amplitude distribution.
The drawback of this method is precisely that the last step changes somewhat the linear structure.
Args:
x (np.ndarray): 1D signal array.
seed (int, optional): seed for the random generator.
Returns:
x_surrogate (np.ndarray): surrogate signal for `x`.
"""
# Seed random generator.
if seed is not None:
np.random.seed(seed)
# Scale the data to a Gaussian distribution.
gaussian = np.random.randn(x.shape[0])
gaussian.sort(axis=0)
ranks = interp_nan(x).argsort(axis=0).argsort(axis=0)
rescaled_x = gaussian[ranks]
# Perform a FT transformation of the rescaled data.
ft_surr = compute_FT_surrogate(rescaled_x, xf=None, seed=seed)
# Replace nans by random sampling with replacement from non-nan samples.
sorted_original = x.copy()
nan_mask = np.isnan(sorted_original)
sorted_original[nan_mask] = np.random.choice(sorted_original[~nan_mask],
np.sum(nan_mask),
replace=True)
# Rescale back to amplitude distribution of original data.
sorted_original.sort(axis=0)
ranks = ft_surr.argsort(axis=0).argsort(axis=0)
x_surrogate = sorted_original[ranks]
return x_surrogate
[docs]def compute_IAAFT_surrogate(x, n_iters=10, xf=None, seed=None):
"""
Compute the Iterative Amplitude Adjusted Fourier Transform surrogate.
This algorithm is an iterative version of AAFT. The steps are repeated so that the surrogate autocorrelation
function becomes more similar to the original.
Args:
x (np.ndarray): 1D signal array.
n_iters (np.ndarray, optional): number of iterations to do.
Defaults to 10.
xf (np.ndarray, optional): precomputed one-dimensional discrete Fourier Transform of x,
i.e. np.fft.rfft(x, axis=0).
If None, it is computed in the function.
Defaults to None.
seed (int, optional): seed for the random generator.
Returns:
x_surrogate (np.ndarray): surrogate signal for `x`.
"""
# Compute FFT.
if xf is None:
xf = np.fft.rfft(interp_nan(x), axis=0)
xf_amps = np.abs(xf)
# Copy original.
sorted_original = x.copy()
# Replace nans by random sampling with replacement from non-nan samples.
nan_mask = np.isnan(sorted_original)
sorted_original[nan_mask] = np.random.choice(sorted_original[~nan_mask],
np.sum(nan_mask),
replace=True)
# Sort original.
sorted_original.sort(axis=0)
# Starting point.
x_surrogate = compute_AAFT_surrogate(x, seed=seed)
# Iterate.
for _ in range(n_iters):
r_fft = np.fft.rfft(x_surrogate, axis=0)
r_phases = r_fft / (np.abs(r_fft) + 1e-14)
s = np.fft.irfft(xf_amps * r_phases, n=x.shape[0], axis=0)
ranks = s.argsort(axis=0).argsort(axis=0)
x_surrogate = sorted_original[ranks]
return x_surrogate
[docs]def compute_FT_surrogate(x, xf=None, seed=None):
"""
Compute the Fourier Transform surrogate.
Surrogate data is created by the inverse Fourier Transform of the Fourier Transform of the original signal with
new (uniformly random) phases. This preserves the linear correlation (the periodogram) of the original signal.
Args:
x (np.ndarray): 1D signal array.
xf (np.ndarray, optional): precomputed one-dimensional discrete Fourier Transform of x,
i.e. np.fft.rfft(x, axis=0).
If None, it is computed in the function.
Defaults to None.
seed (int, optional): seed for the random generator.
Returns:
x_surrogate (np.ndarray): surrogate signal for `x`.
"""
# Compute FFT.
if xf is None:
xf = np.fft.rfft(interp_nan(x), axis=0)
# Seed the random generator.
if seed is not None:
np.random.seed(seed)
# Random phase angles.
angle = np.random.uniform(0, 2 * np.pi, (xf.shape[0],))
# Set the slowest frequency (mean) to zero, i.e. not to be randomised.
angle[0] = 0
# Phase shift of FT coefficients.
cxf = xf * np.exp(1j * angle)
# Inverse FT.
x_surrogate = np.fft.irfft(cxf, n=x.shape[0], axis=0)
return x_surrogate
[docs]def compute_RS_surrogate(x, seed=None, replace_nan=True):
"""
Compute the random shuffle (RS) surrogate.
Surrogate data is created by randomly shuffling the samples in the time domain. The permutations guarantee the
same amplitude distribution than the original series, but destroy any linear correlation. This method is
associated to the null hypothesis of the data being uncorrelated noise (possibly Gaussian and measured by a
static nonlinear function).
Args:
x (np.ndarray): 1D signal array.
seed (int, optional): seed for the random generator.
Returns:
x_surrogate (np.ndarray): surrogate signal for `x`.
"""
# Seed the random generator.
if seed is not None:
np.random.seed(seed)
if replace_nan:
# Replace nans by random sampling with replacement from non-nan samples.
x = x.copy()
nan_mask = np.isnan(x)
x[nan_mask] = np.random.choice(
x[~nan_mask], np.sum(nan_mask), replace=True)
# Random permutation of the time series.
x_surrogate = np.random.permutation(x)
return x_surrogate
[docs]def compute_AR_surrogate(x, r=None, seed=None):
"""
Compute a red noise surrogate assuming x can be modelled with a univariate lag-1 autoregressive model.
Surrogate data is created by generating random red noise, using the lag-1 autocorrelation estimated from `x`.
Taken from the pycwt package.
Args:
x (np.ndarray): 1D signal array.
r (float, optional): precomputed lag-1 autocorrelation of x.
seed (int, optional): seed for the random generator.
Returns:
x_surrogate (np.ndarray): surrogate signal for `x`.
"""
# Seed the random generator.
if seed is not None:
np.random.seed(seed)
if r is None:
# Estimate the lag-1 autocorrelation.
r = compute_ar1(x)
# Twice the decorrelation time.
tau = int(np.ceil(-2 / np.log(np.abs(r))))
# Compute red noise.
N = len(x)
yr = lfilter([1, 0], [1, -r], np.random.normal(size=N + tau, scale=1, loc=0))
yr = yr[tau:]
return yr.squeeze()
[docs]def compute_ar1(x):
"""
Compute lag-1 autocorrelation.
Args:
x (np.ndarray): 1D signal array.
Returns:
r (float): lag-1 autocorrelation of x.
References:
[1] Allen, M. R. and Smith, L. A. Monte Carlo SSA: detecting
irregular oscillations in the presence of colored noise.
*Journal of Climate*, **1996**, 9(12), 3373-3404.
<http://dx.doi.org/10.1175/1520-0442(1996)009<3373:MCSDIO>2.0.CO;2>
[2] http://www.madsci.org/posts/archives/may97/864012045.Eg.r.html
"""
if np.any(np.isnan(x)):
msg = '\n`x` contains nan. Removing nans for lag-1 autocorrelation estimation.'
warnings.warn(msg)
x = np.asarray(x)
x = x[~np.isnan(x)]
N = x.size
xm = x.mean()
x = x - xm
# Estimates the lag zero and one covariance
c0 = x.dot(x) / N
c1 = x[0:N - 1].dot(x[1:N]) / (N - 1)
# According to A. Grinsteds' substitutions
B = -c1 * N - c0 * N ** 2 - 2 * c0 + 2 * c1 - c1 * N ** 2 + c0 * N
A = c0 * N ** 2
C = N * (c0 + c1 * N - c1)
D = B ** 2 - 4 * A * C
if D > 0:
r = (-B - D ** 0.5) / (2 * A)
else:
warnings.warn('Cannot place an upperbound on the unbiased AR(1). '
'Series is too short or trend is to large.')
r = np.corrcoef(x[:-1], x[1:])[0, 1]
return r
def precompute_surrogate_data(x, how):
"""
Precompute data and parameters for surrogate generation of signal `x` with method `how`.
Args:
x (np.ndarray): 1D signal array.
how (str): how to compute the surrogate. See compute_surrogate().
Returns:
sur_kwargs (dict): dictionary with optional keyword arguments for compute_surrogate(x=x, how=how, ...).
"""
x = np.asarray(x)
if x.ndim != 1:
raise ValueError('x should be 1-dimensional. Got shape {}.'.format(x.shape))
if np.any(np.isnan(x)):
msg = '\n`x` contains nan. Setting nans to zero for FFT.'
warnings.warn(msg)
# Initialize output.
sur_kwargs = dict()
# Make how case-insensitive.
how = how.lower()
if how in ['iaaft', #'aaft',
'rp', 'random_phase',
'ft', 'fourier_transform']:
# Precompute FFT of x.
sur_kwargs['xf'] = np.fft.rfft(interp_nan(x), axis=0)
elif how in ['rs', 'random_shuffle']:
pass
elif how == 'ar':
# Compute autocorrelation coefficient.
sur_kwargs['r'] = compute_ar1(x)
else:
pass
return sur_kwargs