Source code for nnsa.utils.dummy_data

import numpy as np
from scipy.signal import lfilter

from sklearn.utils import check_random_state

from nnsa.preprocessing.filter import MovingAverage
from nnsa.utils.arrays import mirror_boundaries, do_for_axis

__all__ = [
    'add_artefacts',
    'generate_eeg',
    'generate_timeseries',
    'generate_timeseries_AR1',
    'generate_af_data',
    'insert_artefacts',
    'match_correlation',
]


[docs]def add_artefacts(x, p, min_len=1, max_len=None, seed=None, fill_value=np.nan): """ Add artefacts to x with lengths from `min_len` to `max_len` each with a probability of `p`. """ # Get a (seeded) random generator. rng = np.random.default_rng(seed) # Some variables. n = len(x) if max_len is None: # Take 10% of signal length by default. max_len = np.ceil(n*0.1) max_len = int(max_len) # Precompute some things. y = x.copy() p_all = rng.random(max_len) add_all = p_all < p len_all = range(min_len, max_len) idx_all = rng.choice(range(int(n-max_len/2)), size=len(len_all), replace=True) for af_len, add, idx_start in zip(len_all, add_all, idx_all): if add: idx_stop = min([n, idx_start + af_len]) y[idx_start: idx_stop] = fill_value return y
[docs]def generate_af_data(n_repeat, fs, n_samples, n_cos=100, snr=None, n_pad=None, identical=True, af_len=1, squeeze=True): """ Generate test data with artefacts. """ # Predefine and compute some things. t = np.arange(n_samples) / fs f_min = 1 / t[-1] f_max = fs / 2 f_all = np.logspace(f_min, f_max, n_cos) tf0 = np.outer(t, f_all) # Artefact locations and lengths. locs = (np.array([0.05, 0.12, 0.2, 0.35, 0.5, 0.8]) * n_samples).astype(int) lens = (np.array([0.0025, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2]) * n_samples / 2 * af_len).astype(int) # Padding. pad_ar = np.full(n_pad, fill_value=np.nan) # Loop over repetitions. x_all = [] y_all = [] x_true_all = [] y_true_all = [] for _ in range(n_repeat): tf = tf0 + np.random.rand(1, n_cos) * 2 * np.pi amps = np.random.rand(1, n_cos) x = np.mean(amps * np.cos(2 * np.pi * tf), axis=-1) if identical: y = x.copy() else: tf = tf0 + np.random.rand(1, n_cos) * 2 * np.pi amps = np.random.rand(1, n_cos) y = np.mean(amps * np.cos(2 * np.pi * tf), axis=-1) if snr is not None: # Add noise. x_noise = np.random.rand(*x.shape) - 0.5 gain = np.sqrt(np.nanvar(x) / np.nanvar(x_noise) / snr) x += gain * x_noise y_noise = np.random.rand(*y.shape) - 0.5 gain = np.sqrt(np.nanvar(y) / np.nanvar(y_noise) / snr) y += gain * y_noise # Pad edges to simulate no coupling beyond edges (by default zero padding both will lead to coupling). if n_pad is not None: x = np.concatenate([pad_ar, x, pad_ar]) y = mirror_boundaries(y, n_left=n_pad, n_right=n_pad, axis=-1) assert len(x) == len(y) x_true_all.append(x.copy()) y_true_all.append(y.copy()) # Add artefacts. for ii, (af_loc, af_len) in enumerate(zip(locs, lens)): start_idx = (n_pad if n_pad is not None else 0) + int(af_loc - af_len / 2) if np.random.rand(1) > 0.5: x[start_idx:start_idx + af_len] = np.nan else: y[start_idx:start_idx + af_len] = np.nan x_all.append(x) y_all.append(y) x_all = np.asarray(x_all) y_all = np.asarray(y_all) x_true_all = np.asarray(x_true_all) y_true_all = np.asarray(y_true_all) if squeeze: x_all = np.squeeze(x_all) y_all = np.squeeze(y_all) x_true_all = np.squeeze(x_true_all) y_true_all = np.squeeze(y_true_all) return x_all, y_all, x_true_all, y_true_all
[docs]def generate_eeg(fs=250, duration=600, amplitude=75, f_low=0, f_high=20, n_f=50, seed=None): """ Generate a random toy EEG example. Args: fs (float, optional): sample frequency in Hz. duration (float, optional): duration in seconds. amplitude (float, optional): max amplitude of EEG signal. f_low (float, optional): minimum frequency to be present in the EEG (in Hz). f_high (float, optional): maximum frequency to be present in the EEG (in Hz). n_f (int, optional): number of frequencies to add to the EEG. seed (int, optional): seed for the random generator. Returns: signal (np.ndarray): 1D signal array. """ # Seed random generator. np.random.seed(seed) # Number of samples. n_t = duration * fs # Frequencies in Hz. f_all = np.linspace(f_low+n_f/n_f, f_high, n_f) # Power law coefficient. k = 2 weights_all = 1*f_all**(-k) weights_all[f_all < 1.5] = np.random.rand(np.sum(f_all < 1.5)) lags_all = np.random.rand(n_f) * 2*np.pi # Frequencies in rad/s. w_all = f_all*2*np.pi t = np.arange(n_t)/fs signal = np.zeros(n_t) ma_filt = MovingAverage(fs=fs, numtaps=4) for a, w, phi in zip(weights_all, w_all, lags_all): random_factor = np.random.rand(n_t) random_factor = ma_filt.filtfilt(random_factor) signal += a*np.sin(w*t + phi)*random_factor signal *= amplitude return signal
[docs]def generate_timeseries(size, axis=-1, demean=False, seed=43): """ Generate a time series based on a random walk. Args: size (tuple or int): size of time series (number of samples). axis (int): axis corresponding to time. demean (bool): if True, subtracts the mean from the resulting random walk. seed (int): seed for the random generator. Returns: x (np.ndarray): random walk time series. """ rng = np.random.default_rng(seed) steps = rng.normal(size=size) x = np.cumsum(steps, axis=axis) if demean: x -= np.mean(x, axis=axis, keepdims=True) return x
[docs]def generate_timeseries_AR1(n, r, seed=None): """ Generate random timeseries from AR1 model. Args: n (int): number of samples to generate. r (r): lag-1 autocorrelation. seed (int): seed for random generator. Returns: yr (np.ndarray): generated time series (1D array). """ # Get a (seeded) random generator. rng = np.random.default_rng(seed) # Twice the decorrelation time. tau = abs(int(np.ceil(-2 / np.log(np.abs(r))))) # Compute red noise. yr = lfilter([1, 0], [1, -r], rng.standard_normal(size=int(n + tau))) yr = yr[tau:] return yr.squeeze()
[docs]def insert_artefacts(x, n, min_len=1, max_len=None, axis=-1, random_state=None): """ Randomly insert n artefacts to x with at random locations with a specific minimum and maximum length, along a given axis. """ # Default inputs. random_state = check_random_state(random_state) if max_len is None: max_len = x.shape[axis] // 10 def _insert_artefacts(xi): """ Helper function for 1D case. """ # Copy xi to not insert nans inplace, xi = xi.copy() lens_all = random_state.randint(min_len, max_len, n) for len_i in lens_all: loc = random_state.choice(len(xi) - len_i, 1)[0] xi[loc: loc + len_i] = np.nan return xi return do_for_axis(x, fun=_insert_artefacts, axis=axis)
[docs]def match_correlation(x, y, z): """ Return q = x + p*y, with p, such that corr(q, y) == corr(z, y). """ x = np.asarray(x) y = np.asarray(y) z = np.asarray(z) assert len(x) == len(y) == len(z) # Target correlation. rt = np.corrcoef(z, y)[0, 1] # Precompute sample standard deviations and covariance. std_x = np.std(x, ddof=1) std_y = np.std(y, ddof=1) cov_xy = np.cov(x, y)[0, 1] r_xy = cov_xy / (std_x * std_y) # Compute scaling factor that leads to target correlation (own derivation, leads to abc-formula). a = std_y ** 2 * (rt ** 2 - 1 ** 2) b = 2 * (cov_xy * rt ** 2 - cov_xy) c = std_x ** 2 * (rt ** 2 - r_xy ** 2) dis = b ** 2 - 4 * a * c if dis < 0: raise ValueError('Solution does not exist.') sqrt_dis = np.sqrt(dis) # Two solutions. p_12 = np.array([ (-b + sqrt_dis) / (2 * a), (-b - sqrt_dis) / (2 * a) ]) q_12 = (x[np.newaxis, :] + np.outer(p_12, y)) # Choose the right one (the correlations are opposite in sign). rp_12 = np.corrcoef(q_12, y)[:-1, -1] p = p_12[np.argmin(np.abs(rp_12 - rt))] q = x + p * y # Check. r_qy = np.corrcoef(q, y)[0, 1] assert np.abs(r_qy - rt) < 1e-8 return q