"""
Functions to extract a set of time and frequency features from an EEG signal.
Author: Tim Hermans (tim-hermans@hotmail.com).
"""
import os
import sys
import warnings
import numpy as np
import psutil
import pyprind
from scipy.signal import periodogram, welch
from scipy.stats import skew, kurtosis
from nnsa.parameters.parameters import ClassWithParameters, Parameters
from nnsa.feature_extraction.result import read_result_from_file
from nnsa.utils.testing import assert_equal
from nnsa.feature_extraction.feature_sets.base import FeatureSetResult
from nnsa.decompositions.fourier import fftdeconvolve
from nnsa.feature_extraction.envelope import hilbert_envelope
from nnsa.feature_extraction.fractality import wfbmesti
from nnsa.utils.segmentation import get_all_segments, get_segment_times
__all__ = [
'compute_features',
'compute_features_S1',
'compute_features_S2',
]
class EegFeatures(ClassWithParameters):
"""
Class for computing EEG features.
Args:
which (str): which EEG feature set to compute. See compute_features() for options.
**kwargs (optional): see nnsa.ClassWithParameters.
Examples:
>>> np.random.seed(43)
>>> fs = 250
>>> x = np.random.rand(8, 100*fs)
>>> ef = EegFeatures(which='v2')
>>> print(type(ef.parameters).__name__)
Parameters
>>> result = ef.process(x, fs=fs, verbose=0)
>>> print(type(result).__name__)
FeatureSetResult
>>> result.feature_labels[0] # Label of first feature.
'F1'
>>> result.features[0, 1, 2] # Value of first feature, second channel, third segment.
0.07127218164811738
>>> temp_file = 'temp_ef.hdf5'
>>> result.save_to_file(temp_file, overwrite=True, verbose=0)
>>> result_loaded = read_result_from_file(temp_file, verbose=0)
>>> assert_equal(result, result_loaded)
>>> os.remove(temp_file)
"""
def __init__(self, which='S2', **kwargs):
super().__init__(which=which, **kwargs)
@staticmethod
def default_parameters():
# Parameters for segmentation of EEG recording into small time segments/epochs.
segmentation = Parameters(**{
# Segment length in seconds:
'segment_length': 30,
# Overlap in segments in seconds:
'overlap': 0,
})
# Which feature set to compute.
which = None
pars = {
'which': which,
'segmentation': segmentation,
}
return Parameters(**pars)
def process(self, eeg, fs, channel_labels=None, axis=-1, chunk_size=None, verbose=1):
"""
Extract the features.
Args:
eeg (np.ndarray): 2D-array with (multi-channel) EEG data. Shape depends on `axis`. If 1D array is given,
it is considered one channel.
fs (float): sample frequency of the EEG signal.
channel_labels (list, optional): optional list of channel labels.
Defaults to None.
axis (int): the time axis (along which to segment and compute features).
chunk_size (int): number of time samples to process at once.
If None, a suitable chunk_size is chosen automatically.
verbose (int, optional): verbose level.
Defaults to 1.
Returns:
(nnsa.FeatureSetResult): object containing the features per segment per channel.
"""
# Check.
if eeg.ndim == 1:
eeg = eeg.reshape(1, -1)
axis = -1
if eeg.ndim != 2:
raise ValueError('Expected a 2D input with multi-channel EEG. Got shape {}.'.format(eeg.shape))
# Get pars.
seg_pars = self.parameters['segmentation']
seg_len = seg_pars['segment_length']
overlap = seg_pars['overlap']
stepsize = seg_len - overlap
which = self.parameters['which']
# We need time to be the first axis, so transpose if needed.
if axis == -1 or axis == 1:
eeg = eeg.T
if verbose:
print('Computing feature set {} ...'.format(which))
if channel_labels is not None:
if len(channel_labels) != eeg.shape[-1]:
raise ValueError('{} channel labels provided, but channel dimension of eeg has size {} (eeg.shape={}).'
.format(len(channel_labels), eeg.shape[-1], eeg.shape))
# Compute features.
features, feature_labels = compute_features(
x=eeg, fs=fs, window=seg_len, which=which, stepsize=stepsize,
chunk_size=chunk_size, verbose=verbose)
# To shape (n_seg, n_chan, n_feat).
features = np.swapaxes(features, axis1=0, axis2=2)
# Time vector for segments.
time_start = get_segment_times(num_segments=len(features), segment_length=seg_len, overlap=overlap, offset=0)
# To FeatureSetResult.
result = FeatureSetResult(
features=features, feature_labels=feature_labels,
algorithm_parameters=self.parameters, channel_labels=channel_labels,
segment_start_times=time_start, segment_end_times=time_start+seg_len)
return result
[docs]def compute_features(x, fs, window, which='S2', stepsize=None, chunk_size=None, verbose=1):
"""
Compute (absolute) features with specific segment length.
Implemented by segmenting (a chunk of) x and computing the features on the 3d array with segments,
making it potentially 2-3x faster than compute_features() which loops over segments,
especially when `window` is not much larger than `stepsize`.
Args:
x (np.ndarray): array with shape (n_samples, n_channels).
fs (flaot): sampling frequency of `x` in Hz.
window (float): window length for feature computation (in seconds).
which (str): which feature set to compute. Choose from: 'S1', 'S2'.
See e.g. compute_features_S1() what features are included in S1`.
stepsize (float): stepsize for feature computation (in seconds). If None, uses `window` (i.e., no overlap).
chunk_size (int): number of time samples to process at once.
If None, a suitable chunk_size is chosen automatically.
verbose (int): verbosity level.
Returns:
features (np.ndarray): array with shape (n_feat, n_chan, n_seg) containing the features.
feature_labels (list): list with length n_feat containing feature labels for `features`.
"""
# Default inputs.
if stepsize is None:
stepsize = window
# Check.
if x.ndim != 2:
raise ValueError('Expected an input with shape (n_samples, n_channels). Got shape {}.'.format(x.shape))
n_time, n_channels = x.shape
n_diff = n_time - n_channels
if n_diff < 0:
msg = '\nNumber of channels ({}) is larger than number of samples ({}). ' \
'Might need to transpose input array?'.format(n_channels, n_time)
if n_diff > -100:
warnings.warn(msg)
else:
raise ValueError(msg)
# Number of samples for segmentation.
step_len = float(stepsize * fs)
win_len = float(window * fs)
if step_len.is_integer() and win_len.is_integer():
step_len = int(step_len)
win_len = int(win_len)
else:
raise ValueError('step_len (stepsize*fs) and/or win_len (window*fs) are not integers. '
'Change fs, the window or stepsize.')
if chunk_size is None:
# Determine automatically if we need to chunk.
expected_size = n_time/step_len * win_len * n_channels
required_mem = 8 * expected_size * 8 # ~ 8 * number of bytes of the segmented array.
available_mem = psutil.virtual_memory()[1]
if available_mem < required_mem:
chunk_size = available_mem/required_mem * n_time
else:
chunk_size = -1
if chunk_size == -1:
# Chunk size is total length (no division into chunks).
chunk_size = n_time
else:
# Determine suitable chunk_size: multiple of step_len.
chunk_size = step_len * int(np.ceil(chunk_size/step_len))
# Loop over chunks.
n_chunks = np.ceil(n_time/chunk_size)
bar = pyprind.ProgBar(n_chunks, stream=sys.stdout)
features = []
idx_start = 0
while idx_start + win_len <= n_time:
# Get chunk.
idx_stop = min(idx_start + chunk_size, n_time)
x_chunk = x[idx_start: idx_stop]
# Create array with segments (n_seg, n_time, n_channels).
x_seg_all = get_all_segments(x_chunk, segment_length=window, overlap=window - stepsize,
fs=fs, axis=0)
# I expect to see "RuntimeWarning: Mean of empty slice" in this block
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
if which in ['S1', 'v1', 'time']:
features_i, feature_labels = compute_features_S1(x_seg_all, fs, axis=1)
elif which in ['S2', 'v2', 'time_freq']:
features_i, feature_labels = compute_features_S2(x_seg_all, fs, axis=1)
elif which in ['S3', 'spectral_pirya']:
features_i, feature_labels = compute_features_S3(x_seg_all, fs, axis=1)
else:
raise ValueError('Invalid option which="{}". Choose from {}.'.format(which, ['S1', 'S2', 'S3']))
# To shape (n_feat, n_chan, n_seg).
features_i = np.swapaxes(features_i, 1, 2)
features.append(features_i)
# Update start index for next chunk.
idx_start = idx_stop - win_len + step_len
if verbose:
bar.update()
# Concatenate chunks.
features = np.concatenate(features, 2)
assert len(feature_labels) == len(features)
return features, feature_labels
[docs]def compute_features_S1(x, fs, axis=0):
"""
Compute time domain features (own set).
Args:
x (np.ndarray): signal array for which to compute features with shape.
fs (float): sampling frequency of the signal (Hz).
axis (int): time axis of `x` (axis along which to compute the features).
Returns:
features (np.ndarray): array with length n_features containing the features.
Is an N-D array if `x` is N-D, e.g. if x.shape == (n_segments, n_time, n_channels),
then features.shape = (n_features, n_segments, n_channels). Note that the time dimension disappears and
that the features are always the first dimension.
feature_labels (list): list with length n_features containing names for the features.
"""
# First time derivative (per second).
x_diff = np.diff(x, axis=axis) * fs
# Compute features.
var = np.nanvar(x, axis=axis)
mean = np.nanmean(x, axis=axis)
mean_abs = np.nanmean(np.abs(x), axis=axis)
zc = np.sum(np.diff(np.sign(x), axis=axis) > 0, axis=axis)
line_length = np.nanmean(np.abs(x_diff), axis=axis)
max_der = np.nanmax(np.abs(x_diff), axis=axis)
# Collect features (n_feat, n_chan).
features = np.asarray([var, mean, mean_abs, zc, line_length, max_der])
# Create feature labels.
feature_labels = ['Var', 'Mean', 'Mean abs', 'ZC', 'LL', 'Max der']
return features, feature_labels
[docs]def compute_features_S2(x, fs, axis=0):
"""
Compute time- and frequency-domain features from Stevenson et al. 2014 paper + own set ('S1').
References:
N. J. Stevenson, J. M. O. I. Korotchikova, and G. B. Boylan, “Artefact detection in neonatal EEG,”
in 2014 36th Annual International Conference of the IEEE Engineering in Medicine and Biology Society,
2014, doi: 10.1109/embc.2014.6943743.
For IWMF and IWBW:
B. R. Greene, S. Faul, W. P. Marnane, G. Lightbody, I. Korotchikova, and G. B. Boylan,
“A comparison of quantitative EEG features for neonatal seizure detection,”
Clinical Neurophysiology, vol. 119, no. 6, pp. 1248–1261, Jun. 2008, doi: 10.1016/j.clinph.2008.02.001.
Args:
x (np.ndarray): signal array for which to compute features with shape.
fs (float): sampling frequency of the signal (Hz).
axis (int): time axis of `x` (axis along which to compute the features).
Returns:
features (np.ndarray): array with length n_features containing the features.
Is an N-D array if `x` is N-D, e.g. if x.shape == (n_segments, n_time, n_channels),
then features.shape = (n_features, n_segments, n_channels). Note that the time dimension disappears and
that the features are always the first dimension.
feature_labels (list): list with length n_features containing names for the features.
"""
if x.ndim > 1:
# Swap time axis with second-to last axis such that we can use dot products.
new_axis = -2
x = np.swapaxes(x, new_axis, axis)
else:
new_axis = -1
# Points for FFT (set high enough for estimation of power between 0 and 0.5 Hz).
nfft = 1024
# Precompute some things.
N = x.shape[new_axis]
S = skew(x, axis=new_axis, nan_policy='omit')
K = kurtosis(x, axis=new_axis, nan_policy='omit')
x_mean = np.nanmean(x, axis=new_axis, keepdims=True)
x_demean = x - x_mean
# First time derivative (per second).
x_diff = np.diff(x, axis=new_axis) * fs
# Hilbert transform.
envelope = hilbert_envelope(x, nfft=None, axis=new_axis)
# Periodogram.
f, P = periodogram(x, fs=fs, nfft=nfft, axis=new_axis, scaling='density')
df = f[1]
P_tot = np.sum(P, axis=new_axis)
_dims = list(range(P.ndim))
del _dims[new_axis]
# Hurst exponent*.
H = wfbmesti(x, method='derivative_wavelet', axis=new_axis, keepdims=False)
# Deconvolution filter (eq. 9).
h_deconv = np.ones_like(x)
slc_1 = [slice(None)] * x.ndim
slc_2 = [slice(None)] * x.ndim
for ii in range(1, x.shape[new_axis]):
slc_1[new_axis] = ii
slc_2[new_axis] = ii - 1
h_deconv[tuple(slc_1)] = (H + 0.5 + ii)/(ii + 1)*h_deconv[tuple(slc_2)]
# Deconvolve.
x_deconv = fftdeconvolve(x, h_deconv, axis=new_axis)
abs_x_deconv = np.abs(x_deconv)
# Collect features from Stevenson et al. 2014.
f1 = np.nanmean(envelope, axis=new_axis)
f2 = np.nanmedian(envelope, axis=new_axis)
f3 = np.nanmean((envelope - np.expand_dims(f1, new_axis)) ** 2, axis=new_axis)
f4 = np.dot(f, P) / P_tot # See Greene et al. 2008
f5 = np.sqrt(
np.sum(((np.expand_dims(f, _dims) - np.expand_dims(f4, new_axis)) ** 2) * P,
axis=new_axis) / P_tot) # See Greene et al. 2008
f6 = np.dot((f >= 0.5) & (f < 13), P) * df
f7 = np.dot((f < 0.5), P) * df / f6
f8 = np.dot((f >= 13) & (f < 70), P) * df / f6
f9 = H
f10 = np.nanmax(envelope, axis=new_axis) / f2 # Use envelope so we can re-use f2, reducing computation time.
f11 = np.nanmax(abs_x_deconv, axis=new_axis)/np.nanmedian(abs_x_deconv, axis=new_axis)
f12 = N / 6 * (S ** 2 + 1 / 4 * (K - 3) ** 2)
f13 = f[np.argmax(P, axis=new_axis)]
f14 = np.max(P, axis=new_axis) / P_tot
# Collect own features.
var = np.nanvar(x, axis=new_axis)
abs_mean = np.abs(np.nanmean(x, axis=new_axis))
mean_abs = np.nanmean(np.abs(x), axis=new_axis)
zc = np.sum(np.diff(np.sign(x_demean), axis=new_axis) > 0, axis=new_axis)
line_length = np.nanmean(np.abs(x_diff), axis=new_axis)
max_der = np.nanmax(np.abs(x_diff), axis=new_axis)
# Collect features (n_feat, ...).
features = np.asarray([f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11, f12, f13, f14,
var, abs_mean, mean_abs, zc, line_length, max_der])
# Create feature labels.
feature_labels = ['F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7', 'F8', 'F9', 'F10', 'F11', 'F12', 'F13', 'F14'] + \
['Var', 'Abs mean', 'Mean abs', 'ZC', 'LL', 'Max der']
assert len(features) == len(feature_labels)
return features, feature_labels
def compute_features_S3(x, fs, nperseg=None, axis=0):
"""
Compute frequency-domain features from Piryatinska et al. 2009 paper.
References:
A. Piryatinska, G. Terdik, W. A. Woyczynski, K. A. Loparo, M. S. Scher, and A. Zlotnik,
“Automated detection of neonate EEG sleep stages,” Computer Methods and Programs in Biomedicine,
vol. 95, no. 1, pp. 31–46, Jul. 2009, doi: 10.1016/j.cmpb.2009.01.006.
Args:
x (np.ndarray): signal array for which to compute features with shape.
fs (float): sampling frequency of the signal (Hz).
nperseg (int): number of samples per subsegment in Welch procedure (see scipy's welch() function).
axis (int): time axis of `x` (axis along which to compute the features).
Returns:
features (np.ndarray): array with length n_features containing the features.
Is an N-D array if `x` is N-D, e.g. if x.shape == (n_segments, n_time, n_channels),
then features.shape = (n_features, n_segments, n_channels). Note that the time dimension disappears and
that the features are always the first dimension.
feature_labels (list): list with length n_features containing names for the features.
Examples:
>>> tmax = 30
>>> fs = 100
>>> t = np.arange(0, tmax, 1/fs)
>>> f_all = [2, 6, 10, 13.5]
>>> a_all = [1, 2, 3, 4]
>>> x = 0
>>> for fi, ai in zip(f_all, a_all):
... x += np.sqrt(ai) * np.sin(fi * t * 2 * np.pi)
>>> features, feature_labels = compute_features_S3(x, fs, axis=-1)
>>> print(np.round(features, 1))
[ 0.1 0.2 0.3 0.4 13.4 13.6 49. -0. 3.9]
"""
if nperseg is None:
# 4 seconds by default.
nperseg = int(fs * 4)
if x.ndim > 1:
# Swap time axis with second-to last axis such that we can use dot products.
new_axis = -2
x = np.swapaxes(x, new_axis, axis)
else:
new_axis = -1
N = x.shape[new_axis]
if nperseg > N:
raise ValueError(f'nperseg = {nperseg} is greater than input length = {N}.')
# Welch spectral power.
nfft = max([1024, nperseg])
f, P = welch(
x, fs=fs,
window='hann', nperseg=nperseg, noverlap=None, # Overlap = None, means 50 %overlap.
nfft=nfft, scaling='density', axis=new_axis)
assert P.shape[new_axis] == len(f)
df = f[1]
_dims = list(range(P.ndim))
del _dims[new_axis]
# Remove freqs below 0.5 and above 30 Hz.
freq_band = (f >= 0.5) & (f < 30)
f = f[freq_band]
slices = [slice(None)] * P.ndim
slices[new_axis] = freq_band
P = P[tuple(slices)]
# Total and relative power.
P_tot = np.sum(P, axis=new_axis, keepdims=True)
P_rel = P / P_tot
# Cumulative power and edge frequency locations.
P_cum = np.cumsum(P_rel, axis=new_axis)
idx_edge_75 = np.argmin(np.abs(P_cum - 0.75), axis=new_axis)
idx_edge_90 = np.argmin(np.abs(P_cum - 0.9), axis=new_axis)
# Distribute the amplitude into bins.
N_bins = 100
x_min = np.min(x, axis=new_axis, keepdims=False)
x_max = np.max(x, axis=new_axis, keepdims=False)
bin_width = (x_max - x_min) / N_bins
x_demin = x - np.expand_dims(x_min, new_axis)
x_norm = x_demin / np.max(x_demin, axis=new_axis, keepdims=True) * (N_bins - 1 / N_bins / 1e5)
bin_idx = np.floor(x_norm).astype(int)
assert np.all(bin_idx < N_bins)
# Create P_bin with shape (N_bins, ...).
P_bin = []
for bin_num in range(N_bins):
P_bin.append(np.sum(bin_idx == bin_num, axis=new_axis))
P_bin = np.asarray(P_bin)
# Normalize P_bin, such that integral is 1 (probability density).
P_bin = P_bin / (np.sum(P_bin, axis=0, keepdims=True) * np.expand_dims(bin_width, 0))
feature_dict = {
'delta_power_rel': np.dot((f >= 0.5) & (f < 4), P_rel),
'theta_power_rel': np.dot((f >= 4) & (f < 8), P_rel),
'alpha_power_rel': np.dot((f >= 8) & (f < 12), P_rel),
'beta_power_rel': np.dot((f >= 12) & (f < 30), P_rel),
'sef_75': f[idx_edge_75],
'sef_90': f[idx_edge_90],
'spectral_moment': np.sum(np.expand_dims(f, _dims) * P, axis=new_axis) * df,
'spectral_entropy': -np.sum(P * np.log(P + 1e-10), axis=new_axis) * df / len(f),
'amplitude_entropy': -np.sum(P_bin * np.log(P_bin + 1e-10) / np.log(N_bins), axis=0),
}
# Split the values and keys and return separately.
features = np.asarray(list(feature_dict.values()))
feature_labels = list(feature_dict.keys())
return features, feature_labels
def _compute_bandpower(f, Pxx, fmin, fmax):
# Compute bandpower.
ind_min = np.argmax(f > fmin) - 1
ind_max = np.argmax(f > fmax) - 1
return np.trapz(Pxx[ind_min: ind_max], f[ind_min: ind_max])