import numpy as np
import scipy.signal
from nnsa.utils.segmentation import get_all_segments
__all__ = [
'create_hankel',
]
[docs]def create_hankel(x, window, stride=None, fs=1, concat=True):
"""
Create a (block) Hankel expansion for signals in `x` with specific window length and stride.
Args:
x (np.ndarray): 1D or 2D array with signal(s). If 2D, the size should be (n_samples, n_channels).
window (float): length of a row of the hankel matrix in seconds (truncation order / fs).
stride (float, optional): stride between the rows of the hankel matrix in seconds. If None, the stride is one
sample (real hankel).
Defaults to None.
fs (float, optional): sample frequency in Hz. If 1, `window` and `stride` can be given in numbe of samples.
Defaults to 1.
concat (bool, optional): if True, concatenates the hankel matrices of each channel horizontally.
If False, a list of hankel matrices per channel is returned.
Defaults to True.
Returns:
x_h_all (list or np.ndarray): concatenation of channel-wise hankel matrices (if concat is True) or a
list of the channel-wise hankel matrices (if concat is False).
Examples:
>>> x = np.arange(10) # 10 samples.
>>> x = np.vstack((x, 10*x, 100*x)).T # 3 channels.
>>> print(x)
[[ 0 0 0]
[ 1 10 100]
[ 2 20 200]
[ 3 30 300]
[ 4 40 400]
[ 5 50 500]
[ 6 60 600]
[ 7 70 700]
[ 8 80 800]
[ 9 90 900]]
>>> xh = create_hankel(x, window=3, stride=2, fs=1, concat=True)
>>> print(xh)
[[ 0 1 2 0 10 20 0 100 200]
[ 2 3 4 20 30 40 200 300 400]
[ 4 5 6 40 50 60 400 500 600]
[ 6 7 8 60 70 80 600 700 800]]
"""
x = np.asarray(x)
if x.ndim == 1:
x = np.reshape(x, (-1, 1))
# Number of columns of the hankel matrix of one channel (truncation order).
num_cols = int(round(window*fs))
# Loop over channels.
x_h_all = []
for i in range(x.shape[-1]):
if stride is None:
# Hankelize without stride.
x_h = scipy.linalg.hankel(x[:-num_cols+1, i], x[-num_cols:, i])
else:
# Segmentize.
x_h = get_all_segments(x[:, i], segment_length=window, overlap=window-stride, fs=fs)
assert x_h.shape[-1] == num_cols
x_h_all.append(x_h)
if concat:
# Concatenate channels horizontally ([ch1, ch2, ch3, ...]).
x_h_all = np.hstack(x_h_all)
return x_h_all