import numpy as np
from nnsa.artefacts.artefact_detection import detect_flatlines
# Public API of this module: the names exported by a star-import.
__all__ = [
'combine_channels',
]
from nnsa.utils.event_detections import time_threshold
def combine_channels(x, preferred=None, min_nan_len=None):
    """
    Combine multiple channels of a signal into one.

    The signals are cut into segments and pasted together: the time axis is segmented at the onsets and
    offsets of (sufficiently long) runs of NaNs (disturbances) in any channel. For each segment the
    channel with the least amount of NaNs is chosen. When multiple channels have the same amount of
    NaNs (e.g. zero), `preferred` is used to prefer one channel over the others (see Args).

    Args:
        x (np.ndarray): 2D array with dimensions (channels, time) containing multiple channels of a
            certain signal.
        preferred (int or list or function, optional): specifies which channel wins a tie.
            If an int, specifies that a single channel is preferred.
            If a list, specifies a priority list of channel indices (first match among the tied channels
            wins).
            If None, the first segment prefers channels in order of increasing total NaN count, and every
            next segment prefers the channel chosen for the previous segment, such that the amount of
            switches between channels is minimized (sort of).
            If a function, it is called with a (n_tied_channels, n_time_segment) array holding only the
            tied channels of the current segment and must return the row index (into that array) of the
            preferred channel.
            If no preferred channel is among the tied channels, the tied channel with the lowest index
            is used.
        min_nan_len (int, optional): minimum number of consecutive nan samples to consider it as a
            disturbance and hence to consider it as a location to switch to another channel.
            If None, takes 5% of the total length.

    Returns:
        x_combined (np.ndarray): 1D array with dimension (time,) containing the combined signal.

    Raises:
        ValueError: if `x` is not 2-dimensional or contains no channels.
        AssertionError: if `x` has more channels than time samples (probably transposed input).

    Examples:
        >>> x = np.random.rand(3, 1000)
        >>> x[0, :200] = np.nan
        >>> x[1, 300:700] = np.nan
        >>> x[1, 250:260] = np.nan
        >>> x[0, 600:] = np.nan
        >>> preferred = [1, 2, 0]
        >>> x_combined = combine_channels(x, preferred=preferred)

        To inspect the result visually:
        # plt.figure()
        # ax = plt.subplot(211)
        # plt.plot(x.T)
        # plt.subplot(212, sharex=ax)
        # plt.plot(x.T)
        # plt.plot(x_combined, '--')
    """
    # Input check.
    x = np.asarray(x)
    if x.ndim != 2:
        raise ValueError('Expected input to be 2-dimensional. Got {} dimensions.'.format(x.ndim))

    n_channels, n_time = x.shape
    if n_time < n_channels:
        raise AssertionError('Expected input to have size (channels, time). '
                             'Got an array with shape {} which is probably incorrect.'.format(x.shape))
    if n_channels == 0:
        # Without this check the argmin below fails with a far less helpful ValueError.
        raise ValueError('Expected at least one channel. Got an array with shape {}.'.format(x.shape))

    if min_nan_len is None:
        # By default take 5% of the entire length.
        min_nan_len = int(n_time * 0.05)

    # x is never modified and np.concatenate returns a new array, so no defensive copy is needed.
    nan_masks = np.isnan(x)

    # The begin and end of the signal are always boundaries.
    boundaries = [0, n_time]

    # For each channel, add the start and end indices of nan runs as segment boundaries.
    for mask in nan_masks:
        # Remove nans that are short and can be ignored.
        # NOTE(review): assumes time_threshold returns a 1D mask of the same length in which runs
        # shorter than min_duration are cleared - confirm against nnsa.utils.event_detections.
        mask = time_threshold(mask, min_duration=min_nan_len)
        boundaries.extend(np.where(np.diff(mask) != 0)[0] + 1)

    # Sort and de-duplicate: channels sharing an onset/offset would otherwise create zero-length
    # segments (and a callable `preferred` would be called with an empty array).
    boundaries = np.unique(np.asarray(boundaries, dtype=int))

    # Initialize idx_preferred (the priority order used in case of ties in the loop below).
    if preferred is None:
        # By default, initially prefer signal with least amount of NaNs.
        idx_preferred = np.argsort(nan_masks.sum(axis=-1))
    elif callable(preferred):
        # Ties are resolved by calling the function per segment.
        idx_preferred = None
    elif hasattr(preferred, '__len__'):
        idx_preferred = preferred
    else:
        # E.g. when preferred is a single index.
        idx_preferred = [preferred]

    # For each segment, choose the channel with the least amount of nans.
    pieces = []
    for start_idx, stop_idx in zip(boundaries[:-1], boundaries[1:]):
        # Count the raw (unthresholded) nans per channel within this segment.
        num_nans = nan_masks[:, start_idx:stop_idx].sum(axis=-1)
        idx_min = np.argmin(num_nans)

        # If more than 1 channel has the least amount of nans, prefer one particular channel.
        idx_min_all = np.where(num_nans == num_nans[idx_min])[0]
        if len(idx_min_all) > 1:
            if callable(preferred):
                # Should return index of the row (among the tied channels) that is preferred.
                ii_idx_min = preferred(x[idx_min_all, start_idx:stop_idx])
                idx_min = idx_min_all[ii_idx_min]
            else:
                for idx_pref in idx_preferred:
                    if idx_pref in idx_min_all:
                        idx_min = idx_pref
                        break
                # If no preferred channel is tied, idx_min stays the first tied channel.

        pieces.append(x[idx_min, start_idx:stop_idx])

        if preferred is None:
            # Prefer the channel chosen for this segment in the next one, to limit switching.
            idx_preferred = [idx_min]

    x_combined = np.concatenate(pieces)
    return x_combined