Source code for nnsa.feature_extraction.aeeg

"""
Code related to amplitude-integrated EEG (aEEG) computation.
Contains multiple algorithms for aEEG computation, some based on MATLAB code.
compute_aeeg_tw fully runs in Python, which should make it the fastest.
"""
import h5py
import scipy.signal
import numpy as np

from nnsa.parameters.parameters import ClassWithParameters, Parameters
from nnsa.feature_extraction.result import ResultBase
from nnsa.utils.arrays import interp_nan
from nnsa.utils.config import HORIZONTAL_RULE
from nnsa.utils.segmentation import segment_generator


__all__ = [
    'AmplitudeEeg',
    'AmplitudeEegResult',
    'compute_aeeg_cc',
    'compute_aeeg_neat',
    'compute_aeeg_tw',
    'filter_aeeg_zhang',
]


[docs]class AmplitudeEeg(ClassWithParameters): """ Class for emulating an amplitude-integrated (aEEG) signal from single channel continuous EEG. Multiple different algorithms can be used, see self.default_parameters(). Main method: amplitude_eeg() or process(). Args: eng (matlab.engine.matlabengine.MatlabEngine, optional): Deprecated. MATLAB engine to use for calling MATLAB functions. The required paths must already have been initialized (see self.eng()). Only relevant if an algorithm that requires MATLAB is used. If None, a new MATLAB engine will be started (which takes some time). Defaults to None. **kwargs (optional): see nnsa.ClassWithParameters. Examples: >>> fs = 256 >>> t = 1/fs*np.arange(100000) >>> x = 4 * np.sin(t) >>> AEEG = AmplitudeEeg(method='neat', dtype=np.float32) >>> print(type(AEEG.parameters).__name__) Parameters >>> result = AEEG.wct(x, fs=fs, verbose=0) >>> print(type(result).__name__) AmplitudeEegResult >>> result.aeeg array([0.21567792, 0.21360768, 0.21153691, ..., 0.08440158, 0.0844016 , 0.08440162], dtype=float32) """ def __init__(self, eng=None, **kwargs): super().__init__(**kwargs) self._eng = eng @property def eng(self): """ Return a MATLAB engine. Returns: (matlab.engine.matlabengine.MatlabEngine): MATLAB engine. """ from nnsa.matlab.utils import get_matlab_engine if self._eng is None: # Start Matlab engine and set paths. self._eng = get_matlab_engine() return self._eng
[docs] @staticmethod def default_parameters(): """ Return the default parameters. Returns: (nnsa.Parameters): a default set of parameters for the object. """ pars = { # The method/algorithm to use for computation of the aEEG. # Choose from 'CC'*, 'NEAT', 'TW', 'ZHANG'. # *these options run on MATLAB in the background. # See the functions in this module for each of these methods for more information # (e.g. compute_aeeg_tw()). 'method': 'NEAT', # Optional additional keyword arguments/parameters for the method/function that computes the aEEG. # These keyword arguments depend on the method specified above. E.g. if method is set to 'TW', see the # function compute_aeeg_tw() for the optional keyword argument that you can specify here: 'method_kwargs': {}, # Whether to convert the aEEG to a semilog scale (<10uV linear, >10uV logarithmic). 'semilog': True, # The data type / precision for the aEEG values (might save space when saving). 'dtype': np.float32, } return Parameters(**pars)
[docs] def amplitude_eeg(self, eeg, fs, label='aEEG', verbose=0): """ Compute the aEEG signal. Samples that are nan in the input will be nan in the output. Args: eeg (np.ndarray): single channel EEG signal. fs (float): sample frequency of `eeg` in Hz. label (str, optional): label for the aEEG signal. Defaults to 'aEEG'. verbose (int, optional): verbose level. Defaults to 0. Returns: (nnsa.AmplitudeEegResult): object containing the aEEG. """ # Make sure input is a 1D array. eeg = np.asarray(eeg).squeeze() if eeg.ndim != 1: raise ValueError('`eeg` should be a 1-dimensional array. Got array with shape {}.' .format(eeg.shape)) if verbose > 0: print(HORIZONTAL_RULE) print('Computing aEEG with parameters:') print(self.parameters) # Interpolate nans. nan_mask = np.isnan(eeg) eeg = interp_nan(eeg) # Extract some parameters. method = self.parameters['method'].lower() method_kwargs = self.parameters['method_kwargs'] # Call the corresponding function to compute envelope. if method == 'cc': aeeg = compute_aeeg_cc(eeg=eeg, fs=fs, eng=self.eng) elif method == 'neat': aeeg = compute_aeeg_neat(eeg=eeg, fs=fs, **method_kwargs) elif method == 'tw': aeeg = compute_aeeg_tw(eeg=eeg, fs=fs, **method_kwargs) elif method == 'zhang': aeeg = compute_aeeg_zhang(eeg=eeg, fs=fs, **method_kwargs) else: raise ValueError('Invalid method "{}". Choose from "".' .format(method, ['CC', 'NEAT', 'TW', 'Zhang'])) if self.parameters['semilog']: # To semilog scale. aeeg = to_semilog(aeeg, make_copy=False) # Apply nan mask. aeeg[nan_mask] = np.nan # Set dtype. aeeg = aeeg.astype(self.parameters['dtype']) result = AmplitudeEegResult(aeeg=aeeg, algorithm_parameters=self.parameters, fs=fs, label=label) return result
[docs] def process(self, *args, **kwargs): """ Shortcut to main function. """ return self.amplitude_eeg(*args, **kwargs)
[docs]class AmplitudeEegResult(ResultBase): """ High-level interface for processing the results of aEEG computation as created by nnsa.AmplitudeEeg(). Args: aeeg (np.ndarray): the aEEG signal with sample frequency `fs` in semilog-scale (amplitudes 0-10 linear and amplitude > 10 logarithmic). fs (float): sample frequency of aeeg. algorithm_parameters (nnsa.Parameters): see ResultBase. label (str, optional): label of the aEEG signal. Defaults to 'aEEG'. data_info (str, optional): see ResultBase. """ def __init__(self, aeeg, fs, algorithm_parameters, label=None, data_info=None, time_offset=0): super().__init__(algorithm_parameters=algorithm_parameters, data_info=data_info, fs=fs, time_offset=time_offset) if self.is_discontinuous(): raise ValueError('Data is discontinuous. This is invalid for the {} class.'.format(self.__class__.__name__)) # Store variables that are not already stored by the parent class (ResultBase). self.aeeg = np.asarray(aeeg).squeeze() self.label = label @property def num_segments(self): """ Return the number of segments (number of samples). Returns: (int): number of segments/samples. """ return self.aeeg.shape[-1]
[docs] def extract_margins(self, segment_length=None, fs=None, q_lower=10, q_upper=90, max_nan=0.5, correct_delay=False): """ Extract upper and lower margin amplitudes by extracting the low and high percentiles in (overlapping) segments. Args: segment_length (float, optional): length of the segments/moving window to compute the margins in (in seconds). If None, a default value is taken based on the aEEG conversion method used. Defaults to None. fs (float): desired sample frequency of the returned margins in Hz. If None, a default value is taken based on the aEEG conversion method used. Defaults to None. q_lower (float, optional): percentile for the lower margin (0-100). Defaults to 10. q_upper (float, optional): percentile for the upper margin (0-100). Defaults to 90. max_nan (float, optional): maximum fraction (0-1) of nan values in a segment. If the fraction of nan samples exceeds `max_nan`, the margins are nan for that segment. Defaults to 0.5. correct_delay (bool): whether to correct for delay by padding. Returns: aeeg_lower (np.ndarray): lower margin amplitudes with frequency `fs_margins`. aeeg_upper (np.ndarray): upper margin amplitudes with frequency `fs_margins`. fs (float): sample frequency of `aeeg_lower` and `aeeg_upper` in Hz. """ # Default inputs. method = self.algorithm_parameters['method'].lower() if method == 'cc': segment_length = 15 if segment_length is None else segment_length fs = 1/15 if fs is None else fs elif method == 'neat': segment_length = 15 if segment_length is None else segment_length fs = 1/15 if fs is None else fs elif method == 'tw': segment_length = 45 if segment_length is None else segment_length fs = 1/30 if fs is None else fs else: segment_length = 15 if segment_length is None else segment_length fs = 1/15 if fs is None else fs aeeg_lower, aeeg_upper = extract_aeeg_margins( aeeg=self.aeeg, fs_aeeg=self.fs, segment_length=segment_length, fs_margins=fs, q_lower=q_lower, q_upper=q_upper, max_nan=max_nan, correct_delay=correct_delay) return aeeg_lower, aeeg_upper, fs
[docs] def plot(self, plot_margins=True, ax=None, **kwargs): """ Plot the envelope. Args: plot_margins (bool, optional): if True, plots the upper and lower margin amplitudes as dark fat lines. If False, does not plot the margins. Defaults to True. ax (plt.axes, optional): axes to plot in. If None, will plot in a new figure. Defaults to None. **kwargs (optional): keyword arguments for nnsa.TimeSeries.plot(). """ import matplotlib.pyplot as plt if ax is None: # Create new figure. plt.figure() ax = plt.subplot(111) # Covert to TimeSeries. ts_raw = self.to_time_series(which='raw') # Plot. plot_kwargs = { 'color': 'grey', 'linewidth': 0.5, } plot_kwargs.update(kwargs) ts_raw.plot(ax=ax, **plot_kwargs) if plot_margins: plot_kwargs = { 'color': 'black', 'linewidth': 2.5, } plot_kwargs.update(kwargs) ts_low = self.to_time_series(which='LMA') ts_up = self.to_time_series(which='UMA') ts_low.plot(ax=ax, **plot_kwargs) ts_up.plot(ax=ax, **plot_kwargs) # Change the ticks on the y-axis (from above 10, they are logarithmic). labels = np.array([0, 2.5, 5, 10, 25, 50, 100]).astype(float) locs = labels.copy() locs[locs > 10] = 10*np.log10(locs[locs > 10]) plt.yticks(locs, labels) # Set locations and labels plt.grid() plt.ylim([0, 20]) # Set title. plt.ylabel('aEEG (uV)') plt.title(self.label)
[docs] def to_time_series(self, which='bandwidth', unit='uV', **kwargs): """ Create a TimeSeries object from the aEEG data. Args: which (str, optional): which signal to export to a time series. Choose from: 'raw' 'LMA' (lower margin amplitude), 'UMA' (upper margin amplitude), 'bandwidth' (UMA - LMA). Defaults to 'bandwidth'. unit (str, optional): unit of the aEEG data. Defaults to 'uV'. **kwargs (optional): optional keyword arguments for the TimeSeries object. Returns: ts (nnsa.TimeSeries): TimeSeries containing the aEEG data. """ # Import locally since TimeSeries imports Envelope from this module. from nnsa.containers.time_series import TimeSeries # Create info object. info = { 'source': 'AmplitudeEegResult of {}.'.format( self.data_info if self.data_info is not None else 'unknown source' ) } # Select the signal to export. which_lower = which.lower() # Lower-case. if which_lower == 'raw': signal = self.aeeg fs = self.fs else: # Extract margins. aeeg_lower, aeeg_upper, fs_margins = self.extract_margins() if which_lower == 'lma': signal = aeeg_lower fs = fs_margins elif which_lower == 'uma': signal = aeeg_upper fs = fs_margins elif which_lower == 'bandwidth': signal = aeeg_upper - aeeg_lower fs = fs_margins else: raise ValueError('Invalid option which="{}". Choose from {}.' .format(which, ['raw', 'LMA', 'UMA', 'bandwidth'])) ts = TimeSeries(signal=signal, fs=fs, label='{} {}'.format(self.label, which), unit=unit, info=info, check_label=False, time_offset=self.time_offset, **kwargs) return ts
def _merge(self, other, inplace=False): raise NotImplementedError('{} not compatible with discontinuous data.'.format(self.__class__.__name__)) @staticmethod def _read_from_hdf5(filepath): """ Read result from hdf5 file into a AmplitudeEegResult class. Args: filepath (str): see ResultBase._read_from_hdf5(). Returns: result (nnsa.AmplitudeEegResult): instance of AmplitudeEegResult containing the aEEG. """ # Read standard hdf5 header (use the ResultBase method). algorithm_parameters, data_info, segment_start_times, segment_end_times, fs, time_offset = \ ResultBase._read_hdf5_header(filepath)[1:] # Re-open the file and read the rest of the file. with h5py.File(filepath, 'r') as f: # Read array data. aeeg = f['aeeg'][:] # Read non-array data. if 'label' in f['aeeg'].attrs: label = f['aeeg'].attrs['label'].decode() else: label = None # Create a result object. result = AmplitudeEegResult(aeeg=aeeg, algorithm_parameters=algorithm_parameters, data_info=data_info, fs=fs, time_offset=time_offset, label=label) return result def _write_to_hdf5(self, filepath): """ Write the contents of the object to an hdf5 file. Args: filepath (str): see ResultBase._write_to_hdf5(). """ # Write standard hdf5 header (use the ResultBase method). self._write_hdf5_header(filepath) # Append attributes to the hdf5 file. with h5py.File(filepath, 'a') as f: # Write array data. f.create_dataset('aeeg', data=self.aeeg) # Write non-array data as attributes. # Convert strings to np.string_ type as recommended for compatibility. if self.label is not None: f['aeeg'].attrs['label'] = np.string_(self.label)
[docs]def compute_aeeg_cc(eeg, fs, eng=None): """ Compute amplitude-integrated EEG (aEEG) from single channel continuous EEG. Algorithm by Amir Ansari, Georgios Lazaridis and and Christos Chatzichristos @ KU Leuven, 2013. Runs on MATLAB in the background. Args: eeg (np.ndarray): single channel EEG signal. fs (float): sample frequency of `eeg` in Hz. eng (matlabengine, optional): started matlab engine. If None, a new matlab engine will be started. Defaults to None. Returns: aeeg (np.ndarray): the aEEG signal with sample frequency `fs` (linear scale). """ from nnsa.matlab.utils import ml_array, quit_engine, get_matlab_engine if eng is None: eng = get_matlab_engine() try: # Convert the numpy array to a matlab array. eeg = ml_array(array=eeg, eng=eng, dtype='double') # Call the MATLAB function. Note that we cast all inputs to float, so that they will be passed as # number of datatype 'double' in the MATLAB engine (which is the required type for most MATLAB functions). aeeg = eng.compute_aEEG_CC(eeg, float(fs), nargout=1) # Convert matlab arrays to numpy arrays. aeeg = np.asarray(aeeg).squeeze() except: # Make sure to quit the matlab engine and then re-raise the error. quit_engine(eng) raise return aeeg
[docs]def compute_aeeg_neat(eeg, fs, gain=1.631): """ Compute amplitude-integrated EEG (aEEG) from single channel continuous EEG. Algorithm ported from the Neonatal EEG Analysis Toolbox (NEAT) for MATLAB. References: Z. A. Vesoulis, P. G. Gamble, S. Jain, N. M. E. Ters, S. M. Liao, and A. M. Mathur, “WU-NEAT: A clinically validated, open- source MATLAB toolbox for limited-channel neonatal EEG analysis,” 2018-05-11. Args: eeg (np.ndarray): single channel EEG signal (not filtered). fs (float): sample frequency of `eeg` in Hz. gain (float, optional): filter gain. Defaults to 1.631 Returns: aeeg (np.ndarray): the aEEG signal with sample frequency `fs` (linear scale). """ # Apply filter. NOTE: this filter is not exactly the same filter as in the MATLAB code, but its close. eeg_filtered = filter_aeeg_neat(eeg, fs, correct_delay=True) # Rectification. eeg_rectified = np.abs(eeg_filtered) # Low-pass filtering. b, a = scipy.signal.butter(N=5, Wn=0.6720, btype='low', fs=fs) eeg_envelope = scipy.signal.filtfilt(b, a, eeg_rectified) # Increase gain. aeeg = gain * eeg_envelope return aeeg
def compute_aeeg_neat_ml(eeg, fs, eng=None): """ Compute amplitude-integrated EEG (aEEG) from single channel continuous EEG. Algorithm part of the Neonatal EEG Analysis Toolbox (NEAT) for MATLAB. Runs on MATLAB in the background. References: Z. A. Vesoulis, P. G. Gamble, S. Jain, N. M. E. Ters, S. M. Liao, and A. M. Mathur, “WU-NEAT: A clinically validated, open- source MATLAB toolbox for limited-channel neonatal EEG analysis,” 2018-05-11. Args: eeg (np.ndarray): single channel EEG signal. fs (float): sample frequency of `eeg` in Hz. eng (matlabengine, optional): started matlab engine. If None, a new matlab engine will be started. Defaults to None. Returns: aeeg (np.ndarray): the aEEG signal with sample frequency `fs` (linear scale). """ from nnsa.matlab.utils import ml_array, quit_engine, get_matlab_engine if eng is None: eng = get_matlab_engine() try: # Convert the numpy array to a matlab array. eeg = ml_array(array=eeg, eng=eng, dtype='double') # Call the MATLAB function. Note that we cast all inputs to float, so that they will be passed as # number of datatype 'double' in the MATLAB engine (which is the required type for most MATLAB functions). aeeg = eng.compute_aEEG_NEAT(eeg, float(fs), nargout=1) # Convert matlab arrays to numpy arrays. aeeg = np.asarray(aeeg).squeeze() except: # Make sure to quit the matlab engine and then re-raise the error. quit_engine(eng) raise return aeeg
[docs]def compute_aeeg_tw(eeg, fs, gain=5, rma_win=3): """ Compute amplitude-integrated EEG (aEEG) from single channel continuous EEG. Algorithm emulates aEEG as generated by Olympic CFM 6000 system, as described in Tobias Werther et al. References: T. Werther, M. Olischar, G. Naulaers, P. Deindl, K. Klebermass-Schrehof, and N. Stevenson, “Are All Amplitude-Integrated Electroencephalogram Systems Equal?,” Neonatology, vol. 112, no. 4, pp. 394–401, 2017, doi: 10.1159/000480008. Args: eeg (np.ndarray): single channel EEG signal. fs (float): sample frequency of `eeg` in Hz. gain (float, optional): filter gain. Defaults to 5. rma_win (float, optional): rectangular moving average (RMA) window (in seconds). Defaults to 3. Returns: aeeg (np.ndarray): the aEEG signal with sample frequency `fs` (linear scale).) """ # Step 1: apply Zhang and Ding filter. eeg_filtered = filter_aeeg_zhang(eeg, fs, correct_delay=True) # Step 2: rectification and gain. eeg_rectified = np.abs(eeg_filtered) * gain # Step 3: apply moving average (RMA). numtaps = int(round(rma_win*fs)) aeeg = np.convolve(eeg_rectified, np.ones(numtaps)/numtaps, mode='same') return aeeg
def compute_aeeg_zhang(eeg, fs, gain=2): """ Compute amplitude-integrated EEG (aEEG) from single channel continuous EEG. References: D. Zhang and H. Ding, “Calculation of compact amplitude-integrated EEG tracing and upper and lower margins using raw EEG data,” Health, vol. 05, no. 05, pp. 885–891, 2013, doi: 10.4236/health.2013.55116. Args: eeg (np.ndarray): single channel EEG signal. fs (float): sample frequency of `eeg` in Hz. gain (float, optional): filter gain. Defaults to 2. Returns: aeeg (np.ndarray): the aEEG signal with sample frequency `fs` (linear scale). """ # Step 1: apply Zhang and Ding filter. eeg_filtered = filter_aeeg_zhang(eeg, fs, correct_delay=True) # Step 2: rectification. eeg_rectified = np.abs(eeg_filtered) # Step 3: envelope detection. b, a = scipy.signal.butter(N=5, Wn=0.6720, btype='low', fs=fs) # Not sure about the filter cutoff (not mentioned in paper!). Used the save as in NEAT. aeeg = gain * scipy.signal.filtfilt(b, a, eeg_rectified) return aeeg def extract_aeeg_margins(aeeg, fs_aeeg, segment_length, fs_margins, q_lower=10, q_upper=90, max_nan=0.5, correct_delay=False): """ Extract upper and lower margin amplitudes by extracting the low and high percentiles in (overlapping) segments. Args: aeeg (np.ndarray): aeeg signal. fs_aeeg (float): sample frequency of aeeg in Hz. segment_length (float): length of the segments /moving window to compute the margins in (in seconds). fs_margins (float): desired sample frequency of the returned margins in Hz. q_lower (float, optional): percentile for the lower margin (0-100). Defaults to 10. q_upper (float, optional): percentile for the upper margin (0-100). Defaults to 90. max_nan (float, optional): maximum fraction (0-1) of nan values in a segment. If the fraction of nan samples exceeds `max_nan`, the margins are nan for that segment. Defaults to 0.5. correct_delay (bool): if True, the signal is padded such that first output sample corresponds to t=0. If False, no padding is done and the first output sample corresponds to t=segment_length/2. Returns: aeeg_lower (np.ndarray): lower margin amplitudes with frequency `fs_margins`. aeeg_upper (np.ndarray): upper margin amplitudes with frequency `fs_margins`. """ if correct_delay: # First add half the window length of sample to the beginning to compensate for delays. aeeg = np.concatenate((np.nanmean(aeeg)*np.ones(int(fs_aeeg * segment_length / 2)), aeeg)) # Segment the aEEG signal. Use such an overlap that we get the desired sampling frequency of the aEEG. aeeg_seg = np.array(list(segment_generator(aeeg, segment_length=segment_length, overlap=segment_length - 1 / fs_margins, fs=fs_aeeg))) aeeg_low_up = np.nanpercentile(aeeg_seg, q=[q_lower, q_upper], axis=-1) # Set segments to nan if more than `nan_max`*100 % of the samples in the segment are nan. nan_frac = np.mean(np.isnan(aeeg_seg), axis=-1) aeeg_low_up[:, nan_frac > max_nan] = np.nan # Extract lower and upper envelopes. aeeg_lower = aeeg_low_up[0] aeeg_upper = aeeg_low_up[1] return aeeg_lower, aeeg_upper def filter_aeeg_neat(x, fs, correct_delay=True): """ Apply an assymetric FIR filter with specifications as done in the NEAT toolbox for MATLAB. References: Z. A. Vesoulis, P. G. Gamble, S. Jain, N. M. E. Ters, S. M. Liao, and A. M. Mathur, “WU-NEAT: A clinically validated, open- source MATLAB toolbox for limited-channel neonatal EEG analysis,” 2018-05-11. Args: x (np.ndarray): (single channel EEG) signal. fs (float): sample frequency of `x`` in Hz. correct_delay (bool, optional): if True, corrects for the delay introduced by the FIR filter. If False, no correction is applied. Returns: y (np.ndarray): filtered signal. """ # Make numtaps depend on the sample frequency. At fs=64 Hz, use 300 taps. numtaps = int(round(300 * fs / 64)) # Make numtaps odd, so that delay is an integer number of samples. if np.mod(numtaps, 2) == 0: numtaps += 1 # Collect filter specs. freq = [0, 1.6, 2.0, 14.4, 15.36, fs / 2] desired = [0, 0, 0.73, 3.58, 0, 0] # Design filter. Note: use firwin2 here, not the Parks-McClellan algorithm (not implemented for # assymetric filters in scipy). b = scipy.signal.firwin2(numtaps, freq, desired, fs=fs) # Compute delay of filter. delay = int((numtaps - 1) / 2) if correct_delay: # Add samples at the end of x. x = np.append(x, np.zeros(delay)) # Apply filter. y = scipy.signal.lfilter(b, 1, x) if correct_delay: # Remove first `delay` samples. y = y[delay:] return y
[docs]def filter_aeeg_zhang(x, fs, correct_delay=True): """ Apply an assymetric FIR filter with specifications as proposed by Zhang and Ding, 2013. This filter is an approximation of a filter to compute aEEG from continuous EEG. References: D. Zhang and H. Ding, “Calculation of compact amplitude-integrated EEG tracing and upper and lower margins using raw EEG data,” Health, vol. 05, no. 05, pp. 885–891, 2013, doi: 10.4236/health.2013.55116. Args: x (np.ndarray): (single channel EEG) signal. fs (float): sample frequency of `x`` in Hz. correct_delay (bool, optional): if True, corrects for the delay introduced by the FIR filter. If False, no correction is applied. Returns: y (np.ndarray): filtered signal. """ # Helper function to convert dB to magnitude. def db2mag(db): return 10 ** (db / 20) # Make numtaps depend on the sample frequency. At fs=64 Hz, use 300 taps. numtaps = int(round(300*fs/64)) # Make numtaps odd, so that delay is an integer number of samples. if np.mod(numtaps, 2) == 0: numtaps += 1 # Specifications are derived from Figure 1 of Zhang and Ding. # At 10 Hz, magnitude have 0 dB. db_10 = 0 # At 2 Hz, magnitude decreases 12 dB/decade from 10 Hz. db_2 = db_10 - np.log10(10/2)*12 # At 1 Hz, magnitude decreases 60 dB/decade from 2 Hz. db_1 = db_2 - np.log10(2/1)*60 # At 15 Hz, magnitude increases 12 dB/decade from 10 Hz. db_15 = db_10 + np.log10(15/10)*12 # At 20 Hz, magnitude decreases 120 dB/decade from 15 Hz. db_20 = db_15 - np.log10(20/15)*120 # At 5 Hz, magnitude decreases 12 dB/decade from 10 Hz. db_5 = db_10 - np.log10(10/5)*12 # Collect filter specs. freq = [0, 1, 2, 5, 10, 15, 20, 21, fs/2] desired = [0, db2mag(db_1), db2mag(db_2), db2mag(db_5), db2mag(db_10), db2mag(db_15), db2mag(db_20), db2mag(-40), 0] # Design filter. b = scipy.signal.firwin2(numtaps, freq, desired, fs=fs) # Compute delay of filter. delay = int((numtaps - 1)/2) if correct_delay: # Add samples at the end of x. x = np.append(x, np.zeros(delay)) # Apply filter. y = scipy.signal.lfilter(b, 1, x) if correct_delay: # Remove first `delay` samples. y = y[delay:] # # Plot frequency response. # w, h = scipy.signal.freqz(b) # import matplotlib.pyplot as plt # fig, ax1 = plt.subplots() # ax1.set_title('Digital filter frequency response') # p = 20 * np.log10(abs(h)) # ax1.plot(w/np.pi*fs/2, p, 'b') # plt.scatter(freq, 20 * np.log10(desired)) # ax1.set_ylabel('Amplitude [dB]', color='b') # ax1.set_xlabel('Frequency [Hz]') # plt.xscale('log') # plt.xlim([1, 25]) # plt.ylim([-80, 20]) return y
def to_semilog(aeeg, make_copy=True): """ Convert to semilog scale (amplitudes below 10 linear scale, above 10 logarithmic scale). """ aeeg = np.asarray(aeeg) if make_copy: aeeg = aeeg.copy() log_mask = aeeg > 10 aeeg[log_mask] = 10 * (np.log10(aeeg[log_mask])) return aeeg