Source code for nnsa.preprocessing.expansions

import numpy as np
import scipy.signal
from nnsa.utils.segmentation import get_all_segments


__all__ = [
    'create_hankel',
]


[docs]def create_hankel(x, window, stride=None, fs=1, concat=True): """ Create a (block) Hankel expansion for signals in `x` with specific window length and stride. Args: x (np.ndarray): 1D or 2D array with signal(s). If 2D, the size should be (n_samples, n_channels). window (float): length of a row of the hankel matrix in seconds (truncation order / fs). stride (float, optional): stride between the rows of the hankel matrix in seconds. If None, the stride is one sample (real hankel). Defaults to None. fs (float, optional): sample frequency in Hz. If 1, `window` and `stride` can be given in numbe of samples. Defaults to 1. concat (bool, optional): if True, concatenates the hankel matrices of each channel horizontally. If False, a list of hankel matrices per channel is returned. Defaults to True. Returns: x_h_all (list or np.ndarray): concatenation of channel-wise hankel matrices (if concat is True) or a list of the channel-wise hankel matrices (if concat is False). Examples: >>> x = np.arange(10) # 10 samples. >>> x = np.vstack((x, 10*x, 100*x)).T # 3 channels. >>> print(x) [[ 0 0 0] [ 1 10 100] [ 2 20 200] [ 3 30 300] [ 4 40 400] [ 5 50 500] [ 6 60 600] [ 7 70 700] [ 8 80 800] [ 9 90 900]] >>> xh = create_hankel(x, window=3, stride=2, fs=1, concat=True) >>> print(xh) [[ 0 1 2 0 10 20 0 100 200] [ 2 3 4 20 30 40 200 300 400] [ 4 5 6 40 50 60 400 500 600] [ 6 7 8 60 70 80 600 700 800]] """ x = np.asarray(x) if x.ndim == 1: x = np.reshape(x, (-1, 1)) # Number of columns of the hankel matrix of one channel (truncation order). num_cols = int(round(window*fs)) # Loop over channels. x_h_all = [] for i in range(x.shape[-1]): if stride is None: # Hankelize without stride. x_h = scipy.linalg.hankel(x[:-num_cols+1, i], x[-num_cols:, i]) else: # Segmentize. x_h = get_all_segments(x[:, i], segment_length=window, overlap=window-stride, fs=fs) assert x_h.shape[-1] == num_cols x_h_all.append(x_h) if concat: # Concatenate channels horizontally ([ch1, ch2, ch3, ...]). x_h_all = np.hstack(x_h_all) return x_h_all