Source code for nnsa.preprocessing.resample

"""
Resampling of a data array.
"""
import numpy as np
import scipy.signal
from scipy import interpolate

__all__ = [
    'resample_by_filtering',
    'resample_by_interpolation',
]

from nnsa.utils.arrays import interp_nan


[docs]def resample_by_filtering(x, fs, fs_new, **kwargs): """ Resample the signal x to new sampling frequency fs_new, using polyphase filtering. Interpolated nans prior to resampling and puts nans back in resampled signal. Args: x (np.ndarray): signal to resample. fs (flaot): current sample frequency of x. fs_new (float): new sampling frequency to resample to. **kwargs (optional): optional keyword arguments for scipy.signal.resample_poly. Returns: (np.ndarray): the resampled signal. """ axis = kwargs.get('axis', 0) # Verify that current fs and fs_new are whole values. if not float(fs).is_integer(): raise NotImplementedError('Resampling not implemented for non-integer sample frequency {}'.format(fs)) if not float(fs_new).is_integer(): # Try something else. up = fs down = fs/fs_new if not float(down).is_integer(): raise ValueError('New sample frequency fs_new should be an integer. Got fs_new = {}'.format(fs_new)) else: up = fs_new down = fs # Replace nans. nan_mask = np.isnan(x) x = interp_nan(x, axis=axis) # x[nan_mask] = 0.0 # Do resampling using polyphase filtering. x_resampled = scipy.signal.resample_poly(x, int(up), int(down), **kwargs) # Create interpolation function to interpolate new locations of nans. finterp = interpolate.interp1d( np.arange(x.shape[axis]) / fs, nan_mask.astype(float), assume_sorted=True, bounds_error=False, fill_value=1, **kwargs) # Call interpolation function on new time array. nan_mask_new = finterp(np.arange(x_resampled.shape[axis]) / fs_new) > 1e-10 # Put nans back. x_resampled[nan_mask_new] = np.nan return x_resampled
[docs]def resample_by_interpolation(x, t, fs_new, **kwargs): """ Resample the signal data to new sampling frequency fs_new, using 1d interpolation. Args: x (np.ndarray): signal to resample. t (np.ndarray): time array corresponding to x. Must be sorted in an increasing order and have same length as x. fs_new (float): new sampling frequency to resample to. **kwargs (optional): optional keyword arguments for the scipy.interpolate.interp1d() function. Returns: (np.ndarray): the resampled signal. Examples: >>> np.random.seed(0) >>> x = np.random.rand(100) >>> t = np.random.rand(100)*100 # Simulate signal with average sample frequency of approximately 100 Hz. >>> t.sort() >>> resample_by_interpolation(x, t, fs_new=50, kind='quadratic') array([0.5488135 , 0.58892456, 0.62690337, ..., 0.07706016, 0.05019593, 0.02297912]) """ # New time array. Add a small number to stop to include this stop. t_new = np.arange(t[0], t[-1] + 1/fs_new/1e10, 1/fs_new) # Create interpolation function. finterp = interpolate.interp1d(t, x, assume_sorted=True, **kwargs) # Call interpolation function on new time array. x_resampled = finterp(t_new) return x_resampled