import os
import pickle
import sys
import warnings
import h5py
import joblib
import numpy as np
import pandas as pd
import pyprind
from matplotlib import pyplot as plt
from nnsa.config import MODEL_DATA_DIR
from nnsa.utils.conversions import convert_time_scale
from nnsa.utils.paths import get_filepaths
from nnsa.feature_extraction.common import select_segments
from nnsa.feature_extraction.feature_sets.eeg import compute_features
from nnsa.parameters.parameters import ClassWithParameters, Parameters
from nnsa.feature_extraction.result import ResultBase
from nnsa.preprocessing.resample import resample_by_filtering
from nnsa.utils.config import HORIZONTAL_RULE
from nnsa.utils.pkl import pickle_load
from nnsa.utils.plotting import format_time_axis
from nnsa.utils.segmentation import get_segment_times, get_all_segments
[docs]class BrainAgeSinc(ClassWithParameters):
"""
Deep shared inception ensemble model for brain age estimation.
References:
A. Ansari et al.,
“Brain age as an estimator of neurodevelopmental outcome:
A deep learning approach for neonatal cot-side monitoring,”
Jan. 2023, doi: 10.1101/2023.01.24.525361
Args:
which (str): string specifying which model to use. Options:
- 'v1': the one trained by Ansari and Pillay as available on (as of June 2022):
https://github.com/amirans65/brainagemodel/tree/c948967d7a42e237fa6cc8991b696b662300f6e2/brainagemodel/pretrained
- 'v2': the one retrained by Hermans including more older neonates.
**kwargs: see default_parameters().
"""
def __init__(self, which='v2', detect_novelties=True, **kwargs):
# Call parent's __init__.
super().__init__(which=which.lower(), detect_novelties=detect_novelties, **kwargs)
self._all_models = None
self._normalization_parameters = None
@property
def all_models(self):
"""
Return the model.
Returns:
(nnsa.model or keras.model): model object.
"""
if self._all_models is None:
self._all_models = self._load_all_models(verbose=self.parameters['verbose'])
return self._all_models
[docs] @staticmethod
def default_parameters():
"""
Return the default parameters as a dictionary.
Returns:
(nnsa.Parameters): a default set of parameters for the model.
"""
# Default parameters.
pars = {
# Which model (set on init).
'which': None,
# Whether to detect novelties in data inputs when predicting.
'detect_novelties': True,
# Stepsize in seconds (if None, stepsize = segment_length).
'stepsize': None,
# Number of inputs channels (only for which='v1').
'num_channels': 8,
# Verbosity level (for loading models).
'verbose': 1,
}
return Parameters(**pars)
@property
def data_requirements(self):
"""
Return a dictionary with requirements for the input of the CNN.
Returns:
data_requirements (dict): a dictionary with requirements for the input of the CNN
(channel order, fs, segment length).
"""
# Extract parameters (data requirements depend on input parameters).
num_channels = self.parameters['num_channels']
which = self.parameters['which']
if which in ['v1', 'v2']:
fs = 64
segment_length = 30
reference_channel = 'Cz'
if num_channels == 8:
channel_order = ['Fp1', 'Fp2', 'C3', 'C4', 'T3', 'T4', 'O1', 'O2']
else:
raise NotImplementedError('Not implemented for num_channels={} and which={}.'
.format(num_channels, which))
else:
raise NotImplementedError('Implement data_requirements for which="{}".'.format(which))
data_requirements = dict(
# Order of the channels in the input tensor.
channel_order=channel_order,
# The sample frequency (Hz).
fs=fs,
# The segment length (seconds).
segment_length=segment_length,
# Reference for EEG data.
reference_channel=reference_channel,
)
return data_requirements
@property
def normalization_parameters(self):
"""
Return the normalization parameters.
Returns:
(tuple): normalization parameters.
"""
if self._normalization_parameters is None:
self._normalization_parameters = self._load_normalization_parameters()
return self._normalization_parameters
[docs] def preprocess(self, eeg, fs, axis=1, verbose=1):
"""
Preprocess, normalize, segment.
Args:
see self.process().
Returns:
x (np.ndarray): array with shape (n_seg, n_time, n_channels).
"""
if eeg.ndim != 2:
raise ValueError(f'Expected a 2D input. Got an input with shape {eeg.shape}.')
# Make sure the shape of the EEG is (n_time, n_channels).
if axis in [-1, 1]:
# Transpose to (n_time, n_channels).
eeg = eeg.T
# Extract some info.
channel_order = self.data_requirements['channel_order']
segment_length = self.data_requirements['segment_length']
which = self.parameters['which']
stepsize = self.parameters['stepsize']
stepsize = segment_length if stepsize is None else stepsize
# Make sure that the number of channels matches the length of channel order.
if eeg.shape[1] != len(channel_order):
raise ValueError(f'Expected EEG with {len(channel_order)} channels, but got {eeg.shape[1]} channels. '
f'Expected channel order is: {channel_order}.')
if which in ['v1', 'v2']:
fs_req = self.data_requirements['fs']
if fs != fs_req:
if fs < fs_req:
raise ValueError(f'`fs` ({fs}) lower than required sampling frequency ({fs_req}).')
# Resample.
if verbose:
print('Resampling...')
eeg = resample_by_filtering(x=eeg, fs=fs, fs_new=fs_req, axis=0)
# Normalize.
if verbose:
print('Normalizing...')
norm_pars = self.normalization_parameters
eeg = (eeg - norm_pars['sub']) / norm_pars['den']
# Set NaNs to zeros.
eeg = np.nan_to_num(eeg)
# Segment.
if verbose:
print('Segmenting...')
x = get_all_segments(eeg, segment_length=segment_length,
overlap=segment_length - stepsize, fs=fs_req, axis=0)
else:
raise NotImplementedError(f'Not implemented for which="{which}".')
return x
[docs] def process(self, eeg, fs, batch_size=None, axis=-1, verbose=1):
"""
Predict brain age.
Includes preprocessing (filtering, segmentation, normalization)
Note that the channel order in `eeg` must be according to self.data_requirements['channel_order'].
Args:
eeg (np.ndarray): 2D-array with raw (multichannel) EEG data. The channel order must be
the same as specified in self.data_requirements.
fs (float): sampling frequency of EEG.
batch_size (int, None): number of segments to proces at once.
If None, processes all segments at once, but in case of memory issues (for long recordings),
you may set this value to e.g. 7200 to process 7200 segments (1h of EEG) at a time.
axis (int): the time axis of `eeg`.
verbose (int): verbosity level.
Returns:
(BrainAgeResult): nnsa object containing the results of the brain age predictor.
"""
if verbose > 1:
print(HORIZONTAL_RULE)
print(f'Running {self.__class__.__name__} with parameters:')
print(self.parameters)
# Prepare input for the networks.
if verbose:
print('Preprocessing...')
x = self.preprocess(eeg=eeg, fs=fs, axis=axis, verbose=verbose > 1)
# Detect novelties if requested.
detect_novelties = self.parameters['detect_novelties']
if detect_novelties and self.parameters['which'] == 'v1':
# No novelty model trained for v1.
warnings.warn(f'\nNo novelty detection possible for version {self.parameters["which"]}.')
detect_novelties = False
if detect_novelties is True:
if verbose > 1:
print('Detecting novelties...')
features = self._compute_novelty_features(x, verbose=verbose > 1)
is_novelty, novelty_scores = self._detect_novelties(features=features, verbose=verbose)
else:
is_novelty, novelty_scores = None, None
# Predict with each model.
bar = pyprind.ProgBar(len(self.all_models), stream=sys.stdout)
y_pred_all = []
batch_size = batch_size if batch_size is not None else len(x)
if verbose:
print('Predicting...')
for model_name, model in self.all_models.items():
# Process in chunks to guard memory.
y_pred_model = []
for idx_start in range(0, len(x), batch_size):
# Select chunk.
idx_stop = min([idx_start + batch_size, len(x)])
x_chunk = x[idx_start: idx_stop]
# Predict.
# model(x) is preferred over model.predict() as explained here: https://stackoverflow.com/questions/66271988/warningtensorflow11-out-of-the-last-11-calls-to-triggered-tf-function-retracin
y_pred_chunk = model(x_chunk, training=False)
y_pred_model.append(y_pred_chunk)
# Concatenate chunks.
y_pred_model = np.concatenate(y_pred_model, axis=0)
y_pred_all.append(y_pred_model)
if verbose:
bar.update()
y_pred_all = np.dstack(y_pred_all) # (n_seg, n_out, n_models)
# Select first (and only) output (the FBA estimate).
brain_age = y_pred_all[:, 0, :]
if np.nanmean(brain_age) < 20:
msg = 'Failed, not saving, y_pred_all contains strange values:\n{}'.format(y_pred_all)
raise ValueError(msg)
# Determine segment start times.
segment_length = self.data_requirements['segment_length']
stepsize = self.parameters['stepsize']
stepsize = segment_length if stepsize is None else stepsize
segment_start_times = get_segment_times(
num_segments=len(x), segment_length=segment_length,
overlap=segment_length - stepsize, offset=0)
model_names = list(self.all_models.keys())
# Return as BrainAgeResult object.
return BrainAgeResult(brain_age=brain_age,
is_novelty=is_novelty,
novelty_scores=novelty_scores,
model_names=model_names,
fs=1/stepsize,
segment_start_times=segment_start_times,
segment_end_times=segment_start_times + segment_length,
algorithm_parameters=self.parameters)
[docs] def brain_age_sinc(self, eeg_ds, verbose=1):
"""
TODO
Args:
x (np.ndarray): TODO
verbose (int, optional): verbose level (0 or 1).
Default to 1.
Returns:
(BrainAgeSincResult): nnsa object containing the results of the CNN brain age predictor.
"""
raise DeprecationWarning('Deprecated... Use self.process()')
if verbose > 0:
print(HORIZONTAL_RULE)
print('Predicting brain age using {} with parameters:'.format(self.__class__.__name__))
print(self.parameters)
parameters = self.parameters
data_requirements = self.data_requirements
norm_pars = self.normalization_parameters
# Preprocess.
eeg_ds = eeg_ds.reference('Cz').resample(
fs_new=data_requirements['fs'], method='polyphase_filtering')
# Get channel data as array in correct order.
eeg_array = eeg_ds.asarray(channels=data_requirements['channel_order'], channels_last=True)
# Normalize.
eeg_array = (eeg_array / 1e6 - norm_pars['sub']) / norm_pars['den']
# Set NaNs to zeros.
eeg_array = np.nan_to_num(eeg_array)
# Segment the EEG signals (all channels simultaneously).
segment_length = data_requirements['segment_length']
stepsize = parameters['stepsize']
stepsize = segment_length if stepsize is None else stepsize
fs = eeg_ds.fs
# Dimensions (segments, time, channels).
x = get_all_segments(eeg_array, segment_length=segment_length,
overlap=segment_length - stepsize, fs=fs, axis=0)
# predict
predicted_pma = self.model.predict(x)
# aggregate
y_pred = np.median(predicted_pma, axis=-1)
segment_start_times = get_segment_times(
num_segments=len(x), segment_length=segment_length,
overlap=segment_length - stepsize, offset=eeg_ds.time_offset) # Time offset will be added by self._postprocess_result.
# Return as SleepStageCnn object.
return BrainAgeSincResult(brain_age=y_pred,
fs=1/stepsize,
segment_start_times=segment_start_times,
segment_end_times=segment_start_times + segment_length,
algorithm_parameters=self.parameters)
def _compute_novelty_features(self, x, verbose=1):
"""
Compute features for novelty detection.
Args:
x (np.ndarray): array with preprocessed data with shape (n_seg, n_time, n_channels).
verbose (int): verbosity level.
Returns:
features (pd.DataFrame): dataframe with features for each segment.
"""
# Extract some info.
segment_length = self.data_requirements['segment_length']
fs = self.data_requirements['fs']
which = self.parameters['which']
stepsize = self.parameters['stepsize']
stepsize = segment_length if stepsize is None else stepsize
if which == 'v2':
# Reshape back to (n_seg, n_channels) (for feature computation function).
eeg = x.reshape(-1, x.shape[-1])
# Compute features (n_feat, n_chan, n_seg).
features, feature_labels = compute_features(
eeg, fs, window=segment_length, which='S2', stepsize=stepsize, verbose=verbose)
# To shape (n_seg, n_chan, n_feat).
features = np.swapaxes(features, axis1=0, axis2=2)
# Aggregate features over channels to shape (n_segments, n_features).
features = pd.DataFrame(np.nanmean(features, axis=1), columns=feature_labels)
else:
msg = '\nNo novelty detection trained for which="{}".'.format(which)
warnings.warn(msg)
features = None
return features
def _detect_novelties(self, features, verbose=1):
"""
Args:
features (np.ndarray): dataframe with features of each segment, has shape (n_seg, n_features).
verbose (int): verbosity level.
Returns:
is_novelty (np.ndarray): array with same length as `features` (n_seg,) containing True
at locations where there is an out-of-data segment.
novelty_scores (np.ndarray): array with sem length as `features` (n_seg,) containing
novlety scores for each segment.
"""
which = self.parameters['which']
if which == 'v2':
# Load novelty detection model.
model_name = 'IF_0.1'
novelty_model_path = os.path.join(
MODEL_DATA_DIR, 'brain_age_sinc', 'v2',
f'novelty_detection_{model_name}.joblib')
novelty_model = joblib.load(novelty_model_path)
# Extract parts of model.
feature_names, scaler, pca, clf = novelty_model
# Extract the needed features in correct order (check).
X = features[feature_names]
# Normalize.
X = pd.DataFrame(scaler.transform(np.nan_to_num(np.log(X), neginf=0, posinf=0)), columns=feature_names)
# PCA.
X = pca.transform(X)
# Predict (1 inlier, -1 outlier).
is_novelty = clf.predict(X) == -1
novelty_scores = -clf.score_samples(X)
else:
msg = '\nNo novelty detection trained for which="{}".'.format(which)
warnings.warn(msg)
is_novelty = None
if verbose and is_novelty is not None:
print(f'{np.nanmean(is_novelty) * 100:.1f}% novelties detected.')
return is_novelty, novelty_scores
def _load_normalization_parameters(self):
"""
Load normalization parameters.
Returns:
norm_pars (dict): dict with 'sub' and 'den'. Normalization will occur by subtracting
`sub` from the the EEG data and then dividing by `den`. Units should be uV.
"""
which = self.parameters['which']
if which == 'v1':
# Get normalization parameters.
norm_fp = os.path.join(
MODEL_DATA_DIR, 'brain_age_sinc', 'v1',
'brainagemodel', 'pretrained', 'norm.pkl')
norm_pars = pickle_load(norm_fp)
# Convert to uV.
mean = norm_pars['sub'] * 1e6
sd = norm_pars['den'] * 1e6
elif which == 'v2':
# Load normalization parameters.
norm_fp = os.path.join(MODEL_DATA_DIR, 'brain_age_sinc', 'v2',
'eeg_data_normpars.pkl')
normpars = pickle_load(norm_fp)
# These are per channel. Expand dimensions that refers to time dimension of EEG.
mean = np.expand_dims(normpars['mean'], 0)
sd = np.expand_dims(normpars['sd'], 0)
# Check if we need to compute one parameters for all channels.
normalize_per_channel = False # This was set to False for 'v2'.
if not normalize_per_channel:
# Global mean is mean of piecewise means.
mean = np.mean(mean, axis=-1, keepdims=True)
# Global SD is root of the mean of piecewise variances.
sd = np.sqrt(np.mean(sd ** 2, axis=-1, keepdims=True))
else:
raise NotImplementedError(f'Normalization parameters not implemented for which="{which}".')
# Collect normalization parameters.
norm_pars = {
'sub': mean,
'den': sd,
}
return norm_pars
def _load_all_models(self, verbose=1):
"""
Load all models in ensemble.
Args:
verbose (int): verbosity level.
Returns:
all_models (dict): dict with all loaded (uncompiled) models.
"""
from tensorflow.python.keras.models import load_model
which = self.parameters['which']
if which == 'v1':
num_channels = self.parameters['num_channels']
all_model_paths = get_filepaths(
directory=os.path.join(
MODEL_DATA_DIR, 'brain_age_sinc', 'v1',
'brainagemodel', 'pretrained', 'models'),
pattern=f'sincmodel_ch{num_channels}_sn*',
raise_error=True,
)
elif which == 'v2':
all_model_paths = get_filepaths(
directory=os.path.join(
MODEL_DATA_DIR, 'brain_age_sinc', 'v2', 'models'),
pattern=f'R_*.h5',
raise_error=True,
)
else:
raise NotImplementedError(f'_load_all_models not implemented for which="{which}".')
if verbose:
print(f'Loading ensemble of {len(all_model_paths)} Sinc models...')
# Load into dict.
all_models = dict()
bar = pyprind.ProgBar(len(all_model_paths), stream=sys.stdout)
for fp in all_model_paths:
key = os.path.splitext(os.path.basename(fp))[0]
all_models[key] = load_model(fp, compile=False)
if verbose:
bar.update()
return all_models
[docs]class BrainAgeResult(ResultBase):
"""
Result class for (ensemble of) brain age estimates.
Args:
brain_age (np.ndarray): array with shape (n_segment, n_models). If only one model is used, n_models can be 1.
algorithm_parameters (nnsa.Parameters): see ResultBase.
fs (float): 1/segment length in Hz.
is_novelty (np.ndarray, None): optional boolean array with shape (n_segments,) indicating which segmentes were
classified as novelties (out-of-data).
novelty_scores (np.ndarray, None): optional array with shape (n_segments,) containing novelty scores.
model_names (list): optional list with names for models. Should have length `n_models`.
data_info (str, optional): see ResultBase.
segment_start_times (np.ndarray, optional): see ResultBase.
segment_end_times (np.ndarray, optional): see ResultBase.
"""
def __init__(self, brain_age, algorithm_parameters, fs=None,
is_novelty=None, novelty_scores=None, model_names=None, data_info=None,
segment_start_times=None, segment_end_times=None, time_offset=0):
# Call parent's __init___.
super().__init__(algorithm_parameters=algorithm_parameters, data_info=data_info,
segment_start_times=segment_start_times, segment_end_times=segment_end_times, fs=fs,
time_offset=time_offset)
if brain_age.ndim == 1:
# Assume 1 model.
brain_age = brain_age.reshape(-1, 1)
if brain_age.ndim != 2:
raise ValueError('Expected `brain_age` to have 2D shape (n_segments, n_models), but'
f' got shape {brain_age.shape}.')
# Defaults.
n_models = brain_age.shape[-1]
if model_names is None:
model_names = [f'M{ii}' for ii in range(n_models)]
data_shape = brain_age.shape
if segment_start_times is not None and len(segment_start_times) != data_shape[0]:
raise ValueError('Length of segment_start_times ({}) is not equal to the number of segments in the data '
'({})'.format(len(segment_start_times), data_shape[0]))
# Store variables that are not already stored by the parent class (ResultBase).
self.brain_age = brain_age
self.is_novelty = is_novelty
self.novelty_scores = novelty_scores
self.model_names = model_names
@property
def num_segments(self):
"""
Return the number of segments.
Returns:
(int): number of segments.
"""
return self.brain_age.shape[self.time_axis]
@property
def time_axis(self):
# Segment/time axis of self.brain_age.
return 0
@property
def y_pred(self):
"""
"""
return self.brain_age
[docs] def to_df(self):
"""
Convert segment-wise results to pandas DataFrame.
"""
n_segments = self.num_segments
y_pred = self.y_pred
data = {
'Starttime': self.segment_start_times,
'Endtime': self.segment_end_times,
'Novelty': self.is_novelty if self.is_novelty is not None else np.full(n_segments, fill_value=np.nan),
'FBA (median)': np.nanmedian(y_pred, axis=-1),
'FBA (SD)': np.nanstd(y_pred, axis=-1),
'FBA (range)': np.nanmax(y_pred, axis=-1) - np.nanmin(y_pred, axis=-1)
}
for i_model, model_name in enumerate(self.model_names):
data[f'FBA ({model_name})'] = y_pred[:, i_model]
df = pd.DataFrame(data, index=pd.Index(np.arange(n_segments), name='Segment'))
return df
[docs] def plot(self, time_scale=None, color=None, ax=None, novelty_kwargs=None, **kwargs):
"""
Simple plot of brain age.
Args:
time_scale (str, optional): time scale, see nnsa.data.plotting.convert_time_scale().
Defaults to 'hours'.
color (str): color to plot in.
ax (plt.Axes, optional): axes object to plot in. If None, plots in the current axes.
Defaults to None
novelty_kwargs (dict): kwargs for plt.scatter for indicating novelties.
kwargs: for plt.plot().
"""
if ax is None:
plt.figure()
ax = plt.subplot(111)
else:
# Set current axis.
plt.sca(ax)
if novelty_kwargs is None:
novelty_kwargs = dict()
# Get time array.
time = self.segment_times
# Plot (median + IQR).
plot_kwargs = dict({'marker': 'o', 'color': color}, **kwargs)
y_low, y_mid, y_high = np.nanquantile(self.y_pred, [0.25, 0.5, 0.75], axis=-1)
plt.plot(time, y_mid, **plot_kwargs)
plt.fill_between(time, y_low, y_high, color=color, alpha=0.5)
if self.is_novelty is not None:
mask = self.is_novelty
scatter_kwargs = dict({'marker': 'o', 'color': 'C3'}, **novelty_kwargs)
plt.scatter(time[mask], y_mid[mask], zorder=100, **scatter_kwargs)
plt.title('Brain age trend')
plt.xlabel('Time ({})'.format(time_scale))
plt.ylabel('FBA (weeks)')
format_time_axis(time_scale=time_scale, ax=ax)
[docs] def remove_unreliabilities(self, aci_per_seg, aci_thres=50,
min_num_segments=3, how='remove', verbose=1):
"""
Return an array with FBA estimates for each segment and model, but without the
segments containing (too much) artefacts and novelties.
Args:
aci_per_seg (np.ndarray): array with length n_segments containing the
artefact contamination index (%) for each segment.
If None, does not check for artefacts (only novelties).
aci_thres (float): threshold for ACI (%).
min_num_segments (int): minimum number of consecutive reliable segments.
how (str): what to do with unreliabilities.
'remove': removes the segments from the array.
'nan': replaces the unreliable segments with nan.
verbose (int): verbosity level.
Returns:
y_pred_ens (np.ndarray): array with shape (n_segments, n_models) containing
selected FBA estimates (novelties, artefact segments removed).
"""
y_pred_ens = self.y_pred.copy()
if aci_per_seg is not None:
aci_per_seg = np.asarray(aci_per_seg).squeeze()
if len(aci_per_seg) != len(y_pred_ens):
raise ValueError('Length of `aci_per_seg` should be {}. Got length {}.'
.format(len(y_pred_ens), len(aci_per_seg)))
else:
warnings.warn('No artefact contamination index provided. Only removing novelties.')
# Select segments.
keep_mask = select_segments(
aci=aci_per_seg, novelty_score=None, is_novelty=self.is_novelty,
aci_thres=aci_thres, novelty_score_thres='auto',
verbose=verbose)
# Check if enough usable segments remain.
if (keep_mask is not None) and \
(np.max(np.convolve(keep_mask, np.ones(min_num_segments), 'valid')) < min_num_segments):
msg = '\nToo few usable segments to get a reliable estimate for FBA.'
warnings.warn(msg)
keep_mask = np.full(keep_mask.shape, fill_value=False)
if keep_mask is not None:
if how in ['remove', 'exclude']:
y_pred_ens = y_pred_ens[keep_mask]
elif how.lower() in ['nan']:
y_pred_ens[~keep_mask] = np.nan
else:
raise ValueError(f'Invalid option how="{how}". Choose "remove" or "nan".')
return y_pred_ens
[docs] def robust_fba(self, aci_per_seg=None, verbose=1):
"""
Robust FBA prediction by selecting segments without artefacts or novelties.
Args:
aci_per_seg (np.ndarray): array with length n_segments containing the
artefact contamination index (%) for each segment.
If None, only the novelties are used for robust FBA.
verbose (int): verbosity level.
Returns:
data (pd.Series): results (median, some quantiles, and the FBAs of the selected segments).
"""
aci_per_seg = np.asarray(aci_per_seg).squeeze()
which = self.algorithm_parameters['which']
if which == 'v2':
aci_thres = 50
min_num_segments = 3
y_pred_ens = self.remove_unreliabilities(
aci_per_seg=aci_per_seg, aci_thres=aci_thres,
min_num_segments=min_num_segments, verbose=verbose)
if y_pred_ens.size == 0:
y_pred_ens = [np.nan]
with warnings.catch_warnings():
# I expect to see RuntimeWarning: All-NaN slice encountered.
warnings.simplefilter("ignore", category=RuntimeWarning)
# Aggregate segments and models.
q3, q25, q50, q75, q98 = np.nanquantile(y_pred_ens, q=[0.025, 0.25, 0.5, 0.75, 0.975])
# Percentage artefacts and novelties.
perc_af = np.mean(aci_per_seg > aci_thres) * 100 if aci_per_seg is not None else np.nan
perc_nov = np.mean(self.is_novelty)*100 if self.is_novelty is not None else np.nan
# Collect results in Series.
data = pd.Series({
'FBA': q50,
'q2.5': q3,
'q25': q25,
'q75': q75,
'q97.5': q98,
'Artefacts (%)': perc_af,
'Novelties (%)': perc_nov,
})
else:
raise ValueError(f'No robust FBA for brain age computed with which="{which}".')
return data
def _merge(self, other):
"""
See ResultBase.
"""
self.brain_age = np.concatenate((self.brain_age, other.brain_age), axis=self.time_axis)
if self.is_novelty is not None:
self.is_novelty = np.concatenate((self.is_novelty, other.is_novelty), axis=0)
if self.novelty_scores is not None:
self.novelty_scores = np.concatenate((self.novelty_scores, other.novelty_scores), axis=0)
@staticmethod
def _read_from_hdf5(filepath):
"""
Read the HDF5 file created by self._write_to_hdf5.
Returns:
result (BrainAgeResult): loaded result class.
"""
# Read standard csv header (use the ResultBase method).
algorithm_parameters, data_info, segment_start_times, segment_end_times, fs, time_offset = \
ResultBase._read_hdf5_header(filepath)[1:]
# Re-open the file and read the rest of the file.
with h5py.File(filepath, 'r') as f:
# Read array data.
brain_age = f['brain_age'][:]
model_names = [lab.decode() for lab in f['brain_age'].attrs['model_names']]
if 'is_novelty' in f:
is_novelty = f['is_novelty'][:]
else:
is_novelty = None
if 'novelty_scores' in f:
novelty_scores = f['novelty_scores'][:]
else:
novelty_scores = None
# Create a result object.
result = BrainAgeResult(brain_age=brain_age,
is_novelty=is_novelty,
novelty_scores=novelty_scores,
model_names=model_names,
fs=fs,
segment_start_times=segment_start_times,
segment_end_times=segment_end_times,
algorithm_parameters=algorithm_parameters,
time_offset=time_offset,
data_info=data_info,
)
return result
def _write_to_hdf5(self, filepath):
"""
Write the contents of the object to an hdf5 file.
Args:
filepath (str): see ResultBase._write_to_hdf5().
"""
# Write standard hdf5 header (use the ResultBase method).
self._write_hdf5_header(filepath)
# Append attributes to the hdf5 file.
with h5py.File(filepath, 'a') as f:
# Write array data.
f.create_dataset('brain_age', data=self.brain_age)
f['brain_age'].attrs['model_names'] = [np.string_(label) for label in self.model_names]
if self.is_novelty is not None:
f.create_dataset('is_novelty', data=self.is_novelty)
if self.novelty_scores is not None:
f.create_dataset('novelty_scores', data=self.novelty_scores)