import numpy as np
from scipy.signal import lfilter
from sklearn.utils import check_random_state
from nnsa.preprocessing.filter import MovingAverage
from nnsa.utils.arrays import mirror_boundaries, do_for_axis
__all__ = [
'add_artefacts',
'generate_eeg',
'generate_timeseries',
'generate_timeseries_AR1',
'generate_af_data',
'insert_artefacts',
'match_correlation',
]
[docs]def add_artefacts(x, p, min_len=1, max_len=None, seed=None, fill_value=np.nan):
"""
Add artefacts to x with lengths from `min_len` to `max_len` each with a probability of `p`.
"""
# Get a (seeded) random generator.
rng = np.random.default_rng(seed)
# Some variables.
n = len(x)
if max_len is None:
# Take 10% of signal length by default.
max_len = np.ceil(n*0.1)
max_len = int(max_len)
# Precompute some things.
y = x.copy()
p_all = rng.random(max_len)
add_all = p_all < p
len_all = range(min_len, max_len)
idx_all = rng.choice(range(int(n-max_len/2)), size=len(len_all), replace=True)
for af_len, add, idx_start in zip(len_all, add_all, idx_all):
if add:
idx_stop = min([n, idx_start + af_len])
y[idx_start: idx_stop] = fill_value
return y
[docs]def generate_af_data(n_repeat, fs, n_samples, n_cos=100,
snr=None, n_pad=None, identical=True, af_len=1, squeeze=True):
"""
Generate test data with artefacts.
"""
# Predefine and compute some things.
t = np.arange(n_samples) / fs
f_min = 1 / t[-1]
f_max = fs / 2
f_all = np.logspace(f_min, f_max, n_cos)
tf0 = np.outer(t, f_all)
# Artefact locations and lengths.
locs = (np.array([0.05, 0.12, 0.2, 0.35, 0.5, 0.8]) * n_samples).astype(int)
lens = (np.array([0.0025, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2]) * n_samples / 2 * af_len).astype(int)
# Padding.
pad_ar = np.full(n_pad, fill_value=np.nan)
# Loop over repetitions.
x_all = []
y_all = []
x_true_all = []
y_true_all = []
for _ in range(n_repeat):
tf = tf0 + np.random.rand(1, n_cos) * 2 * np.pi
amps = np.random.rand(1, n_cos)
x = np.mean(amps * np.cos(2 * np.pi * tf), axis=-1)
if identical:
y = x.copy()
else:
tf = tf0 + np.random.rand(1, n_cos) * 2 * np.pi
amps = np.random.rand(1, n_cos)
y = np.mean(amps * np.cos(2 * np.pi * tf), axis=-1)
if snr is not None:
# Add noise.
x_noise = np.random.rand(*x.shape) - 0.5
gain = np.sqrt(np.nanvar(x) / np.nanvar(x_noise) / snr)
x += gain * x_noise
y_noise = np.random.rand(*y.shape) - 0.5
gain = np.sqrt(np.nanvar(y) / np.nanvar(y_noise) / snr)
y += gain * y_noise
# Pad edges to simulate no coupling beyond edges (by default zero padding both will lead to coupling).
if n_pad is not None:
x = np.concatenate([pad_ar, x, pad_ar])
y = mirror_boundaries(y, n_left=n_pad, n_right=n_pad, axis=-1)
assert len(x) == len(y)
x_true_all.append(x.copy())
y_true_all.append(y.copy())
# Add artefacts.
for ii, (af_loc, af_len) in enumerate(zip(locs, lens)):
start_idx = (n_pad if n_pad is not None else 0) + int(af_loc - af_len / 2)
if np.random.rand(1) > 0.5:
x[start_idx:start_idx + af_len] = np.nan
else:
y[start_idx:start_idx + af_len] = np.nan
x_all.append(x)
y_all.append(y)
x_all = np.asarray(x_all)
y_all = np.asarray(y_all)
x_true_all = np.asarray(x_true_all)
y_true_all = np.asarray(y_true_all)
if squeeze:
x_all = np.squeeze(x_all)
y_all = np.squeeze(y_all)
x_true_all = np.squeeze(x_true_all)
y_true_all = np.squeeze(y_true_all)
return x_all, y_all, x_true_all, y_true_all
[docs]def generate_eeg(fs=250, duration=600, amplitude=75, f_low=0, f_high=20, n_f=50, seed=None):
"""
Generate a random toy EEG example.
Args:
fs (float, optional): sample frequency in Hz.
duration (float, optional): duration in seconds.
amplitude (float, optional): max amplitude of EEG signal.
f_low (float, optional): minimum frequency to be present in the EEG (in Hz).
f_high (float, optional): maximum frequency to be present in the EEG (in Hz).
n_f (int, optional): number of frequencies to add to the EEG.
seed (int, optional): seed for the random generator.
Returns:
signal (np.ndarray): 1D signal array.
"""
# Seed random generator.
np.random.seed(seed)
# Number of samples.
n_t = duration * fs
# Frequencies in Hz.
f_all = np.linspace(f_low+n_f/n_f, f_high, n_f)
# Power law coefficient.
k = 2
weights_all = 1*f_all**(-k)
weights_all[f_all < 1.5] = np.random.rand(np.sum(f_all < 1.5))
lags_all = np.random.rand(n_f) * 2*np.pi
# Frequencies in rad/s.
w_all = f_all*2*np.pi
t = np.arange(n_t)/fs
signal = np.zeros(n_t)
ma_filt = MovingAverage(fs=fs, numtaps=4)
for a, w, phi in zip(weights_all, w_all, lags_all):
random_factor = np.random.rand(n_t)
random_factor = ma_filt.filtfilt(random_factor)
signal += a*np.sin(w*t + phi)*random_factor
signal *= amplitude
return signal
[docs]def generate_timeseries(size, axis=-1, demean=False, seed=43):
"""
Generate a time series based on a random walk.
Args:
size (tuple or int): size of time series (number of samples).
axis (int): axis corresponding to time.
demean (bool): if True, subtracts the mean from the resulting random walk.
seed (int): seed for the random generator.
Returns:
x (np.ndarray): random walk time series.
"""
rng = np.random.default_rng(seed)
steps = rng.normal(size=size)
x = np.cumsum(steps, axis=axis)
if demean:
x -= np.mean(x, axis=axis, keepdims=True)
return x
[docs]def generate_timeseries_AR1(n, r, seed=None):
"""
Generate random timeseries from AR1 model.
Args:
n (int): number of samples to generate.
r (r): lag-1 autocorrelation.
seed (int): seed for random generator.
Returns:
yr (np.ndarray): generated time series (1D array).
"""
# Get a (seeded) random generator.
rng = np.random.default_rng(seed)
# Twice the decorrelation time.
tau = abs(int(np.ceil(-2 / np.log(np.abs(r)))))
# Compute red noise.
yr = lfilter([1, 0], [1, -r], rng.standard_normal(size=int(n + tau)))
yr = yr[tau:]
return yr.squeeze()
[docs]def insert_artefacts(x, n, min_len=1, max_len=None, axis=-1, random_state=None):
"""
Randomly insert n artefacts to x with at random locations with a specific
minimum and maximum length, along a given axis.
"""
# Default inputs.
random_state = check_random_state(random_state)
if max_len is None:
max_len = x.shape[axis] // 10
def _insert_artefacts(xi):
"""
Helper function for 1D case.
"""
# Copy xi to not insert nans inplace,
xi = xi.copy()
lens_all = random_state.randint(min_len, max_len, n)
for len_i in lens_all:
loc = random_state.choice(len(xi) - len_i, 1)[0]
xi[loc: loc + len_i] = np.nan
return xi
return do_for_axis(x, fun=_insert_artefacts, axis=axis)
[docs]def match_correlation(x, y, z):
"""
Return q = x + p*y, with p, such that corr(q, y) == corr(z, y).
"""
x = np.asarray(x)
y = np.asarray(y)
z = np.asarray(z)
assert len(x) == len(y) == len(z)
# Target correlation.
rt = np.corrcoef(z, y)[0, 1]
# Precompute sample standard deviations and covariance.
std_x = np.std(x, ddof=1)
std_y = np.std(y, ddof=1)
cov_xy = np.cov(x, y)[0, 1]
r_xy = cov_xy / (std_x * std_y)
# Compute scaling factor that leads to target correlation (own derivation, leads to abc-formula).
a = std_y ** 2 * (rt ** 2 - 1 ** 2)
b = 2 * (cov_xy * rt ** 2 - cov_xy)
c = std_x ** 2 * (rt ** 2 - r_xy ** 2)
dis = b ** 2 - 4 * a * c
if dis < 0:
raise ValueError('Solution does not exist.')
sqrt_dis = np.sqrt(dis)
# Two solutions.
p_12 = np.array([
(-b + sqrt_dis) / (2 * a),
(-b - sqrt_dis) / (2 * a)
])
q_12 = (x[np.newaxis, :] + np.outer(p_12, y))
# Choose the right one (the correlations are opposite in sign).
rp_12 = np.corrcoef(q_12, y)[:-1, -1]
p = p_12[np.argmin(np.abs(rp_12 - rt))]
q = x + p * y
# Check.
r_qy = np.corrcoef(q, y)[0, 1]
assert np.abs(r_qy - rt) < 1e-8
return q