"""
Resampling of a data array.
"""
import numpy as np
import scipy.signal
from scipy import interpolate
__all__ = [
'resample_by_filtering',
'resample_by_interpolation',
]
from nnsa.utils.arrays import interp_nan
[docs]def resample_by_filtering(x, fs, fs_new, **kwargs):
"""
Resample the signal x to new sampling frequency fs_new, using polyphase filtering.
Interpolated nans prior to resampling and puts nans back in resampled signal.
Args:
x (np.ndarray): signal to resample.
fs (flaot): current sample frequency of x.
fs_new (float): new sampling frequency to resample to.
**kwargs (optional): optional keyword arguments for scipy.signal.resample_poly.
Returns:
(np.ndarray): the resampled signal.
"""
axis = kwargs.get('axis', 0)
# Verify that current fs and fs_new are whole values.
if not float(fs).is_integer():
raise NotImplementedError('Resampling not implemented for non-integer sample frequency {}'.format(fs))
if not float(fs_new).is_integer():
# Try something else.
up = fs
down = fs/fs_new
if not float(down).is_integer():
raise ValueError('New sample frequency fs_new should be an integer. Got fs_new = {}'.format(fs_new))
else:
up = fs_new
down = fs
# Replace nans.
nan_mask = np.isnan(x)
x = interp_nan(x, axis=axis)
# x[nan_mask] = 0.0
# Do resampling using polyphase filtering.
x_resampled = scipy.signal.resample_poly(x, int(up), int(down), **kwargs)
# Create interpolation function to interpolate new locations of nans.
finterp = interpolate.interp1d(
np.arange(x.shape[axis]) / fs,
nan_mask.astype(float), assume_sorted=True, bounds_error=False, fill_value=1, **kwargs)
# Call interpolation function on new time array.
nan_mask_new = finterp(np.arange(x_resampled.shape[axis]) / fs_new) > 1e-10
# Put nans back.
x_resampled[nan_mask_new] = np.nan
return x_resampled
[docs]def resample_by_interpolation(x, t, fs_new, **kwargs):
"""
Resample the signal data to new sampling frequency fs_new, using 1d interpolation.
Args:
x (np.ndarray): signal to resample.
t (np.ndarray): time array corresponding to x. Must be sorted in an increasing order and have same length as x.
fs_new (float): new sampling frequency to resample to.
**kwargs (optional): optional keyword arguments for the scipy.interpolate.interp1d() function.
Returns:
(np.ndarray): the resampled signal.
Examples:
>>> np.random.seed(0)
>>> x = np.random.rand(100)
>>> t = np.random.rand(100)*100 # Simulate signal with average sample frequency of approximately 100 Hz.
>>> t.sort()
>>> resample_by_interpolation(x, t, fs_new=50, kind='quadratic')
array([0.5488135 , 0.58892456, 0.62690337, ..., 0.07706016, 0.05019593,
0.02297912])
"""
# New time array. Add a small number to stop to include this stop.
t_new = np.arange(t[0], t[-1] + 1/fs_new/1e10, 1/fs_new)
# Create interpolation function.
finterp = interpolate.interp1d(t, x, assume_sorted=True, **kwargs)
# Call interpolation function on new time array.
x_resampled = finterp(t_new)
return x_resampled