Multifractal analysis

Demonstration code for performing a multifractal analysis using MultifractalAnalysis().

Link to script: feature_extraction/multifractal_analysis.py

import os

import numpy as np
import matplotlib.pyplot as plt

from nnsa import EdfReader, MultifractalAnalysis, SUPPORTED_RESULT_FILE_TYPES, read_result_from_file, \
    assert_equal, print_object_summary
from nnsa.matlab.utils import matlab_engine

plt.close('all')

Parameters.

# Print the default parameters of MultifractalAnalysis():
print(MultifractalAnalysis().default_parameters())

# Descriptions of the parameters are documented in the default_parameters() code.

# Create an instance of the MultifractalAnalysis class with custom parameters, overruling some defaults:
mfa = MultifractalAnalysis(segmentation={'segment_length': 30})

# See if the custom parameters were accepted:
print('\nCustom parameters:')
print(mfa.parameters)

Main method: multifractal_analysis.

# Now that we have initialized a MultifractalAnalysis object with certain parameters, we can run the
# multifractal_analysis method, which performs the multifractal analysis:
# Create a random signal.
signal = np.random.rand(8, 10000)
fs = 250
result = mfa.multifractal_analysis(signal, fs=fs)

Save the result.

# We can save the result to any supported file. To list the supported file types/extensions:
print('Supported result file types: {}'.format(SUPPORTED_RESULT_FILE_TYPES))

# E.g. save as hdf5 (the file type to save is automatically detected from the extension of the filename).
filename = 'temp_result.hdf5'
result.save_to_file(filename)

# We can reload the result back to a MultifractalAnalysisResult object:
result_loaded = read_result_from_file(filename)

# Remove temporary file.
os.remove(filename)

# Verify that the loaded object is equal to the original object:
assert_equal(actual=result_loaded, desired=result)  # Will raise an AssertionError if not equal.
print('Loaded result object is equal to original object.')

The returned object is a MultifractalAnalysisResult object, which is a high-level interface for manipulating/visualizing/saving the result:

print('\nSummary of result object:')
print_object_summary(result)

MultifractalAnalysisResult attributes.

# The main attribute is cp, which contains the coefficients of the Taylor approximation of the MF spectrum:
print('result.cp: {}'.format(result.cp))

# The attributes hmax and hmin are also related to the shape of the spectrum.
print('result.hmax: {}'.format(result.hmax))
print('result.hmin: {}'.format(result.hmin))

# The attribute q_functions is a dictionary that contains variables that are functions of q, as computed
# in the analysis.
print('result.q_functions: {}'.format(result.q_functions))

# The variable q, corresponding to the q_functions can be accessed via the property q:
print('result.q: {}'.format(result.q))

# The channel labels corresponding to the channel dimension of the aforementioned arrays:
print('result.channel_labels: {}'.format(result.channel_labels))

# The segment times corresponding to the segments/time dimension:
print('result.segment_times: {}'.format(result.segment_times))

MultifractalAnalysisResult methods.

# Get a dictionary with features. Features are arrays with dimension (channels, segments).
features = result.extract_features()

MultifractalAnalysis of EegDataset.

# Specify a file and open a reader to read the data (e.g. EdfReader to read an EDF(+) file).
# filepath = TEST_FILE_EDF
filepath = 'C:/data_temp/test.edf'

# Read EEG channels in an EegDataset object.
with EdfReader(filepath) as r:
    ds = r.read_eeg_dataset(dtype=np.float32)  # We can specify the dtype, lower precision, means less memory usage.

ds_preprocessed = ds.reference('Cz').resample(fs_new=128).filter_saved_filter(filter_name='bandpassfir_a')

# We can do the mf analysis for multichannel EEG via the wrapper method 'multifractal_analysis' of the EegDataset class.
# Specify the parameters to the MultifractalAnalysis class as kwargs.
if False:
    with start_matlab_engine() as eng:
        # Disable this line, since the multifractal_analysis will take a long time to compute.
        result_ds = ds_preprocessed.multifractal_analysis(eng=eng)

else:
    # %% Perform on only some segments (the below code is basically what's inside the EegDataset.multifractal_analysis
    # method).

    # The mfa algorithm calls MATLAB functions. We explicitly pass a matlab engine so we can make sure it
    # closes when it is done using the context manager matlab_engine().
    with matlab_engine() as eng:
        # Initialize MultifractalAnalysis object with default arguments.
        mfa = MultifractalAnalysis(eng=eng)

        # Prepare input matrix for multifractal_analysis.
        data_matrix, channel_labels = ds_preprocessed.asarray(return_channel_labels=True)

        # Sample frequency of all EEG signals is the same if self.asarray() did not raise an error.
        fs = next((ts.fs for ts in ds_preprocessed.time_series.values()))

        # Select only some segments.
        n_segments = 2
        data_matrix = data_matrix[:, :n_segments * mfa.parameters['segmentation']['segment_length'] * fs]

        # Run multifractal_analysis.
        result_ds = mfa.multifractal_analysis(
            data_matrix, fs=fs, channel_labels=channel_labels)

Display some features.

segment_idx = 1
zq = result_ds.q_functions['zq'][:, :, segment_idx]; print(zq)
hq = result_ds.q_functions['hq'][:, :, segment_idx]; print(hq)
dq = result_ds.q_functions['Dq'][:, :, segment_idx]; print(dq)
cp = result_ds.cp[:, :, segment_idx]; print(cp)
hmin = result_ds.hmin[:, segment_idx]; print(hmin)
hmax = result_ds.hmax[:, segment_idx]; print(hmax)

Plot SS and approximation.

plt.figure()
chan = 6
seg = 1
hq = result_ds.q_functions['hq'][:, chan, seg]
dq = result_ds.q_functions['Dq'][:, chan, seg]
plt.scatter(hq, dq)
plt.xlabel('h')
plt.ylabel('D(h)')

# Taylor expansion of D(h). Note that the coefficients of this expansion are nice features of the MFA analysis.
q = result_ds.q
tauq = np.zeros_like(q)
for i, cp in enumerate(result_ds.cp[:, chan, seg]):
    p = i + 1
    tauq += cp * q ** p / np.math.factorial(p)
dh_ap = q * hq - tauq + dq.max()

plt.plot(hq, dh_ap)