"""
Functions for filtering of signals.
"""
import numpy as np
import scipy.signal
from nnsa.parameters.parameters import ClassWithParameters, Parameters
from nnsa.preprocessing.saved_filters import filter_saved_filter
from nnsa.utils.arrays import interp_nan
from nnsa.utils.dictionaries import itemize_items
from nnsa.utils.mathematics import mag2db
import matplotlib.pyplot as plt
__all__ = [
'FilterBase',
'Butterworth',
'RemezFIR',
'MovingAverage',
'NeatFIR',
'NotchIIR',
'WinFIR',
'filter_signal',
'remezord',
'_check_fir_specification',
'_equalize_transition_widths',
'_get_filter_type'
]
[docs]class FilterBase(ClassWithParameters):
"""
Base class for a filter object with common utilities.
Child classes inheriting from this class need to implement at least one of _compute_sos() or _compute_tf().
Depending on which of these are implemented, the filtfilt() method will call the appropriate function to do
the filtering (see filtfilt()).
Args:
fs (float, optional): frequency of the signal to filter in Hz.
tf (tuple, optional): tuple of arrays (b, a) containing the transfer function filter coefficients.
sos (tuple, optional): sos filter coefficients.
**kwargs: see ClassWithParameters.
"""
def __init__(self, tf=None, sos=None, fs=None, **kwargs):
super().__init__(**kwargs)
self._fs = fs
self._tf = tf
self._sos = sos
def __repr__(self):
attr_dict = self.__dict__.copy()
del attr_dict['_sos']
del attr_dict['_tf']
attributes = itemize_items(attr_dict.items())
return '{} with attributes:\n{}'.format(self.__class__.__name__, attributes)
[docs] @staticmethod
def default_parameters():
return Parameters()
@property
def fs(self):
"""
Sample frequency.
Returns:
self._fs (float): Sample frequency.
Raises:
AttributeError if self._fs is None.
"""
if self._fs is None:
# Raise an error if fs is called, but fs is not set.
raise AttributeError('Attribute `fs` required, but not set. Make sure to set the fs attribute (the '
'sampling frequency in Hz in the {} object).'.format(self.__class__.__name__))
return self._fs
@fs.setter
def fs(self, value):
"""
Set the sample frequency.
Resets the filter coefficients, since they most likely change due to the change of fs.
Args:
value (float): the new sample frequency
"""
if self._fs != value:
self._fs = value
self._sos = None
self._tf = None
@property
def sos(self):
"""
Second-order filter coefficients.
Returns:
self._sos (np.ndarray): see self._compute_sos().
"""
if self._sos is None:
self._sos = self._compute_sos()
return self._sos
@property
def tf(self):
"""
Transfer function coefficients.
Returns:
self._tf (np.ndarray): see self._compute_tf().
"""
if self._tf is None:
self._tf = self._compute_tf()
return self._tf
[docs] def compute_frequency_response(self, **kwargs):
"""
Compute the frequency response of the filter.
Returns:
w (np.ndarray): the normalized frequencies at which h was computed, in radians/sample,
see also scipy.signal.freqz().
h (np.ndarray): the frequency response., see also scipy.signal.freqz().
**kwargs: optional keyword arguments for the scipy function.
"""
if self.sos is not None:
sos = self.sos
w, h = scipy.signal.sosfreqz(sos, **kwargs)
elif self.tf is not None:
b, a = self.tf
w, h = scipy.signal.freqz(b, a, **kwargs)
else:
raise self._coefficients_error()
return w, h
[docs] def filtfilt(self, x, replace_nan=True, how='mean', **kwargs):
"""
Filter the signal x by applying a digital filter forward and backward to a signal for zero-phase filtering.
This is a wrapper of scipy.signal filtfilt functions. See scipy.signal.filtfilt() for more complete information.
Args:
x (np.ndarray): signal array.
replace_nan (bool, optional): if True, replaces nans in the input with zeros and places nans in the output
at the locations of nans in the input. Especially for IIR filters, this prevents the all subsequent
output to become nan if there is a nan sample in the input.
Defaults to True.
how (float, string): how to deal with nans. If a float, this will be the fill value.
If 'mean', nans will be replaced by the mean value.
If 'zeros', nans will be replaced by zeros.
If 'interpolate', nans will be interpolated linearly.
**kwargs (optional): see scipy.signal.sosfiltfilt().
Returns:
y (np.ndarray): the filtered output.
Raises:
NotImplementedError if neither self.sos or self.tf are defined.
"""
xp = self._preprocess(x, replace_nan, how, **kwargs)
# Call the appropriate filtfilt function corresponding to the form of the filter coefficients.
# First see if the transfer function (b, a) form is defined (faster) or else try the sos form.
if self.tf is not None:
b, a = self.tf
y = scipy.signal.filtfilt(b, a, xp, **kwargs)
elif self.sos is not None:
sos = self.sos
y = scipy.signal.sosfiltfilt(sos, xp, **kwargs)
else:
raise self._coefficients_error()
y = self._postprocess(x, y, replace_nan)
return y
[docs] def filter(self, x, replace_nan=True, how='mean', **kwargs):
"""
Filter the signal x by applying a digital filter.
This is a wrapper of scipy.signal lfilter and sosfilt functions. See scipy.signal.lfilter() and scipy.signal.sosfilt()
for more complete information.
Args:
x (np.ndarray): signal array.
replace_nan (bool, optional): if True, replaces nans in the input with zeros and places nans in the output
at the locations of nans in the input. Especially for IIR filters, this prevents the all subsequent
output to become nan if there is a nan sample in the input.
Defaults to True.
how (float, string): how to deal with nans. If a float, this will be the fill value.
If 'mean', nans will be replaced by the mean value.
If 'interpolate', nans will be interpolated linearly.
**kwargs (optional): see scipy.signal.lfilter() and scipy.signal.sosfilt().
Returns:
y (np.ndarray): the filtered output.
Raises:
NotImplementedError if neither self.sos or self.tf are defined.
"""
xp = self._preprocess(x, replace_nan, how, **kwargs)
# Call the appropriate filter function corresponding to the form of the filter coefficients.
# First see if the sos form is defined and then try the transfer function (b, a) form.
if self.sos is not None:
sos = self.sos
y = scipy.signal.sosfilt(sos, xp, **kwargs)
elif self.tf is not None:
b, a = self.tf
y = scipy.signal.lfilter(b, a, xp, **kwargs)
else:
raise self._coefficients_error()
y = self._postprocess(x, y, replace_nan)
return y
[docs] def plot_frequency_response(self, ax=None, angle=True, **kwargs):
"""
Plot the frequency response.
Args:
ax: matplotlib axis object for plotting. If None, it plots in the current axis.
Defaults to None.
angle (bool): plot the phase angle or not.
**kwargs: optional arguments for self.compute_frequency_response()
"""
# Compute the frequency response.
w, h = self.compute_frequency_response(**kwargs)
# Compute frequency array.
if self._fs is not None:
f = 0.5 * self.fs * w / np.pi
f_label = 'Frequency (Hz)'
else:
f = w
f_label = 'Frequency (rad/s)'
# Gain in dB.
log_h = 20 * np.log10(abs(h))
# Create an axis or set the current axis.
if ax is None:
ax = plt.gca()
else:
plt.sca(ax)
# Plot frequency response amplitude.
plt.axhline(-3, color=np.ones(3)*0.5, linestyle='--') # -3 dB line.
plt.plot(f, log_h, 'C0')
plt.ylabel('Amplitude (dB)', color='C0')
plt.xlabel(f_label)
plt.grid()
# Extract frequency bands (if in parameters) and indicate those in the plot.
wp = np.reshape(self.parameters.get('passband', None), -1)
ws = np.reshape(self.parameters.get('stopband', None), -1)
wn = np.reshape(self.parameters.get('fn', None), -1)
for edge in np.concatenate([wp, ws, wn], -1):
if edge is not None:
plt.axvline(edge, linestyle='--', color='green') # edge frequency
# Use a proper limit for the y-axis.
a = 2 * self.parameters.get('gstop', 40)
plt.ylim([-a, a / 10])
# Plot frequency response phase.
if angle:
ax2 = ax.twinx()
angles = np.unwrap(np.angle(h))
plt.plot(f, angles, 'C3')
plt.ylabel('Angle (radians)', color='C3')
# Figure make-up.
plt.title(self.__class__.__name__ + ' frequency response')
[docs] def plot_impulse_response(self, ax=None, n=50, **kwargs):
"""
Plot the impulse response.
Args:
ax: matplotlib axis object for plotting. If None, it plots in the current axis.
Defaults to None.
n (int): number of samples to plot the response for.
**kwargs: optional arguments for plt.stem().
"""
# Create an axis or set the current axis.
if ax is None:
ax = plt.gca()
else:
plt.sca(ax)
# Compute impulse response.
imp = scipy.signal.unit_impulse(n)
response = self.filter(imp)
# Plot the impulse response
plt.stem(response, **kwargs)
plt.xlabel('Sample')
plt.ylabel('Response')
plt.title(f'{self.__class__.__name__} impulse response')
plt.grid()
@staticmethod
def _coefficients_error():
"""
Error to be raised when coefficients of the filter are not implemented.
"""
return NotImplementedError('Filter object does not return sos or tf coefficients. Make sure either '
'_compute_sos() or _compute_tf() is implemented.')
def _compute_sos(self):
"""
Compute the second-order filter coefficients.
Optional method to be implemented when creating a child class.
Returns:
sos (np.ndarray): array of second-order filter coefficients, with shape (n_sections, 6).
Notes:
The returned value is the filter input for scipy.signal.sosfiltfilt().
"""
return None
def _compute_tf(self):
"""
Compute the filter's transfer function coefficients.
Optional method to be implemented when creating a child class.
Returns:
b (np.ndarray): numerator polynomial coefficients.
a (np.ndarray): denominator polynomial coefficients.
Notes:
The returned values are the filter inputs for scipy.signal.filtfilt().
"""
return None
@staticmethod
def _preprocess(x, replace_nan, how, **kwargs):
"""
Helper preprocessing routine before applying the filter.
"""
xp = x.copy()
axis = kwargs.get('axis', -1)
how = how.lower()
if replace_nan:
# Replace nans in the input with zeros.
nan_mask = np.isnan(x)
if isinstance(how, (int, float, complex)) and not isinstance(how, bool):
xp[nan_mask] = how
elif isinstance(how, str):
if how == 'interpolate':
xp = interp_nan(x, axis=axis)
elif how == 'zeros':
xp[nan_mask] = 0
elif how == 'mean':
# Demean, replace nan with 0 and add mean back.
mean = np.nanmean(xp, axis=axis, keepdims=True)
xp -= mean
xp[nan_mask] = 0 # Mean is zero for each channel.
xp += mean
else:
raise ValueError(f'Invalid choice "{how}" for `how` of type "{type(how)}".')
return xp
@staticmethod
def _postprocess(x, y, replace_nan):
"""
Helper postprocessing routine after applying the filter.
"""
if replace_nan:
# Put zeros in the output where the input was nan.
nan_mask = np.isnan(x)
y[nan_mask] = np.nan
# Maintain dtype.
y = y.astype(x.dtype)
return y
[docs]class Butterworth(FilterBase):
"""
Create a Butterworth filter.
Args:
see FilterBase.
Examples:
>>> fs = 2000
>>> t = np.linspace(0, 1.0, fs+1)
>>> xlow = np.sin(2 * np.pi * 5 * t) # 5 Hz
>>> xhigh = np.sin(2 * np.pi * 250 * t) # 250 Hz
>>> x = xlow + xhigh
>>> filt = Butterworth(fs=fs, passband=120, stopband=140)
>>> y = filt.filtfilt(x)
>>> print(np.abs(y - xlow).max())
0.004803688903207593
"""
[docs] @staticmethod
def default_parameters():
"""
Return the default parameters.
Returns:
(nnsa.Parameters): a default set of parameters for the object.
"""
pars = {
# Optionally specify order, cutoff frequencies in Hz and the filter type.
# If not specified, the below specifications will be used to estimate an
# appropriate order. If specified, some of the below specifications are ignored, while passband
# is taken as the cut-off frequencies.
'order': None,
'fn': None,
'filter_type': 'bandpass',
# Determine the type of coefficients ('sos' or 'ba').
# 'sos' is recommended for numerical stability.
# See also scipy.signal.butter().
'coef': 'sos',
# Passband and stopband edge frequencies in Hz (see scipy.signal.buttord).
'passband': None,
'stopband': None,
# Maximum loss in the passband in dB.
'gpass': 1,
# Minimum attenuation in the stopband in dB.
'gstop': 40,
}
return Parameters(**pars)
def _compute_tf(self):
if self.parameters['coef'] == 'ba':
return self._compute_coef()
else:
return None
def _compute_sos(self):
if self.parameters['coef'] == 'sos':
return self._compute_coef()
else:
return None
def _compute_coef(self):
"""
Compute the filter coefficients.
Returns:
sos: array of second-order filter coefficients, with shape (n_sections, 6).
"""
pars = self.parameters
fs = self.fs
fs_nyq = fs / 2
# Get filter specifications.
if pars['order'] is None:
# Normalize frequencies.
wp = np.asarray(pars['passband']) / fs_nyq
ws = np.asarray(pars['stopband']) / fs_nyq
gpass = pars['gpass']
gstop = pars['gstop']
# Determine filter type.
filter_type = _get_filter_type(wp, ws)
# Get filter order.
order, wn = scipy.signal.buttord(wp, ws, gpass, gstop, analog=False)
else:
if pars['fn'] is None:
raise ValueError('If `order` is specified, specify frequency cutoffs in `fn` (Hz).')
order = pars['order']
wn = np.asarray(pars['fn']) / fs_nyq
filter_type = pars['filter_type']
# Save the order as attributes to the object for later reference.
self.order = order
self.wn = wn
# Get filter coefficients.
coef = scipy.signal.butter(order, wn, btype=filter_type,
analog=False, output=pars['coef'])
return coef
class IIRFilter(FilterBase):
"""
Create an IIR filter of a specific order (see also scipy.signal.iirfilter).
Args:
see FilterBase.
Examples:
>>> fs = 2000
>>> t = np.linspace(0, 1.0, fs+1)
>>> xlow = np.sin(2 * np.pi * 5 * t) # 5 Hz
>>> xhigh = np.sin(2 * np.pi * 250 * t) # 250 Hz
>>> x = xlow + xhigh
>>> filt = IIRFilter(fs=fs, order=4, passband=120, stopband=140)
>>> y = filt.filtfilt(x)
>>> print(np.abs(y - xlow).max())
0.022381773486631007
"""
@staticmethod
def default_parameters():
"""
Return the default parameters.
Returns:
(nnsa.Parameters): a default set of parameters for the object.
"""
pars = {
# Specify order.
'order': 4,
# Edge frequencies in Hz.
'fn': [0.6, 40],
# Filter type.
'filter_type': 'bandpass',
# Filter type. See also ftype in scipy.signal.iirfilter. Options:
# Butterworth : ‘butter’
# Chebyshev I : ‘cheby1’
# Chebyshev II : ‘cheby2’
# Cauer/elliptic: ‘ellip’
# Bessel/Thomson: ‘bessel’
'ftype': 'butter',
# Determine the type of coefficients ('sos' or 'ba').
# 'sos' is recommended for numerical stability.
# See also scipy.signal.iirfilter().
'coef': 'sos',
# For Chebyshev and elliptic filters:
# Maximum loss in the passband in dB.
'gpass': 1,
# Minimum attenuation in the stopband in dB.
'gstop': 40,
}
return Parameters(**pars)
def _compute_tf(self):
if self.parameters['coef'] == 'ba':
return self._compute_coef()
else:
return None
def _compute_sos(self):
if self.parameters['coef'] == 'sos':
return self._compute_coef()
else:
return None
def _compute_coef(self):
"""
Compute the filter coefficients.
Returns:
sos: array of second-order filter coefficients, with shape (n_sections, 6).
"""
pars = self.parameters
fs = self.fs
fs_nyq = fs / 2
# Normalize frequencies.
wn = np.asarray(pars['fn']) / fs_nyq
gpass = pars['gpass']
gstop = pars['gstop']
# Determine filter type.
filter_type = pars['filter_type']
# Get filter order.
order = pars['order']
# Save the order as attributes to the object for later reference.
self.order = order
self.wn = wn
# Get filter coefficients.
coef = scipy.signal.iirfilter(order, wn, rp=gpass, rs=gstop,
btype=filter_type, ftype=pars['ftype'],
analog=False, output=pars['coef'])
return coef
[docs]class RemezFIR(FilterBase):
"""
Design a minimax optimal filter using the Remez exchange algorithm.
Args:
see FilterBase.
Examples:
>>> fs = 2000
>>> t = np.linspace(0, 1.0, fs+1)
>>> xlow = np.sin(2 * np.pi * 5 * t) # 5 Hz
>>> xhigh = np.sin(2 * np.pi * 250 * t) # 250 Hz
>>> x = xlow + xhigh
>>> filt = RemezFIR(fs=fs, passband=120, stopband=140)
>>> y = filt.filtfilt(x)
>>> print(np.abs(y - xlow).max())
0.05586442486553467
"""
[docs] @staticmethod
def default_parameters():
"""
Return the default parameters.
Returns:
(nnsa.Parameters): a default set of parameters for the object.
"""
pars = {
# Passband and stopband edge frequencies in Hz.
'passband': [0.6, 40],
'stopband': [0.1, 47],
# Maximum deviation (ripple) in the passband in dB.
'passband_ripple': 1,
# Minimum attenuation in the stopband in dB.
'stopband_attenuation': 40,
}
return Parameters(**pars)
def _compute_tf(self):
"""
Compute the filter's transfer function coefficients.
Returns:
b (np.ndarray): numerator polynomial coefficients.
a (np.ndarray): denominator polynomial coefficients.
Notes:
The returned values are the filter inputs for scipy.signal.filtfilt().
"""
# Maximum iterations for finding the filter order. NOTE: MATLAB sets this to 35 and if it fails to find a
# sufficient order within 35 iterations, it uses the initial order estimate.
max_iter = 70
# Extract parameters.
stopband = np.asarray(self.parameters['stopband'])
passband = np.asarray(self.parameters['passband'])
passband_ripple = self.parameters['passband_ripple']
stopband_attenuation = self.parameters['stopband_attenuation']
# In case of stopband or passband are passed as single numbers, convert them to 1D arrays.
if stopband.ndim == 0:
stopband = np.array([stopband])
if passband.ndim == 0:
passband = np.array([passband])
# Find out the type of filter.
filter_type = _get_filter_type(passband, stopband)
# Collect frequency bands in array.
frequencies = np.concatenate((stopband, passband))
# Normalize frequencies.
nyq = self.fs / 2
frequencies = frequencies / nyq
# Sort frequencies.
idx_sort = np.argsort(frequencies)
frequencies = frequencies[idx_sort]
# Create amplitude array (stopband -> 0, passband -> 1) corresponding to the frequency array.
desired_freq = np.concatenate((np.zeros(len(stopband)), np.ones(len(passband))))
desired_freq = desired_freq[idx_sort]
# Add the borders at 0 and fs/2 Hz.
frequencies = np.concatenate(([0], frequencies, [1]))
desired_freq = np.concatenate(([desired_freq[0]], desired_freq, [desired_freq[-1]])).astype(int)
# Equalize the transition widths.
frequencies_specs = np.copy(frequencies) # Keep a copy for testing with the original specs.
if 'band' in filter_type:
# Only relevant if bandpass or bandstop filter.
frequencies = _equalize_transition_widths(frequencies)
# Create desired array corresponding to the bands.
desired_band = desired_freq[::2]
# Convert the dB values of the ripple and attenuation to linear scale.
stopband_attenuation_lin = 10 ** (-stopband_attenuation / 20)
passband_ripple_lin = (10 ** (passband_ripple / 20) - 1) / (10 ** (passband_ripple / 20) + 1)
# Create a dev array with deviation gains corresponding to desired_freq.
stop_pass_dev = np.concatenate(([stopband_attenuation_lin], [passband_ripple_lin]))
deviations_freq = stop_pass_dev[desired_freq]
# Get relative deviations.
deviations_freq = deviations_freq / ((desired_freq == 0).astype(int) + desired_freq)
# Estimate order for the Remez algorithm.
numtaps, weight_band = remezord(frequencies, desired_freq, deviations_freq, fs=2)
# Compute filter coefficients using the Remez/Parks-McClellan algorithm.
b = scipy.signal.remez(numtaps=numtaps, bands=frequencies, desired=desired_band, weight=weight_band, fs=2)
# Collect stop and pass bands in matrices with size (n_bands, 2).
stopbands_specs = frequencies_specs[desired_freq == 0].reshape(-1, 2)
passbands_specs = frequencies_specs[desired_freq == 1].reshape(-1, 2)
# Check if the given filter specifications are met.
done = _check_fir_specification(b, stopbands_specs, passbands_specs, stopband_attenuation,
passband_ripple, fs=2)
# While specifications are not met, increase the filter order and try again.
count = 0
while not done and count < max_iter:
count += 1
# Increase filter order/length.
numtaps += 1
# Handle odd filter (even numtaps) orders for highpass and bandstop filters.
ftype = 'bandpass'
if np.fmod(numtaps, 2) == 0:
if filter_type == 'highpass':
# Use Hilbert.
ftype = 'hilbert'
elif filter_type == 'bandstop':
# Force even filter order.
numtaps += 1
# Compute filter coefficients.
b = scipy.signal.remez(numtaps=numtaps, bands=frequencies, desired=desired_band, weight=weight_band,
fs=2, type=ftype)
# Check if the given filter specifications are met.
done = _check_fir_specification(b, stopbands_specs, passbands_specs, stopband_attenuation, passband_ripple,
fs=2)
# Raise error if specs were not met.
if not done:
raise RuntimeError('Failed to design the filter: specs not met. '
'Consider increasing the maximum iterations, or relaxing the filter specifications.')
# For FIR filters, a = 1.
a = 1
return b, a
[docs]class MovingAverage(FilterBase):
"""
Create a moving average (smoothening) filter.
Args:
see FilterBase.
Examples:
>>> np.random.seed(43)
>>> N = 1000
>>> mean = 4.3
>>> x = mean + (np.random.rand(N) - 0.5)*abs(mean)
>>> filt = MovingAverage(numtaps=200)
>>> y = filt.filter(x)
# Original signal at idx 500.
>>> x[500]
5.322134095439112
# Moving averaged signal at idx 500.
>>> y[500]
4.340021494627845
"""
[docs] @staticmethod
def default_parameters():
"""
Return the default parameters.
Returns:
(nnsa.Parameters): a default set of parameters for the object.
"""
pars = {
# Size of the kernel (number of samples in moving average).
'numtaps': 10,
}
return Parameters(**pars)
def _compute_tf(self):
"""
Compute the filter's transfer function coefficients.
Returns:
b (np.ndarray): numerator polynomial coefficients.
a (np.ndarray): denominator polynomial coefficients.
Notes:
The returned values are the filter inputs for scipy.signal.filtfilt().
"""
# Create kernel.
numtaps = self.parameters['numtaps']
b = np.ones(numtaps)/numtaps
a = 1 # For FIR filters, a equals 1.
return b, a
[docs]class NeatFIR(FilterBase):
"""
Create an assymetric FIR filter with specifications as done in the NEAT toolbox for MATLAB.
References:
Z. A. Vesoulis, P. G. Gamble, S. Jain, N. M. E. Ters,
S. M. Liao, and A. M. Mathur, “WU-NEAT: A clinically validated,
open- source MATLAB toolbox for limited-channel neonatal EEG analysis,”
2018-05-11.
Args:
see FilterBase.
"""
[docs] @staticmethod
def default_parameters():
"""
Return the default parameters.
Returns:
(nnsa.Parameters): a default set of parameters for the object.
"""
pars = {
# Numtaps for the filter. If None, choose it automatically depending on
# the sampling frequency (see _compute_tf()).
'numtaps': None
}
return Parameters(**pars)
def _compute_tf(self):
"""
Compute the filter's transfer function coefficients.
Returns:
b (np.ndarray): numerator polynomial coefficients.
a (np.ndarray): denominator polynomial coefficients.
Notes:
The returned values are the filter inputs for scipy.signal.filtfilt().
"""
numtaps = self.parameters['numtaps']
fs = self.fs
if numtaps is None:
# Make numtaps depend on the sample frequency. At fs=64 Hz, use 300 taps.
numtaps = int(round(300 * fs / 64))
# Make numtaps odd, so that delay is an integer number of samples.
if np.mod(numtaps, 2) == 0:
numtaps += 1
# Collect filter specs.
freq = [0, 1.6, 2.0, 14.4, 15.36, fs / 2]
desired = [0, 0, 0.73, 3.58, 0, 0]
# Design filter. Note: use firwin2 here, not the Parks-McClellan algorithm (not implemented for
# assymetric filters in scipy).
b = scipy.signal.firwin2(numtaps, freq, desired, fs=fs)
a = 1 # For FIR filters, a equals 1.
return b, a
[docs]class NotchIIR(FilterBase):
"""
Create an IIR notch (bandstop) filter.
Args:
see FilterBase.
Examples:
>>> fs = 2000
>>> t = np.linspace(0, 1.0, fs+1)
>>> xlow = np.sin(2 * np.pi * 5 * t) # 5 Hz
>>> xhigh = np.sin(2 * np.pi * 250 * t) # 250 Hz
>>> x = xlow + xhigh
>>> filt = NotchIIR(fs=fs, f0=250)
>>> y = filt.filtfilt(x)
>>> print(y[10])
0.5425567000090112
"""
[docs] @staticmethod
def default_parameters():
"""
Return the default parameters.
Returns:
(nnsa.Parameters): a default set of parameters for the object.
"""
pars = {
# Frequency to suppress/remove in Hz:
'f0': 50,
# Quality factor. Dimensionless parameter that characterizes notch filter -3 dB bandwidth bw relative to
# its normalized center frequency, Q = w0/bw:
'Q': 30,
}
return Parameters(**pars)
def _compute_tf(self):
"""
Compute the filter's transfer function coefficients.
Returns:
b (np.ndarray): numerator polynomial coefficients.
a (np.ndarray): denominator polynomial coefficients.
Notes:
The returned values are the filter inputs for scipy.signal.filtfilt().
"""
f0 = self.parameters['f0']
quality_factor = self.parameters['Q']
fs = self.fs
b, a = scipy.signal.iirnotch(w0=f0, Q=quality_factor, fs=fs)
return b, a
[docs]class WinFIR(FilterBase):
"""
FIR filter design using the window method.
Args:
see FilterBase.
Examples:
TODO
"""
[docs] @staticmethod
def default_parameters():
"""
Return the default parameters.
Returns:
(nnsa.Parameters): a default set of parameters for the object.
"""
pars = {
# See scipy.signal.firwin().
'numtaps': None,
'cutoff': None,
'pass_zero': 'bandpass',
}
return Parameters(**pars)
def _compute_tf(self):
"""
Compute the filter's transfer function coefficients.
Returns:
b (np.ndarray): numerator polynomial coefficients.
a (np.ndarray): denominator polynomial coefficients.
Notes:
The returned values are the filter inputs for scipy.signal.filtfilt().
"""
fs = self.fs
a = 1
numtaps = self.parameters['numtaps']
cutoff = self.parameters['cutoff']
pass_zero = self.parameters['pass_zero']
if numtaps is None:
numtaps = max([int(fs * 3 + 1), 31])
b = scipy.signal.firwin(numtaps=int(numtaps), cutoff=cutoff,
fs=fs, pass_zero=pass_zero)
return b, a
[docs]def filter_signal(x, specification, fs=None, axis=-1, **kwargs):
"""
Filter the signal with the specified filter.
If specification is a string, then a saved filter will be loaded and the signal will be filtered with this
loaded filter(see filter_saved_filter()).
If specification is a FilterBase object, the filtering will be perfomed by the filtfilt() function
of that FilterBase object.
Note that the filters designed by scipy.signal package may be different from the filters as designed by MATLAB.
To reproduce the result of MATLAB filtering, its best to create a new saved filter which laods/creates the
filter coefficients using MATLAB (e.g. see filter_bandpassfir_a() and get_matlab_bandpassfir_coefs() in
nnsa.preprocessing.saved_filters)
Args:
x (np.ndarray): signal to be filtered.
specification (str, FilterBase or None): if a string, it is interpreted as a filter_name of
a saved filter (see nnsa.filter_saved_filter()). If a FilterBase object, the objects filtfilt()
function is used to do the filtering. If None, no filtering is applied to the segment.
fs (float, optional): sample frequency of signal to be filtered. Needed for some saved filters that design
the filter, see the code which saved_filter requires fs. If the fs is not needed, you need not specify fs
(by default None will be assigned to fs). However, if uncertain, you may always pass the sample frequency.
axis (int, optional): the axis of x to which the filter is applied. Default is -1.
**kwargs (optional): optional keyword arguments passed on to the saved filter function (in the end these
are typically passed on to the function from scipy.signal that performs the actual filtering).
Returns:
signal_filtered (np.ndarray): the filtered signal.
"""
if isinstance(specification, str):
# filter_specification is filter_name of a saved filter.
signal_filtered = filter_saved_filter(x,
filter_name=specification,
fs=fs,
axis=axis,
**kwargs)
elif isinstance(specification, FilterBase):
# Set the sample frequency (the FilterBase class resets any precomputed coefficients if necessary).
if fs is not None:
specification.fs = fs
signal_filtered = specification.filtfilt(x, axis=axis, **kwargs)
else:
raise ValueError('Invalid filter specification {}.'.format(specification))
return signal_filtered
[docs]def remezord(bands, desired, deviations, fs=1):
"""
Estimate the order of the FIR filter as computed with the Remez algorithm.
Inspired by MATLAB's firpmord function.
Args:
bands (np.ndarray): a monotonic sequence containing the band edges (including 0 and fs/2 Hz).
desired (np.ndarray): a sequence the same size of bands containing the desired gain in each of the specified
frequencies.
deviations (np.ndarray): sequence the same size of bands containing the maximum deviations or ripples (in
linear units) allowable for each band.
fs (float, optional): sample frequency. Used for normalization of bands.
Returns:
numtaps (int): length of the filter.
weights (np.array): a relative weighting to give to each band region
"""
def remord(freq1, freq2, delta1, delta2):
"""
Estimate the FIR filter length of a lowpass or highpass filter.
Ported from MATLAB's remlpord function in firpmord.m
Args:
freq1 (float): passband cutoff frequency (normalized).
freq2 (float): stopband cutoff frequency (normalized).
delta1 (float): passband ripple (linear).
delta2 (float): stopband attenuation (linear).
Returns:
numtaps (int): filter length.
Notes:
Will not work well if transition zone is near f = 0, or near f = fs/2.
References:
[1] Rabiner & Gold, Theory and Appications of DSP, pp. 156-7.
"""
aa = np.array([[-4.278e-01, -4.761e-01, 0],
[-5.941e-01, 7.114e-02, 0],
[-2.660e-03, 5.309e-03, 0]])
d1 = np.log10(delta1)
d2 = np.log10(delta2)
d = np.array([1, d1, d1*d1]) @ aa @ np.array([1, d2, d2*d2])
bb = np.array([11.01217, 0.51244])
fk = np.array([1.0, (d1 - d2)]) @ bb
df = abs(freq2 - freq1)
numtaps = np.ceil(d/df - fk*df + 1)
return int(numtaps)
# Normalize to sample frequency.
bands = bands/fs
# Verify that the bands between 0 and 1/2 (Nyquist frequency).
if any(bands > 1/2) or any(bands < 0):
raise ValueError('Invalid bands. Bands must lie within [0, fs/2].')
# For each transition band, use formula (ref: Herrmann, Rabiner, Chan) to estimate order. Use the highest order.
numtaps = 0
for i in range(1, len(bands)-1, 2):
freqs = bands[i: i+2]
des = desired[i: i+2].astype(int)
devs = deviations[i: i+2]
# Passband cutoff frequency and ripple.
fpass = freqs[des == 1][0]
ripple = devs[des == 1][0]
# Stopband cutoff frequency and attenuation.
fstop = freqs[des == 0][0]
attenuation = devs[des == 0][0]
numtaps_new = remord(fpass, fstop, ripple, attenuation)
# MATLAB does this? (see firpmord.m):
# numtaps_new = remord(freqs[0], freqs[1], ripple, attenuation)
if numtaps_new > numtaps:
numtaps = numtaps_new
# Compute the weights from the deviations.
weight_freq = np.ones(len(deviations)) * np.max(deviations) / deviations
# Extract the weights per band (instead of per frequency) to be compatible with the input of scipy.signal.remez().
weight = weight_freq[::2]
# If gain is not zero at nyquist, the order must be even.
# If the order is odd (numtaps is even), we bump up the numtaps by one.
if desired[-1] != 0 and np.fmod(numtaps, 2) == 0:
numtaps += 1
return numtaps, weight
def _check_fir_specification(b, stopbands, passbands, stopband_attenuation, passband_ripple, fs=1):
"""
Check if the FIR filter with coefficients b meets the given filter specifications.
Args:
b (np.ndarray): FIR filter coefficients.
stopbands (np.ndarray): array with shape (nbands, 2) containing the edge frequencies of the specified
stopbands.
passbands (np.ndarray): array with shape (nbands, 2) containing the edge frequencies of the specified
passbands.
stopband_attenuation (float): specified stopband attenuation in dB.
passband_ripple (float): specified passband ripple in dB.
fs (float, optional): sample frequency.
Defaults to 1.
Returns:
(bool): True is specifications are met and False if not.
"""
# Number of frequency points to evaluate the frequency response at within a pass or stop band.
nf = 2**12
# Check attenuation at the stopbands.
for band in stopbands:
f_start, f_end = band
# Get magnitude of the frequency response in the band.
h = np.abs(scipy.signal.freqz(b, a=1,
worN=np.linspace(f_start, f_end, nf),
fs=fs)[1])
# Measure attenuation defined as the minimum attenuation in dB.
attenuation = -mag2db(np.max(h))
if attenuation <= stopband_attenuation:
# Specifications not met.
return False
for band in passbands:
f_start, f_end = band
# Get magnitude of the frequency response in the band.
h = np.abs(scipy.signal.freqz(b, a=1,
worN=np.linspace(f_start, f_end, nf),
fs=fs)[1])
# The ripple is defined as the amplitude (dB) variation within the passband.
ripple = mag2db(np.max(h)) - mag2db(np.min(h))
if ripple >= passband_ripple:
# Specifications not met.
return False
# If not returned already, specs are met.
return True
def _equalize_transition_widths(frequencies):
"""
Adjust the edge frequencies of the outer band so that transition widths at both sides of the middle band are equal.
Shrinks the largest transition band by adjusting the outer edge frequency (i.e. does not alter the middle band).
Args:
frequencies (np.ndarray): sequence of 6 monotonically increasing frequencies representing edge frequencies.
Returns:
frequencies (np.ndarray): sequence of 6 monotonically increasing frequencies representing edge frequencies, with
euqla transition widths around the middle band.
Examples:
>>> _equalize_transition_widths([0, 0.1, 0.6, 40, 47, 128])
array([0.00e+00, 1.00e-01, 6.00e-01, 4.00e+01, 4.05e+01, 1.28e+02])
"""
frequencies = np.asarray(frequencies)
if len(frequencies) != 6:
raise ValueError('Input should have length 6. Got len(frequencies) = {}.'.format(len(frequencies)))
# Extract edge frequencies of middle band and outer bands.
middle_band = frequencies[2:4]
outer_band = frequencies[[1, 4]]
if len(middle_band) != 2 or len(outer_band) != 2:
raise ValueError('middle_band and outer_band must have length 2.')
if middle_band[0] < outer_band[0] or middle_band[1] > outer_band[1]:
raise ValueError('Invalid edge frequencies. The middle_band lie in the outer_band interval.')
transition_widths = np.abs(middle_band - outer_band)
if transition_widths[0] < transition_widths[1]:
frequencies[4] = middle_band[1] + transition_widths[0]
elif transition_widths[0] > transition_widths[1]:
frequencies[1] = middle_band[0] - transition_widths[1]
return frequencies
def _get_filter_type(wp, ws):
"""
Determine the filter type based on edge frequencies of the pass and stop band.
Args:
wp (float or np.ndarray): edge frequencies of the passband.
ws (float or np.ndarray): edge frequencies of the stopband.
Returns:
filter_type (str): one of 'lowpass', 'highpass', 'bandpass', 'bandstop'.
Notes:
The edge frequencies may be given in any unit, as long as all frequencies are given in the same unit.
Edge frequencies do not include the 0 and maximum frequency (Nyqvist frequency).
Examples:
>>> _get_filter_type(40, 47)
'lowpass'
>>> _get_filter_type(47, 40)
'highpass'
>>> _get_filter_type([0.6, 40], [0.1, 47])
'bandpass'
>>> _get_filter_type([0.1, 47], [0.6, 40])
'bandstop'
"""
d = np.asarray(wp) - np.asarray(ws)
if np.size(d) == 1:
if d < 0:
filter_type = 'lowpass'
elif d > 0:
filter_type = 'highpass'
else:
raise ValueError('Invalid passband and/or stopband frequencies. Cannot determine lowpass or highpass.')
elif np.size(d) == 2:
if d[0] > 0 > d[1]:
filter_type = 'bandpass'
elif d[0] < 0 < d[1]:
filter_type = 'bandstop'
else:
raise ValueError('Invalid passband and/or stopband frequencies. Cannot determine bandpass or bandstop.')
else:
raise ValueError('Invalid length of stop- and passband edge frequencies.')
return filter_type
def plot_filter_response(w, h):
"""
Plot the frequency response of a digital filter.
Args:
w (np.ndarray): the frequencies at which h was computed.
h (np.ndarray): the frequency response, as complex numbers.
"""
# Compute frequency array and gain array in dB.
f = w
log_h = 20 * np.log10(abs(h))
# Plot frequency response amplitude.
fig = plt.figure()
ax1 = fig.add_subplot(111)
plt.plot(f, log_h, 'C0')
plt.ylabel('Amplitude (dB)', color='C0')
plt.xlabel('Frequency (Hz)')
# Use a proper limit for the y-axis.
a = 2 * 40
plt.ylim([-a, a / 10])
# Plot frequency response phase.
ax2 = ax1.twinx()
angles = np.unwrap(np.angle(h))
plt.plot(f, angles, 'C3')
plt.ylabel('Angle (radians)', color='C3')
# Figure make-up.
plt.title('Digital filter frequency response')
plt.grid()