Source code for nnsa.utils.arrays

"""
This module contains functions dealing with numpy arrays.
"""
import warnings
from functools import partial

import numpy as np
import pandas as pd
import scipy.stats
import scipy.ndimage

from nnsa.utils.segmentation import segment_generator, get_all_segments

__all__ = [
    'apply_delay',
    'clamp',
    'count_nans',
    'do_for_axis',
    'get_bin_edges',
    'histogram_features_per_channel',
    'interp_nan',
    'mirror_boundaries',
    'moving_average',
    'moving_envelope',
    'moving_line_length',
    'moving_mad',
    'moving_max',
    'moving_mean',
    'moving_median',
    'moving_std',
    'nanfun',
    'strided_x',
]


[docs]def apply_delay(x, y, delay, fs=1, d_min=None, d_max=None): """ Delay x with `delay` seconds wrt y and truncate. Args: x (np.ndarray): 1D array to delay. y (np.ndarray): 1D array. delay (float or int): number of samples (or seconds if fs is given) to delay x with. fs (float, optional): sample frequency of x and y. If specified, the delay can be given in seconds. Returns: x_out (np.ndarray): delayed version of x, with length len(x) - abs(delay). y_out (np.ndarray): delayed version of y, same length as x. """ delay = int(round(fs * delay)) if delay <= 0: # Shift x left. x_out = x[-delay:] y_out = y[:len(x_out)] else: # > 0 # Shift x right (-> shift y left). y_out = y[delay:] x_out = x[:len(y_out)] if d_min is not None and d_max is not None: # Cut piece off such that for all delays, the same part of x data is selected. start_idx = min(-d_min + delay, -d_min) stop_idx = len(x_out) + max(delay - d_max, -d_max) x_out = x_out[start_idx: stop_idx] y_out = y_out[start_idx: stop_idx] return x_out, y_out
def asnumber(x, order=None): """ Convert (if needed) the values of an array to numbers. Args: x (np.ndarray): array with values to convert to numbers. order (list): if x is not numeric, the elements in `x` will be converted to evenly spaced numbers between 0 and 1, and the order controls the order of the elements in their new numeric form. Returns: x (np.ndarray): same shape as input, but the element are numeric. """ x = np.asarray(x) if order is None: # Default order. order = np.unique(x) order = list(order) if not np.issubdtype(x.dtype, np.number): # Convert to numbers. x = np.array([(order.index(xi) + 1) / len(order) if xi in order else 0 for xi in x]) return x
[docs]def count_nans(x, **kwargs): """ Count the number of NaN (Not a Number) values in array x. Args: x (np.ndarray): the array in which to count the NaNs. **kwargs (optional): optional keyword arguments for np.sum() to specify e.g. the axis along which to count the NaNs. Returns: (np.int32 or np.ndarray of np.int32): the """ return np.sum(np.isnan(x), **kwargs)
[docs]def clamp(x, neg_thres=-np.inf, pos_thres=np.inf): """ Apply a clampling function to the array x. References: L. Webb, M. Kauppila, J. A. Roberts, S. Vanhatalo, and N. J. Stevenson, “Automated detection of artefacts in neonatal EEG with residual neural networks,” Computer Methods and Programs in Biomedicine, vol. 208, p. 106194, Sep. 2021, doi: 10.1016/j.cmpb.2021.106194. """ if pos_thres < 0: raise ValueError('`pos_thres` must be positive.') if neg_thres > 0: raise ValueError('`neg_thres` must be negative.') neg_thres = -neg_thres # Make it positive, so that it is in agreement with Eq. (1) in Webb et al. x = np.asarray(x).copy() mask_pos = x > pos_thres mask_neg = x < -neg_thres x[mask_pos] = pos_thres * (np.log(x[mask_pos]) - np.log(pos_thres) + 1) x[mask_neg] = -neg_thres * (np.log(-x[mask_neg]) - np.log(neg_thres) + 1) return x
def cummean(x, axis=-1): """ Compute the cumulative mean of an array along an axis. Deals with nans by igoring them (same as np.nanmean). Args: x (np.ndarray): array. axis (int, optional): axis along which to compute the cumulative mean. Returns: x_cm (np.ndarray): array with cumulative mean. Has same shape as `x`. Examples: >>> np.random.seed(43) >>> x = np.random.rand(3, 1000) >>> n = x.shape[-1] >>> x[:, np.random.randint(low=0, high=n, size=int(0.2*n))] = np.nan >>> x_cm = cummean(x, axis=-1) >>> print(x_cm[:, -1]) [0.51089681 0.51458907 0.51665756] >>> print(np.nanmean(x, axis=-1)) [0.51089681 0.51458907 0.51665756] """ x_cs = np.nancumsum(x, axis=axis) count_cs = np.cumsum(~np.isnan(x), axis=axis) x_cm = x_cs / count_cs return x_cm
[docs]def do_for_axis(x, fun, axis): """ Apply a function operating on 1D arrays on a multidimensional array `x`, operating along the `axis`. Args: x (np.ndarray): multidimensional array. fun (function): function that takes in one argument: a 1D array. axis (int): the axis in x along which the function should be applied. *args, **kwargs: positional and keyword arguments for fun. Returns: result (np.ndarray): the result. Has the same number of dimensions as x. The length of the `axis` dimension might differ from `x` depending on what `fun` returns. Examples: >>> x = np.random.rand(10, 20, 30) # Mean along second axis using numpy. >>> mean1 = np.mean(x, axis=1, keepdims=True) >>> mean1.shape (10, 1, 30) # Mean along second axis using this function. >>> mean2 = do_for_axis(x, fun=np.mean, axis=1, keepdims=True) >>> mean2.shape (10, 1, 30) >>> np.max(np.abs(mean1 - mean2)) < 1e-15 True # Cumsum. >>> cumsum1 = np.cumsum(x, axis=1) >>> cumsum1.shape (10, 20, 30) >>> cumsum2 = do_for_axis(x, fun=np.cumsum, axis=1) >>> cumsum2.shape (10, 20, 30) >>> np.max(np.abs(cumsum1 - cumsum2)) < 1e-15 True """ # Swap axis and reshape, so that we have a 2D array with the `axis`` dimension as the last dimension. xn = x.swapaxes(axis, -1) interm_shape = xn.shape xn = xn.reshape(-1, interm_shape[-1]) # Call function one each 1D array, operating along `axis` and collect in an array. result = np.array(list(map(fun, xn))) # Reshape and swap back. new_shape = (interm_shape[:-1]) + (-1,) # The length of the `axis` dimension might have changed during the function call. result = result.reshape(new_shape).swapaxes(axis, -1) return result
[docs]def get_bin_edges(x, n_bins): """ Get bin edges for variable `x` and `n_bins`. Args: x (np.ndarray): array to divide into bins. n_bins (int): number of bins to divide into. Returns: bin_edges (np.ndarray): array with length `n_bins`+1, containing bin edges. """ x = np.asarray(x) xmin = np.nanmin(x) xmax = np.nanmax(x) bin_edges = np.linspace(xmin, xmax, n_bins + 1) # Make last bin a bit bigger such that xmax falls within last bin. binwidth = (xmax - xmin) / n_bins bin_edges[-1] += 1e-6 * binwidth return bin_edges
def get_range_mask(x, x_min=None, x_max=None): """ Return a mask such that all(x_min <= x[mask] <= x_max). Args: x (np.ndarray): 1D array. x_min (float, optional): lower limit. If None, no limit is set. x_max (float, optional): upper limit. If None, no limit is set. Returns: mask (np.ndarray): boolean array with the same shape as `x` indicating where `x` is within the provided range. """ mask = np.full(x.shape, fill_value=True) if x_min is not None: mask = np.logical_and(mask, x >= x_min) if x_max is not None: mask = np.logical_and(mask, x <= x_max) return mask
[docs]def histogram_features_per_channel(x, bins, channel_labels=None, ignore_nan=True,): """ Compute features of the distribution of the channel data in x using a histogram representation. To compute the histogram features on a 1D array, reshape it to a one-channel array, i.e. x.reshape(1, -1). Args: x (np.ndarray): array containing data with dimensions (channels, segments). bins (np.ndaaray or list): bin edges of the histogram, see np.histogram(). channel_labels (list, optional): list of the channel/feature labels corresponding to the rows of `x``. ignore_nan (bool, optional): if True, ignore nan values in `x`. Defaults to True. Returns: (pd.DataFrame): dataframe with the channel labels as row index and the histogram features as columns. """ # Feature labels. feature_labels = ['bin_{}'.format(i+1) for i in range(len(bins) - 1)] feature_labels.extend(['mean', 'median', 'std', 'iqr', 'skewness', 'kurtosis', 'q5', 'q95']) # Loop over channels. data = [] for x_ch in x: idx_nan = np.isnan(x_ch) if np.sum(np.isnan(x_ch)) == len(x_ch): # If all values are nan, set features to nan. features = np.full(28, np.nan) else: if ignore_nan: # Remove nan values. x_ch = x_ch[~idx_nan] # Compute histogram and its features. hist, bin_edges_ = np.histogram(x_ch, bins=bins, density=True) # Use the density for normalization. q5, q25, q50, q75, q95 = np.percentile(x_ch, [5, 25, 50, 75, 95]) features = np.concatenate((hist, [np.mean(x_ch), q50, np.std(x_ch), q75 - q25, scipy.stats.skew(x_ch), scipy.stats.kurtosis(x_ch), q5, q95 ])) data.append(features) data = np.vstack(data) return pd.DataFrame(data=data, index=channel_labels, columns=feature_labels)
[docs]def interp_nan(x, max_nan_length=None, axis=-1): """ Linearly interpolate nan values in x. Args: x (np.ndarray): data array. max_nan_length (int, optional): number of maximum consecutive nan samples to interpolate. If specified, data that is missing for more than `max_nan_lengh` is not interpolated. Instead, nans are remained. Additionally, no extrapolation is done if max_nan_length is specified. If None, all nan values are interpolated or extrapolated, no matter the length of the missing data. Defaults to None. axis (int, optional): the axis of `x` which to interpolate along. Defaults to -1. Returns: x_interp (np.ndarray): data array with same shape as `x`, where np.nan values have been linearly interpolated. Examples: >>> x = np.array([1, 1, 1, np.nan, np.nan, 2, 2, np.nan, 0]) >>> interp_nan(x) array([1. , 1. , 1. , 1.33333333, 1.66666667, 2. , 2. , 1. , 0. ]) >>> x = np.array([1, np.nan, np.nan, np.nan, np.nan, 2, 2, np.nan, 0]) >>> interp_nan(x, max_nan_length=3) array([ 1., nan, nan, nan, nan, 2., 2., 1., 0.]) """ def _interp_nan(x): # Get nan mask and not nan data values. nan_mask = np.isnan(x) not_nan_indices = (~nan_mask).nonzero()[0] not_nan_values = x[~nan_mask] if max_nan_length is None: # Interpolate all nans. nan_indices = nan_mask.nonzero()[0] else: nan_indices = [] # Get onsets and offsets of missing data parts. d = np.diff(nan_mask.astype(int)) idx_onsets = np.where(d == 1)[0] + 1 # First 1. idx_offsets = np.where(d == -1)[0] # Last 1. # If first offset is earlier than first onset: discard this part. if len(idx_offsets) > 0 and len(idx_onsets) > 0 and idx_offsets[0] < idx_onsets[0]: idx_offsets = idx_offsets[1:] # If last onset is later than last offset: discard this part. if len(idx_offsets) > 0 and len(idx_onsets) > 0 and idx_onsets[-1] > idx_offsets[-1]: idx_onsets = idx_onsets[:-1] # Collect indices to interpolate. for on, off in zip(idx_onsets, idx_offsets): duration = off - on + 1 if duration <= max_nan_length: nan_indices.extend(list(range(on, off + 1))) # Convert to array. nan_indices = np.array(nan_indices) # Make a copy of x, so we do not mutate the original array. x_interp = x.copy() if len(nan_indices) != 0: if len(not_nan_indices) != 0: x_interp[nan_indices] = np.interp(nan_indices, not_nan_indices, not_nan_values) else: raise ValueError('Array contains too many NaNs.') return x_interp # Apply along axis. x_interp = do_for_axis(np.asarray(x), fun=_interp_nan, axis=axis) return x_interp
[docs]def mirror_boundaries(x, n_left=0, n_right=0, axis=-1): """ Mirror array x at the start and end of the array, along a specified dimension. Args: x (np.ndarray): data array. n_left (int, optional): number of samples to mirror at the start of the array. Cannot be greater than x.shape[axis]. Defaults to 0. n_right (int, optional): number of samples to mirror at the end of the array. Cannot be greater than x.shape[axis]. Defaults to 0. axis (int, optional): axis along which to mirror. Defaults to -1. Returns: x (np.ndarray): original array x, with n_left mirrord samples appended to the left (before the start) and n_right mirrored samples added to the right (after the end). Examples: >>> x = np.arange(10) >>> mirror_boundaries(x) array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) >>> mirror_boundaries(x, n_left=3) array([3, 2, 1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) >>> mirror_boundaries(x, n_left=3, n_right=2) array([3, 2, 1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 8, 7]) >>> x2 = np.arange(8).reshape(4, 2) >>> x2 array([[0, 1], [2, 3], [4, 5], [6, 7]]) >>> mirror_boundaries(x2, n_left=2, n_right=1, axis=0) array([[4, 5], [2, 3], [0, 1], [2, 3], [4, 5], [6, 7], [4, 5]]) """ # Check inputs. max_n = x.shape[axis] if n_left > max_n: raise ValueError('n_left={} exceeds matrix dimension {}.'.format(n_left, max_n)) if n_right > max_n: raise ValueError('n_right={} exceeds matrix dimension {}.'.format(n_right, max_n)) # add_to_left = x[n_left:0:-1] for 1d arrays. slc = [slice(None)] * len(x.shape) slc[axis] = slice(n_left, 0, -1) add_to_left = x[tuple(slc)] # add_to_right = x[-2:-(n_right+2):-1] for 1d arrays. slc = [slice(None)] * len(x.shape) slc[axis] = slice(-2, -(n_right + 2), -1) add_to_right = x[tuple(slc)] # Concatenate. x = np.concatenate([add_to_left, x, add_to_right], axis=axis) return x
[docs]def moving_average(x, n, axis=-1, maintain_nan=True): """ Compute the moving (centered) average of `x`, with window size `n`. Ignores np.nan values, but outputs np.nan at locations where x is np.nan if maintain_nan is True. The output at index i is the average of a window of x, with the center of the window located at x[i]. Mirrors the input at the borders to reduce boundary effects. Adapted from: https://stackoverflow.com/questions/39919050/calculate-moving-average-in-numpy-array-with-nans. Args: x (np.ndarray): data array. n (int): window size/number of taps. If -1 or larger than 2*len(x), computes global (non-moving) average. axis (int, optional): axis along which to compute the moving average. Defaults to -1. maintain_nan (bool, optional): if True, output is nan at locations where x is nan. If False, the local average is returned even if the current sample was a nan. If all values in the local average are nan, returns nan at that index. Defaults to True. Returns: ret (np.ndarray): the moving average of `x`. Has same shape as `x`. counts (np.ndarray): the number of non-np.nan samples that participated in the average. Has same shape as `ret`. Examples: >>> x = np.arange(10).astype(float) >>> moving_average(x, n=3)[0] array([0.66666667, 1. , 2. , 3. , 4. , 5. , 6. , 7. , 8. , 8.33333333]) >>> moving_average(x, n=2)[0] array([0.5, 0.5, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5]) >>> x[[2, 4]] = np.nan >>> x array([ 0., 1., nan, 3., nan, 5., 6., 7., 8., 9.]) >>> moving_average(x, n=3)[0] array([0.66666667, 0.5 , nan, 3. , nan, 5.5 , 6. , 7. , 8. , 8.33333333]) >>> moving_average(x, n=3, maintain_nan=False)[0] array([0.66666667, 0.5 , 2. , 3. , 4. , 5.5 , 6. , 7. , 8. , 8.33333333]) >>> x2 = np.arange(8).reshape(4, 2) >>> x2 array([[0, 1], [2, 3], [4, 5], [6, 7]]) >>> moving_average(x2, n=3, axis=0)[0] array([[1.33333333, 2.33333333], [2. , 3. ], [4. , 5. ], [4.66666667, 5.66666667]]) """ msg = '\nDeprecationWarning. Use moving_mean() instead of moving_average()[0].' warnings.warn(msg) x = np.asarray(x) if n is None or n == -1: # Return global mean (memory efficient). return np.nanmean(x, axis=axis, keepdims=True), np.sum(~np.isnan(x), axis=axis) if n >= 2*x.shape[axis]: msg = ('Parameter n={} too large for input of size {}. ' 'Taking a non-moving average.' .format(n, x.shape[axis])) warnings.warn(msg) movmean = np.ones_like(x) * np.nanmean(x, axis=axis, keepdims=True) if maintain_nan: movmean[np.isnan(x)] = np.nan return movmean, np.sum(~np.isnan(x), axis=axis) elif n >= x.shape[axis]: msg = ('Moving average window ({}) >= than signal length ({}). ' 'Taking a non-moving average.').format(n, x.shape[axis]) warnings.warn(msg) # Cast to float. if not isinstance(x, (np.float32, np.float64, float)): x = x.astype(float) n = int(n) # Mirror at boundaries to reduce boundary effects (alternative to zero-padding). if np.mod(n, 2) == 0: # n is even. n_mirror_left = int(n/2) n_mirror_right = int(n/2 - 1) else: # n is odd. n_mirror_left = int((n-1)/2) n_mirror_right = n_mirror_left xm = mirror_boundaries(x, n_mirror_left, n_mirror_right, axis=axis) # Create a masked array. xm = np.ma.masked_array(xm, np.isnan(xm)) # Cumulative sum, ignoring missing data. ret = np.cumsum(xm.filled(0), axis=axis) # Sum in windows of size n. ret[n:] = ret[n:] - ret[:,-n] for 1d arrays. slc1 = [slice(None)] * len(ret.shape) slc1[axis] = slice(n, None) slc2 = [slice(None)] * len(ret.shape) slc2[axis] = slice(None, -n) ret[tuple(slc1)] = ret[tuple(slc1)] - ret[tuple(slc2)] # Count of non-missing data in window size n. counts = np.cumsum(~xm.mask, axis=axis) counts[tuple(slc1)] = counts[tuple(slc1)] - counts[tuple(slc2)] # Compute mean. ret /= counts # Remove first n-1 samples (the number of samples we added to deal with boundary effects). slc3 = [slice(None)] * len(ret.shape) slc3[axis] = slice(n-1, None) ret = ret[tuple(slc3)] counts = counts[tuple(slc3)] if maintain_nan: # Set ret[i] to np.nan where x[i] is np.nan. ret[np.isnan(x)] = np.nan return ret, counts
[docs]def moving_line_length(x, n, fs=1, axis=-1, max_nan_frac=0.1): n = int(fs*n) d = np.diff(x, prepend=0, axis=axis) * fs # change per second. line_length = moving_mean(np.abs(d), n=n, axis=axis, max_nan_frac=max_nan_frac) return line_length
[docs]def moving_envelope(x, n, fs=1, axis=-1, max_nan_frac=0.1): n = int(fs*n) env = moving_mean(x**2, n=n, axis=axis, max_nan_frac=max_nan_frac) env = np.sqrt(2*env) return env
[docs]def moving_mad(x, n, axis=-1, maintain_nan=True, max_nan_frac=1, k=1.4826): """ Compute the moving (centered) median absolute deviation (MAD) of `x`, with window size `n`. The output at index i is the average of a window of x, with the center of the window located at x[i]. Mirrors the input at the borders to reduce boundary effects. Args: x (np.ndarray): data array. n (int): window size/number of taps. If n=-1 or None, computes metric over the entire array. axis (int, optional): axis along which to compute the moving metric. Defaults to -1. maintain_nan (bool, optional): if True, output is nan at locations where x is nan. If False, the local metric is returned even if the current sample was a nan (if other samples in window were not nan). Defaults to True. max_nan_frac (float): fraction between 0 and 1 specifying how many nans (proportion) are maximally allowed in a local window. If more nans are in the window, the output is nan for the corresponding output sample. k (float): scaling factor for MAD. If k=1.4826, MAD ~ std for Gaussian data. See also https://en.wikipedia.org/wiki/Median_absolute_deviation. Returns: running_val (np.ndarray): the moving MAD of `x`. Has same shape as `x`. Examples: >>> x = np.array([[0, 0, 0, 1 , 2 , 3 , 3, 2, 10, 10], ... [0, 0, 0, np.nan, np.nan, np.nan, 3, 2, 10, 10]]) >>> moving_mad(x, n=3, axis=-1) array([[0. , 0. , 0. , 1.4826, 1.4826, 0. , 0. , 1.4826, 0. , 0. ], [0. , 0. , 0. , nan, nan, nan, 0.7413, 1.4826, 0. , 0. ]]) >>> moving_mad(x, n=3, axis=0, maintain_nan=False, max_nan_frac=1) array([[0., 0., 0., 0., 0., 0., 0., 0., 0., 0.], [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]]) >>> moving_mad(x, n=-1) array([[2.2239, 2.2239, 2.2239, 2.2239, 2.2239, 2.2239, 2.2239, 2.2239, 2.2239, 2.2239], [2.9652, 2.9652, 2.9652, nan, nan, nan, 2.9652, 2.9652, 2.9652, 2.9652]]) >>> moving_mad(x, n=3, max_nan_frac=0) array([[0. , 0. , 0. , 1.4826, 1.4826, 0. , 0. , 1.4826, 0. , 0. ], [0. , 0. , nan, nan, nan, nan, nan, 1.4826, 0. , 0. ]]) """ x = np.asarray(x) if n is not None and n >= 2 * x.shape[axis]: msg = ('Parameter n={} too large for input of size {}. ' 'Computing over entire window.' .format(n, x.shape[axis])) warnings.warn(msg) n = -1 if n is None or n == -1: # Compute one fixed (non-moving) value and fill array with it. med = np.nanmedian(x, axis=axis, keepdims=True) glob_val = k * nanfun(np.nanmedian, np.abs(x - med), axis=axis, max_nan_frac=max_nan_frac, keepdims=True) running_val = np.ones_like(x) * glob_val else: n = int(round(n)) if n >= x.shape[axis]: msg = ('Moving window ({}) >= than signal length ({}). ' 'Consider computing metric globally (non-moving).').format(n, x.shape[axis]) warnings.warn(msg) # Mirror at boundaries to reduce boundary effects (alternative to zero-padding). if np.mod(n, 2) == 0: # n is even. n_mirror_left = int(n/2) n_mirror_right = int(n/2 - 1) else: # n is odd. n_mirror_left = int((n-1)/2) n_mirror_right = n_mirror_left xm = mirror_boundaries(x, n_mirror_left, n_mirror_right, axis=axis) # Function to compute rolling metric on 1D arrays. def _mov_val(xi): # To pandas Series, so that we can use the fast rolling() functionality. ts = pd.Series(xi) # Rolling with center=True, so that the window is centered around the value. ts_roll = ts.rolling(window=n, center=True, min_periods=round(n * (1 - max_nan_frac))) # Compute running median. ts_mov_median = ts_roll.median() # Cut off padded values. mov_median = ts_mov_median.values[n_mirror_left: -n_mirror_right] # Create segmented array with stride 1. xm_seg = strided_x(xi, n=n, stride=1) # Compute MAD. x_mov = k * np.nanmedian(np.absolute(xm_seg - mov_median[:, None]), axis=1) return x_mov # Do for axis. running_val = do_for_axis(xm, fun=_mov_val, axis=axis) if maintain_nan: # Set running_val[i] to np.nan where x[i] is np.nan. running_val[np.isnan(x)] = np.nan return running_val
[docs]def moving_max(x, n, axis=-1, maintain_nan=True, max_nan_frac=1): """ Compute the moving (centered) max of `x`, with window size `n`. The output at index i is the average of a window of x, with the center of the window located at x[i]. Mirrors the input at the borders to reduce boundary effects. Args: x (np.ndarray): data array. n (int): window size/number of taps. If n=-1 or None, computes metric over the entire array. axis (int, optional): axis along which to compute the moving metric. Defaults to -1. maintain_nan (bool, optional): if True, output is nan at locations where x is nan. If False, the local metric is returned even if the current sample was a nan (if other samples in window were not nan). Defaults to True. max_nan_frac (float): fraction between 0 and 1 specifying how many nans (proportion) are maximally allowed in a local window. If more nans are in the window, the output is nan for the corresponding output sample. Returns: running_val (np.ndarray): the moving max of `x`. Has same shape as `x`. Examples: >>> x = np.array([[0, 0, 0, 1 , 2 , 3 , 3, 2, 10, 10], ... [0, 0, 0, np.nan, np.nan, np.nan, 3, 2, 10, 10]]) >>> moving_max(x, n=3, axis=-1) array([[ 0., 0., 1., 2., 3., 3., 3., 10., 10., 10.], [ 0., 0., 0., nan, nan, nan, 3., 10., 10., 10.]]) >>> moving_max(x, n=3, axis=0, maintain_nan=False, max_nan_frac=1) array([[ 0., 0., 0., 1., 2., 3., 3., 2., 10., 10.], [ 0., 0., 0., 1., 2., 3., 3., 2., 10., 10.]]) >>> moving_max(x, n=-1) array([[10., 10., 10., 10., 10., 10., 10., 10., 10., 10.], [10., 10., 10., nan, nan, nan, 10., 10., 10., 10.]]) >>> moving_max(x, n=3, max_nan_frac=0) array([[ 0., 0., 1., 2., 3., 3., 3., 10., 10., 10.], [ 0., 0., nan, nan, nan, nan, nan, 10., 10., 10.]]) """ x = np.asarray(x) if n is not None and n >= 2 * x.shape[axis]: msg = ('Parameter n={} too large for input of size {}. ' 'Computing over entire window.' .format(n, x.shape[axis])) warnings.warn(msg) n = -1 if n is None or n == -1: # Compute one fixed (non-moving) value and fill array with it. glob_val = nanfun(np.nanmax, x=x, axis=axis, max_nan_frac=max_nan_frac, keepdims=True) running_val = np.ones_like(x) * glob_val else: n = int(round(n)) if n >= x.shape[axis]: msg = ('Moving window ({}) >= than signal length ({}). ' 'Consider computing metric globally (non-moving).').format(n, x.shape[axis]) warnings.warn(msg) # Mirror at boundaries to reduce boundary effects (alternative to zero-padding). if np.mod(n, 2) == 0: # n is even. n_mirror_left = int(n / 2) n_mirror_right = int(n / 2 - 1) else: # n is odd. n_mirror_left = int((n - 1) / 2) n_mirror_right = n_mirror_left xm = mirror_boundaries(x, n_mirror_left, n_mirror_right, axis=axis) # Function to compute rolling metric on 1D arrays. def _mov_val(xi): # To pandas Series, so that we can use the fast rolling() functionality. ts = pd.Series(xi) # Rolling with center=True, so that the window is centered around the value. ts_roll = ts.rolling(window=n, center=True, min_periods=round(n * (1 - max_nan_frac))) # Compute metric. ts_mov_val = ts_roll.max() # Cut off padded values. x_mov = ts_mov_val.values[n_mirror_left: -n_mirror_right] return x_mov # Do for axis. running_val = do_for_axis(xm, fun=_mov_val, axis=axis) if maintain_nan: # Set running_val[i] to np.nan where x[i] is np.nan. running_val[np.isnan(x)] = np.nan return running_val
[docs]def moving_mean(x, n, axis=-1, maintain_nan=True, max_nan_frac=1): """ Compute the moving (centered) mean of `x`, with window size `n`. The output at index i is the average of a window of x, with the center of the window located at x[i]. Mirrors the input at the borders to reduce boundary effects. Args: x (np.ndarray): data array. n (int): window size/number of taps. If n=-1 or None, computes metric over the entire array. axis (int, optional): axis along which to compute the moving metric. Defaults to -1. maintain_nan (bool, optional): if True, output is nan at locations where x is nan. If False, the local metric is returned even if the current sample was a nan (if other samples in window were not nan). Defaults to True. max_nan_frac (float): fraction between 0 and 1 specifying how many nans (proportion) are maximally allowed in a local window. If more nans are in the window, the output is nan for the corresponding output sample. Returns: running_val (np.ndarray): the moving mean of `x`. Has same shape as `x`. Examples: >>> x = np.array([[0, 0, 0, 1 , 2 , 3 , 3, 2, 10, 10], ... [0, 0, 0, np.nan, np.nan, np.nan, 3, 2, 10, 10]]) >>> moving_mean(x, n=3, axis=-1) array([[ 0. , 0. , 0.33333333, 1. , 2. , 2.66666667, 2.66666667, 5. , 7.33333333, 10. ], [ 0. , 0. , 0. , nan, nan, nan, 2.5 , 5. , 7.33333333, 10. ]]) >>> moving_mean(x, n=3, axis=0, maintain_nan=False, max_nan_frac=1) array([[ 0., 0., 0., 1., 2., 3., 3., 2., 10., 10.], [ 0., 0., 0., 1., 2., 3., 3., 2., 10., 10.]]) >>> moving_mean(x, n=-1) array([[3.1 , 3.1 , 3.1 , 3.1 , 3.1 , 3.1 , 3.1 , 3.1 , 3.1 , 3.1 ], [3.57142857, 3.57142857, 3.57142857, nan, nan, nan, 3.57142857, 3.57142857, 3.57142857, 3.57142857]]) >>> moving_mean(x, n=3, max_nan_frac=0) array([[ 0. , 0. , 0.33333333, 1. , 2. , 2.66666667, 2.66666667, 5. , 7.33333333, 10. ], [ 0. , 0. , nan, nan, nan, nan, nan, 5. , 7.33333333, 10. ]]) """ x = np.asarray(x) if n is not None and n >= 2 * x.shape[axis]: msg = ('Parameter n={} too large for input of size {}. ' 'Computing over entire window.' .format(n, x.shape[axis])) warnings.warn(msg) n = -1 if n is None or n == -1: # Compute one fixed (non-moving) value and fill array with it. glob_val = nanfun(np.nanmean, x=x, axis=axis, max_nan_frac=max_nan_frac, keepdims=True) running_val = np.ones_like(x) * glob_val elif n == 1: return x else: n = int(round(n)) if n >= x.shape[axis]: msg = ('Moving window ({}) >= than signal length ({}). ' 'Consider computing metric globally (non-moving).').format(n, x.shape[axis]) warnings.warn(msg) # Mirror at boundaries to reduce boundary effects (alternative to zero-padding). if np.mod(n, 2) == 0: # n is even. n_mirror_left = int(n / 2) n_mirror_right = int(n / 2 - 1) else: # n is odd. n_mirror_left = int((n - 1) / 2) n_mirror_right = n_mirror_left xm = mirror_boundaries(x, n_mirror_left, n_mirror_right, axis=axis) # Function to compute rolling metric on 1D arrays. def _mov_val(xi): # To pandas Series, so that we can use the fast rolling() functionality. ts = pd.Series(xi) # Rolling with center=True, so that the window is centered around the value. ts_roll = ts.rolling(window=n, center=True, min_periods=round(n * (1 - max_nan_frac))) # Compute metric. ts_mov_val = ts_roll.mean() # Cut off padded values. x_mov = ts_mov_val.values[n_mirror_left: -n_mirror_right] return x_mov # Do for axis. running_val = do_for_axis(xm, fun=_mov_val, axis=axis) if maintain_nan: # Set running_val[i] to np.nan where x[i] is np.nan. running_val[np.isnan(x)] = np.nan return running_val
[docs]def moving_median(x, n, axis=-1, maintain_nan=True, max_nan_frac=1): """ Compute the moving (centered) median of `x`, with window size `n`. The output at index i is the average of a window of x, with the center of the window located at x[i]. Mirrors the input at the borders to reduce boundary effects. Args: x (np.ndarray): data array. n (int): window size/number of taps. If n=-1 or None, computes metric over the entire array. axis (int, optional): axis along which to compute the moving metric. Defaults to -1. maintain_nan (bool, optional): if True, output is nan at locations where x is nan. If False, the local metric is returned even if the current sample was a nan (if other samples in window were not nan). Defaults to True. max_nan_frac (float): fraction between 0 and 1 specifying how many nans (proportion) are maximally allowed in a local window. If more nans are in the window, the output is nan for the corresponding output sample. Returns: running_val (np.ndarray): the moving median of `x`. Has same shape as `x`. Examples: >>> x = np.array([[0, 0, 0, 1 , 2 , 3 , 3, 2, 10, 10], ... [0, 0, 0, np.nan, np.nan, np.nan, 3, 2, 10, 10]]) >>> moving_median(x, n=3, axis=-1) array([[ 0. , 0. , 0. , 1. , 2. , 3. , 3. , 3. , 10. , 10. ], [ 0. , 0. , 0. , nan, nan, nan, 2.5, 3. , 10. , 10. ]]) >>> moving_median(x, n=3, axis=0, maintain_nan=False, max_nan_frac=1) array([[ 0., 0., 0., 1., 2., 3., 3., 2., 10., 10.], [ 0., 0., 0., 1., 2., 3., 3., 2., 10., 10.]]) >>> moving_median(x, n=-1) array([[ 2., 2., 2., 2., 2., 2., 2., 2., 2., 2.], [ 2., 2., 2., nan, nan, nan, 2., 2., 2., 2.]]) >>> moving_median(x, n=3, max_nan_frac=0) array([[ 0., 0., 0., 1., 2., 3., 3., 3., 10., 10.], [ 0., 0., nan, nan, nan, nan, nan, 3., 10., 10.]]) """ x = np.asarray(x) if n is not None and n >= 2 * x.shape[axis]: msg = ('Parameter n={} too large for input of size {}. ' 'Computing over entire window.' .format(n, x.shape[axis])) warnings.warn(msg) n = -1 if n is None or n == -1: # Compute one fixed (non-moving) value and fill array with it. glob_val = nanfun(np.nanmedian, x=x, axis=axis, max_nan_frac=max_nan_frac, keepdims=True) running_val = np.ones_like(x) * glob_val else: n = int(round(n)) if n >= x.shape[axis]: msg = ('Moving window ({}) >= than signal length ({}). ' 'Consider computing metric globally (non-moving).').format(n, x.shape[axis]) warnings.warn(msg) # Mirror at boundaries to reduce boundary effects (alternative to zero-padding). if np.mod(n, 2) == 0: # n is even. n_mirror_left = int(n / 2) n_mirror_right = int(n / 2 - 1) else: # n is odd. n_mirror_left = int((n - 1) / 2) n_mirror_right = n_mirror_left xm = mirror_boundaries(x, n_mirror_left, n_mirror_right, axis=axis) # Function to compute rolling metric on 1D arrays. def _mov_val(xi): # To pandas Series, so that we can use the fast rolling() functionality. ts = pd.Series(xi) # Rolling with center=True, so that the window is centered around the value. ts_roll = ts.rolling(window=n, center=True, min_periods=round(n * (1 - max_nan_frac))) # Compute metric. ts_mov_val = ts_roll.median() # Cut off padded values. x_mov = ts_mov_val.values[n_mirror_left: -n_mirror_right] return x_mov # Do for axis. running_val = do_for_axis(xm, fun=_mov_val, axis=axis) if maintain_nan: # Set running_val[i] to np.nan where x[i] is np.nan. running_val[np.isnan(x)] = np.nan return running_val
def moving_quantile(x, n, q, axis=-1, maintain_nan=True, max_nan_frac=1): """ Compute the moving (centered) quantile of `x`, with window size `n`. The output at index i is the average of a window of x, with the center of the window located at x[i]. Mirrors the input at the borders to reduce boundary effects. Args: x (np.ndarray): data array. n (int): window size/number of taps. If n=-1 or None, computes metric over the entire array. q (float): qauntile to compute (value between 0 and 1). axis (int, optional): axis along which to compute the moving metric. Defaults to -1. maintain_nan (bool, optional): if True, output is nan at locations where x is nan. If False, the local metric is returned even if the current sample was a nan (if other samples in window were not nan). Defaults to True. max_nan_frac (float): fraction between 0 and 1 specifying how many nans (proportion) are maximally allowed in a local window. If more nans are in the window, the output is nan for the corresponding output sample. Returns: running_val (np.ndarray): the moving median of `x`. Has same shape as `x`. Examples: >>> x = np.array([[0, 0, 0, 1 , 2 , 3 , 3, 2, 10, 10], ... [0, 0, 0, np.nan, np.nan, np.nan, 3, 2, 10, 10]]) >>> moving_quantile(x, n=3, q=0.5, axis=-1) array([[ 0. , 0. , 0. , 1. , 2. , 3. , 3. , 3. , 10. , 10. ], [ 0. , 0. , 0. , nan, nan, nan, 2.5, 3. , 10. , 10. ]]) >>> moving_quantile(x, n=3, q=0.5, axis=0, maintain_nan=False, max_nan_frac=1) array([[ 0., 0., 0., 1., 2., 3., 3., 2., 10., 10.], [ 0., 0., 0., 1., 2., 3., 3., 2., 10., 10.]]) >>> moving_quantile(x, n=-1, q=0.5) array([[ 2., 2., 2., 2., 2., 2., 2., 2., 2., 2.], [ 2., 2., 2., nan, nan, nan, 2., 2., 2., 2.]]) >>> moving_quantile(x, n=3, q=0.5, max_nan_frac=0) array([[ 0., 0., 0., 1., 2., 3., 3., 3., 10., 10.], [ 0., 0., nan, nan, nan, nan, nan, 3., 10., 10.]]) """ x = np.asarray(x) if n is not None and n >= 2 * x.shape[axis]: msg = ('Parameter n={} too large for input of size {}. ' 'Computing over entire window.' .format(n, x.shape[axis])) warnings.warn(msg) n = -1 if n is None or n == -1: # Compute one fixed (non-moving) value and fill array with it. glob_val = nanfun(partial(np.nanquantile, q=q), x=x, axis=axis, max_nan_frac=max_nan_frac, keepdims=True) running_val = np.ones_like(x) * glob_val else: n = int(round(n)) if n >= x.shape[axis]: msg = ('Moving window ({}) >= than signal length ({}). ' 'Consider computing metric globally (non-moving).').format(n, x.shape[axis]) warnings.warn(msg) # Mirror at boundaries to reduce boundary effects (alternative to zero-padding). if np.mod(n, 2) == 0: # n is even. n_mirror_left = int(n / 2) n_mirror_right = int(n / 2 - 1) else: # n is odd. n_mirror_left = int((n - 1) / 2) n_mirror_right = n_mirror_left xm = mirror_boundaries(x, n_mirror_left, n_mirror_right, axis=axis) # Function to compute rolling metric on 1D arrays. def _mov_val(xi): # To pandas Series, so that we can use the fast rolling() functionality. ts = pd.Series(xi) # Rolling with center=True, so that the window is centered around the value. ts_roll = ts.rolling(window=n, center=True, min_periods=round(n * (1 - max_nan_frac))) # Compute metric. ts_mov_val = ts_roll.quantile(q) # Cut off padded values. x_mov = ts_mov_val.values[n_mirror_left: -n_mirror_right] return x_mov # Do for axis. running_val = do_for_axis(xm, fun=_mov_val, axis=axis) if maintain_nan: # Set running_val[i] to np.nan where x[i] is np.nan. running_val[np.isnan(x)] = np.nan return running_val
[docs]def moving_std(x, n, axis=-1, maintain_nan=True, max_nan_frac=1, ddof=0): """ Compute the moving (centered) standard deviation of `x`, with window size `n`. The output at index i is the average of a window of x, with the center of the window located at x[i]. Mirrors the input at the borders to reduce boundary effects. Args: x (np.ndarray): data array. n (int): window size/number of taps. If n=-1 or None, computes metric over the entire array. axis (int, optional): axis along which to compute the moving metric. Defaults to -1. maintain_nan (bool, optional): if True, output is nan at locations where x is nan. If False, the local metric is returned even if the current sample was a nan (if other samples in window were not nan). Defaults to True. max_nan_frac (float): fraction between 0 and 1 specifying how many nans (proportion) are maximally allowed in a local window. If more nans are in the window, the output is nan for the corresponding output sample. ddof (int): degrees of freedom for the denominator when computing the std (see np.std()). Returns: running_val (np.ndarray): the moving std of `x`. Has same shape as `x`. Examples: >>> x = np.array([[0, 0, 0, 1 , 2 , 3 , 3, 2, 10, 10], ... [0, 0, 0, np.nan, np.nan, np.nan, 3, 2, 10, 10]]) >>> moving_std(x, n=3, axis=-1) array([[0. , 0. , 0.47140452, 0.81649658, 0.81649658, 0.47140452, 0.47140452, 3.55902608, 3.77123617, 0. ], [0. , 0. , 0. , nan, nan, nan, 0.5 , 3.55902608, 3.77123617, 0. ]]) >>> moving_std(x, n=3, axis=0, maintain_nan=False, max_nan_frac=1) array([[0., 0., 0., 0., 0., 0., 0., 0., 0., 0.], [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]]) >>> moving_std(x, n=-1) array([[3.6180105 , 3.6180105 , 3.6180105 , 3.6180105 , 3.6180105 , 3.6180105 , 3.6180105 , 3.6180105 , 3.6180105 , 3.6180105 ], [4.20398256, 4.20398256, 4.20398256, nan, nan, nan, 4.20398256, 4.20398256, 4.20398256, 4.20398256]]) >>> moving_std(x, n=3, max_nan_frac=0) array([[0. , 0. , 0.47140452, 0.81649658, 0.81649658, 0.47140452, 0.47140452, 3.55902608, 3.77123617, 0. ], [0. , 0. , nan, nan, nan, nan, nan, 3.55902608, 3.77123617, 0. ]]) """ if n is not None and n >= 2*x.shape[axis]: msg = ('Parameter n={} too large for input of size {}. ' 'Computing over entire window.' .format(n, x.shape[axis])) warnings.warn(msg) n = -1 if n is None or n == -1: # Compute one fixed (non-moving) value and fill array with it. glob_val = nanfun(np.nanstd, x=x, axis=axis, max_nan_frac=max_nan_frac, keepdims=True, ddof=ddof) running_val = np.ones_like(x) * glob_val else: n = int(round(n)) if n >= x.shape[axis]: msg = ('Moving window ({}) >= than signal length ({}). ' 'Consider computing metric globally (non-moving).').format(n, x.shape[axis]) warnings.warn(msg) # Mirror at boundaries to reduce boundary effects (alternative to zero-padding). if np.mod(n, 2) == 0: # n is even. n_mirror_left = int(n/2) n_mirror_right = int(n/2 - 1) else: # n is odd. n_mirror_left = int((n-1)/2) n_mirror_right = n_mirror_left xm = mirror_boundaries(x, n_mirror_left, n_mirror_right, axis=axis) # Function to compute rolling metric on 1D arrays. def _mov_val(xi): # To pandas Series, so that we can use the fast rolling() functionality. ts = pd.Series(xi) # Rolling with center=True, so that the window is centered around the value. ts_roll = ts.rolling(window=n, center=True, min_periods=round(n*(1 - max_nan_frac))) # Compute metric. ts_mov_val = ts_roll.std(ddof=ddof) # Cut off padded values. x_mov = ts_mov_val.values[n_mirror_left: -n_mirror_right] return x_mov # Do for axis. running_val = do_for_axis(xm, fun=_mov_val, axis=axis) if maintain_nan: # Set running_val[i] to np.nan where x[i] is np.nan. running_val[np.isnan(x)] = np.nan return running_val
def moving_std_old(x, n, axis=-1, maintain_nan=True): """ Compute the moving (centered) standard deviation of `x`, with window size `n`. Ignores np.nan values, but outputs np.nan at locations where x is np.nan if maintain_nan is True. The output at index i is the average of a window of x, with the center of the window located at x[i]. Mirrors the input at the borders to reduce boundary effects. Args: x (np.ndarray): data array. n (int): window size/number of taps. If n=-1, takes the STD of the entire array. axis (int, optional): axis along which to compute the moving std. Defaults to -1. maintain_nan (bool, optional): if True, output is nan at locations where x is nan. If False, the local std is returned even if the current sample was a nan. If all values in the local std are nan, returns nan at that index. Defaults to True. Returns: running_std (np.ndarray): the moving std of `x`. Has same shape as `x`. Examples: >>> x = np.array([ 0, 0, 0, 1, 2, 3, 3, 2, 10, 10]) >>> moving_std_old(x, n=3) array([0. , 0. , 0.47140452, 0.81649658, 0.81649658, 0.47140452, 0.47140452, 3.55902608, 3.77123617, 0. ]) """ if n is None or n == -1: # Return global SD (memory efficient). return np.nanstd(x, axis=axis, keepdims=True) if n >= 2*x.shape[axis]: msg = ('Parameter n={} too large for input of size {}. ' 'Taking a non-moving std.' .format(n, x.shape[axis])) warnings.warn(msg) movstd = np.ones_like(x) * np.nanstd(x, ddof=1, axis=axis, keepdims=True) if maintain_nan: movstd[np.isnan(x)] = np.nan return movstd elif n >= x.shape[axis]: msg = ('Moving std window ({}) >= than signal length ({}). ' 'Consider taking a non-moving std.').format(n, x.shape[axis]) warnings.warn(msg) # Mirror at boundaries to reduce boundary effects (alternative to zero-padding). if np.mod(n, 2) == 0: # n is even. n_mirror_left = int(n/2) n_mirror_right = int(n/2 - 1) else: # n is odd. n_mirror_left = int((n-1)/2) n_mirror_right = n_mirror_left xm = mirror_boundaries(x, n_mirror_left, n_mirror_right, axis=axis) # Segment. seg_generator = segment_generator(xm, segment_length=n, overlap=n-1, fs=1, axis=axis) # Compute std on each segment. running_std = [np.nanstd(seg, axis=axis, keepdims=True) for seg in seg_generator] running_std = np.concatenate(running_std, axis=axis) if maintain_nan: # Set running_std[i] to np.nan where x[i] is np.nan. running_std[np.isnan(x)] = np.nan return running_std
[docs]def nanfun(fun, x, axis=None, max_nan_frac=0.5, **kwargs): """ Compute things like nanmean(x, axis=axis), but set the output to np.nan if more than `max_nan_frac` proportion of the data was nan. Args: fun: a function that accepts an array and an axis parameter, such as np.nanmean, np.nanmedian. x (np.ndarray): array on which to apply `fun`. axis (int): the axis along which to apply the function. max_nan_frac (float): maximum fraction of nan. **kwargs (optional): for fun. Returns: y (np.ndarray): result of the fun(x, axis=axis) with nans where there were too many nans. Examples: >>> x = np.array([[0, 1, 2, 3, np.nan, np.nan, 6],[0, np.nan, np.nan, 3, np.nan, 5, np.nan]]) >>> nanfun(np.nanmean, x, axis=1, max_nan_frac=0.5) array([2.4, nan]) >>> nanfun(np.nanmean, x, axis=1, max_nan_frac=1) array([2.4 , 2.66666667]) """ nan_frac = np.mean(np.isnan(x), axis=axis) # I expect to see "RuntimeWarning: All-NaN slice encountered" in this block with warnings.catch_warnings(): warnings.simplefilter("ignore", category=RuntimeWarning) y = fun(x, axis=axis, **kwargs) if not isinstance(y, np.ndarray): # y is a single value. if nan_frac > max_nan_frac: y = np.nan else: # y is an array. y[nan_frac > max_nan_frac] = np.nan return y
def remove_nans(*args): """ E.g. remove entries in x and y where x or y is nan. Args: *args (np.ndarray): data arrays with same shapes. Returns: out (tuple): arrays with same shape without. Examples: >>> x = np.arange(9, dtype=float) >>> y = np.arange(9, dtype=float) >>> x[x < 3] = np.nan >>> y[y > 6] = np.nan >>> xn, yn = remove_nans(x, y) >>> print(xn) [3. 4. 5. 6.] >>> print(yn) [3. 4. 5. 6.] """ # Determine nan mask (where any array is nan). nan_mask = np.any(np.array([np.isnan(x) for x in args]), axis=0) # Extract the non nan samples from all arrays. out = (np.asarray(x)[~nan_mask] for x in args) return out def slice_along_axis(x, axis, start=None, stop=None, step=None): """ Return a slice of x along a specified axis. Examples: >>> x = np.random.rand(4, 10, 3) >>> np.all(slice_along_axis(x, axis=1, stop=3) == x[:, :3, :]) True >>> np.all(slice_along_axis(x, axis=-1, start=1, stop=2) == x[:, :, 1:2]) True >>> np.all(slice_along_axis(x, axis=0, step=-1) == x[::-1, :, :]) True """ slc = [slice(None)] * len(x.shape) slc[axis] = slice(start, stop, step) return x[tuple(slc)] def stepwise_reduce(x, window, stepsize=None, reduce_fun=np.nanmean, fs=1, max_nan=0.5, axis=-1): """ Segment an array and reduce the values in the segment to 1 number using specified `reduce_fun`. Args: x (np.ndarray): array. window (float): length of the segments /moving window to compute the average in (in seconds). stepsize (float or None): stepsize of the moving average window in seconds. The output will have a sample frequency of 1/stepsize. If None, stepsize will be equal to window, i.e. no overlap. reduce_fun (function, optional): function to use to compute the average. Must handle nan and accept an axis keyword. Defaults to np.nanmean. fs (float, optional): sample frequency in Hz. If 1, window and stepsize are in number of sample. Defaults to 1. max_nan (float, optional): maximum fraction (0-1) of nan values in a window. If the fraction of nan samples exceeds `max_nan`, the mean is nan for that window. If None, skips this step. Defaults to 0.5. axis (int): axis to apply reduction on. Returns: x_red (np.array): array with the specified axis reduced. fs_red (float): new sample frequency of the reduced axis. """ x = np.asarray(x) if stepsize is None: stepsize = window # Segment as a large array. x_seg = get_all_segments(x, segment_length=window, overlap=window - stepsize, fs=fs, axis=axis) # Swap axis, the segment axis will become the output's time axis. if axis < 0: # Convert negative axis to positive. axis += x.ndim x_seg = x_seg.swapaxes(0, axis+1) x_red = reduce_fun(x_seg, axis=0) fs_red = 1 / stepsize # Set segments to nan if more than `max_nan`*100 % of the samples in the segment are nan. if max_nan is not None: nan_frac = np.mean(np.isnan(x_seg), axis=0) x_red[nan_frac > max_nan] = np.nan return x_red, fs_red
[docs]def strided_x(x, n, stride=1): """ Segment 1D array x using efficient striding. From this post: http://stackoverflow.com/a/40085052/3293881 Args: x (np.ndarray): 1D array to segment. n (int): segment length (samples). stride (int): the stride/step length (samples). Returns: x_seg = (np.ndarray): array with shape (-1, n). """ if x.ndim != 1: raise ValueError('`x` should be 1D. Got x.shape={}.'.format(x.shape)) nrows = ((x.size-n)//stride)+1 q = x.strides[0] x_seg = np.lib.stride_tricks.as_strided(x, shape=(nrows, n), strides=(stride*q, q)) return x_seg
def check_eeg_is_long(eeg, mode='transpose'): """ Verify that eeg input has shape (n_samples, n_channels), i.e., long format. Raises a ValueError if number of dimensions are not correct. The `mode` parameter determines what to do if the the second dimension is larger than the first. Args: eeg (np.ndarray): input array to check. mode (str): what to do when the second dimension is larger than the first (this is unexpected, since the time dimension is typically larger than the channel dimension): - 'tranpose': transpose the input array (assuming the user passed the EEG in the wrong orientation). Also a warning message is printed. - 'warn': only print a warning that there are more channels than time samples, but continue with that. - 'error': raise an error. Returns: eeg (np.ndarray): the same as the input array, but possibly transposed (if `mode` is set to 'transpose' and the number of rows was less than columns). """ mode = mode.lower() eeg = np.asarray(eeg) if eeg.ndim != 2: raise ValueError('Expected input array with shape (n_samples, n_channels). Got shape {}.' .format(eeg.shape)) if eeg.shape[1] > eeg.shape[0]: msg = '\nMore channels than samples in `eeg`. Might need to be transposed?\n' \ 'Input array shape should be (n_samples, n_channels). Got shape {}.'.format(eeg.shape) if mode == 'transpose': # Transpose and warn. eeg = eeg.T msg = '\nAutomatically transposed the input array to shape {}, ' \ 'assuming now it corresponds to (n_samples, n_channels).'.format(eeg.shape) warnings.warn(msg) elif 'warn' in mode: warnings.warn(msg) elif 'error' in mode: raise ValueError(msg) else: raise ValueError(f'Invalid option mode="{mode}".') return eeg def check_eeg_is_wide(eeg, mode='transpose'): """ Verify that eeg input has shape (n_channels, n_samples), i.e., wide format. Raises a ValueError if number of dimensions are not correct. The `mode` parameter determines what to do if the the first dimension is larger than the second. Args: eeg (np.ndarray): input array to check. mode (str): what to do when the first dimension is larger than the second (this is unexpected, since the time dimension is typically larger than the channel dimension): - 'tranpose': transpose the input array (assuming the user passed the EEG in the wrong orientation). Also a warning message is printed. - 'warn': only print a warning that there are more channels than time samples, but continue with that. - 'error': raise an error. Returns: eeg (np.ndarray): the same as the input array, but possibly transposed (if `mode` is set to 'transpose' and the number of rows was less than columns). """ mode = mode.lower() eeg = np.asarray(eeg) if eeg.ndim != 2: raise ValueError('Expected input array with shape (n_channels, n_samples). Got shape {}.' .format(eeg.shape)) if eeg.shape[0] > eeg.shape[1]: msg = '\nMore channels than samples in `eeg`. Might need to be transposed?\n' \ 'Input array shape should be (n_channels, n_samples). Got shape {}.'.format(eeg.shape) if mode == 'transpose': # Transpose and warn. eeg = eeg.T msg = '\nAutomatically transposed the input array to shape {}, ' \ 'assuming now it corresponds to (n_channels, n_samples).'.format(eeg.shape) warnings.warn(msg) elif 'warn' in mode: warnings.warn(msg) elif 'error' in mode: raise ValueError(msg) else: raise ValueError(f'Invalid option mode="{mode}".') return eeg