Source code for nnsa.decompositions.side_obsp

"""
This module implements the SIgnal DEcomposition based on Oblique Subspace Projections (SIDE-ObSP).

This code was ported from Alexander Caicedo's Matlab code by Tim Hermans.

References:
    A. Caicedo, C. Varon, B. Hunyadi, M. Papademetriou, I. Tachtsidis, and S. V. Huffel,
    “Decomposition of Near-Infrared Spectroscopy Signals Using Oblique Subspace Projections: Applications in Brain Hemodynamic Monitoring,”
    Frontiers in Physiology, vol. 7, Nov. 2016, doi: 10.3389/fphys.2016.00515.
"""
import numpy as np
import pyprind
import scipy.io
import scipy.linalg

from sklearn.metrics import mean_squared_error

from nnsa.models.regressors import LinReg
from nnsa.training.cross_validation import cross_validate
from nnsa.utils.dummy_data import generate_timeseries
from nnsa.utils.objects import basic_repr


__all__ = [
    'SideObsp',
    'create_block_hankel',
]


[docs]class SideObsp(object): """ Class implementing SIgnal DEcomposition based on Oblique Subspace Projections. Args: order (int, optional): order of the moving average model (number of taps). This is parameter `p` in Eq. 12. The order is equal for all inputs. This parameter can be set manually, or found automatically using cross-validation with the fit_hyper() method. Defaults to None. regularizer (str or None, optional): regulizer to use for the linear model. See LinReg() for options. If None, no regularization is applied. Defaults to 'smooth'. gammas (np.ndarray, optional): regularization constant for each input. This is parameter `gamma` in Eq. 10. This parameter can be set manually, or found automatically using cross-validation with the fit_hyper() method. Not that the length of gammas must be equal to the number of input signals that will be used for the decomposition. Is not needed if regularizer is None. Defaults to None. References: A. Caicedo, C. Varon, B. Hunyadi, M. Papademetriou, I. Tachtsidis, and S. V. Huffel, “Decomposition of Near-Infrared Spectroscopy Signals Using Oblique Subspace Projections: Applications in Brain Hemodynamic Monitoring,” Frontiers in Physiology, vol. 7, Nov. 2016, doi: 10.3389/fphys.2016.00515. Examples: >>> n = 1024 >>> signal = generate_timeseries(n) >>> noise = generate_timeseries(n) >>> X = [signal, noise] >>> y = signal + 2*noise + np.random.rand(n) >>> y_hat = SideObsp(regularizer='smooth').fit_hyper(X, y, orders_all=[1, 2, 3, 4, 5]).fit_transform(X, y) >>> y_signal, y_noise = y_hat[:, 0], y_hat[:, 1] >>> y_denoise = y - y_noise """ def __init__(self, order=None, regularizer='smooth', gammas=None): self._order = order self.regularizer = regularizer self._gammas = gammas self._h = None def __repr__(self): return basic_repr(self) @property def order(self): """ Return the model order. """ if self._order is None: raise ValueError('`order` not defined. Set it manually or use the `fit_hyper` method to find it using ' 'cross validation.') return self._order @order.setter def order(self, order): self._order = order @property def gammas(self): """ Return the regularization constant for each input. """ if self._gammas is None: if self.regularizer is None or self.regularizer.lower() == 'ols': self._gammas = None else: raise ValueError('`gammas` not defined. Set it manually or use the `fit_hyper` method to find it using ' 'cross validation.') return self._gammas @gammas.setter def gammas(self, gammas): self._gammas = np.asarray(gammas) @property def h(self): """ Return the impulse responses for each input (the impulses are flipped). Returns: (np.ndarray): array with shape (order, num_inputs). """ if self._h is None: raise ValueError('`h` not defined. Fit the model first using the `fit` method.') return self._h
[docs] def fit(self, X, y, k=None): """ Fit the coefficient vector(s) h (impulse responses) for the kth input(s). Args: X (list or np.ndarray): regressor matrix with shape (n_samples, n_inputs). Can also be a list with n_inputs arrays of length n_samples. y (list or np.ndarray): the signal to decompose. Must be a list or 1D array with length n_samples. k (int or list, optional): index specifying for which signals in X to to find the impulse responses that can be used to estimate the linear contribution of that signal to the output. If None, the impulse responses are computed for each input. Defaults to None. Returns: self: the object itself, but with fitted coefficients. """ # Extract model parameters. order = self.order regularizer = self.regularizer gammas = self.gammas # Prepare inputs. X, y = self._prepare_inputs(X, y) # Number of input signals. n_inputs = X.shape[1] # By default, fit for all inputs. if k is None: k = list(range(n_inputs)) # k must be a list, but a single integer is also allowed. if not isinstance(k, list): k = [k] # Gammas can be None if no regularization is to be applied. if gammas is None: gammas = np.zeros(n_inputs) # Initialize impulse responses. self._h = np.full((order, n_inputs), fill_value=np.nan) # Create block matrix A (Eq. 13). A = create_block_hankel(X, p=order) y = y[order - 1:] # Loop over input signals/regressors. for k_i in k: k_i = int(k_i) # Prepare/preprocess_eeg matrix and vector for solving linear system. Akp, yp = self._prepare_linear_system(A=A, y=y, p=order, k=k_i) # Initialize a linear regression model and fit it. model = LinReg(regularizer=regularizer, gamma=gammas[k_i]).fit(X=Akp, y=yp) # Extract coefficients from model. These are the impulse response. h = model.coef_ self._h[:, k_i] = h return self
[docs] def fit_transform(self, X, y, k=None): """ Shortcut to do a fit and transform the data in one go. """ return self.fit(X, y, k=k).transform(X, k=k)
[docs] def transform(self, X, k=None): """ Decompose the output y into contributions for each input signal in X using fitted impulse responses. This function can only be called after the model has been fit. It will raise an error otherwise. Args: X: see self.fit(). k: see self.fit(). Returns: y_hat (np.ndarray): the estimated linear contribution of each requested input regressor on the output `y`. Has shape (n_samples, len(k)). """ # Extract model parameters. order = self.order # Prepare inputs. X = self._prepare_inputs(X) # By default, transform for all inputs. if k is None: k = list(range(X.shape[1])) # k must be a list, but a single integer is also allowed. if not isinstance(k, list): k = [k] # Create block matrix A (Eq. 13). A = create_block_hankel(X, p=order) n_samples = A.shape[0] # Initialize output. y_hat = np.zeros((n_samples, len(k))) # Loop over input signals/regressors. for i, k_i in enumerate(k): k_i = int(k_i) # Get partition of matrix corresponding to input k_i. Ak = self._split_block_matrix(A, p=order, k=k_i)[0] # Get impulse response. hk = self.h[:, k_i] # Compute y_hat. y_hat[:, i] = Ak @ hk # Depending on the order, y_hat can be shorter than the original signals in X. Fix this by appending nans. if self.order != 1: y_hat = np.concatenate((np.full((self.order - 1, y_hat.shape[1]), fill_value=np.nan), y_hat)) return y_hat
[docs] def fit_hyper(self, X, y, orders_all=None, gammas_all=None, k_folds=10, verbose=2): """ Fit hyper parameters (order, gammas) using a grid search and k-fold cross validation. Args: X: see self.fit(). y: see self.fit(). orders_all (list or np.ndarray, optional): list or 1D array with orders to try. The order is parameter p in Eq. 12. If None, a list with default values is used if self.order is None, otherwise the order will be kept fixed to the current value. Defaults to None. gammas_all (list or np.ndarray, optional): list or 1D array with regularization constants to try. The regularization constant is parameter gamma (lower-case) in Eq. 10. If None, a list with default values is used if self.gammas is None, otherwise gammas will be kept fixed to the current value(s). Defaults to None. k_folds (int, optional): the number of folds for k-fold cross-validation. Defaults to 10. verbose (int): verbosity level. If 1, shows a progress bar. If 2, also prints the results. Returns: self: the object itself, but with fitted hyper parameters. """ if verbose > 1: print('Starting hyperparameter search...') # Prepare inputs. X, y = self._prepare_inputs(X, y) # Default inputs for gridsearch. if orders_all is None: if self._order is None: # Default list. orders_all = [1, 10, 20, 30, 40, 50] else: # Keep order fixed. orders_all = [self._order] elif not hasattr(orders_all, '__len__'): # Convert to list (assuming that just one number is given). orders_all = [orders_all] if gammas_all is None: if self._gammas is None: # Default list. gammas_all = np.logspace(-3, 5, 100) else: # Keep gammas fixed. gammas_all = [self._gammas] elif not hasattr(gammas_all, '__len__'): # Convert to list (assuming that just one number is given). gammas_all = [gammas_all] # orders_all and gammas_all must be an array. orders_all = np.asarray(orders_all) gammas_all = np.asarray(gammas_all) # Definition of some parameters. n_inputs = X.shape[1] # Number of inputs. # Now, we start fitting and cross validation regression models. # We do this # a) len(orders) times for each p parameter, # b) n_inputs times, each time taking another column/signal in A as the regressor, # eliminating the influence of all other signals in A, # c) len(gammas_all) times for each regularization constant. # Initialize arrays that contain metrics of the trained models. error = np.zeros((len(orders_all), len(gammas_all), n_inputs)) regularization = np.zeros_like(error) # Progress bar. bar = pyprind.ProgBar(len(orders_all)*n_inputs*len(gammas_all)) # For each model order. for i, p in enumerate(orders_all): # Create block matrix A (Eq. 13). A_i = create_block_hankel(X, p=p) y_i = y[p - 1:] # For each input signal. for k in range(n_inputs): # Prepare/preprocess_eeg matrix and vector for solving linear system. Akp, yp = self._prepare_linear_system(A=A_i, y=y_i, p=p, k=k) # For each regularization constant. for j, gamma in enumerate(gammas_all): # Initialize linear regression model with regularization possibilities. model = LinReg(regularizer=self.regularizer, gamma=gamma) # K-fold cross training and predictions. yp_pred, all_models = cross_validate( X=Akp, y=yp, model=model, n_folds=k_folds, how='random', verbose=0) # Fit error. error[i, j, k] = mean_squared_error(yp, yp_pred) # Mean regularization cost across models. regularization[i, j, k] = np.mean([m.compute_reg_cost() for m in all_models]) if verbose > 0: bar.update() # Total cost (fit error + regularization) cost = error + regularization # For each input signal/regressor, find the best model and extract the # corresponding order p and regularization coefficient. order_idx = np.zeros(n_inputs, dtype=int) gamma_idx = np.zeros_like(order_idx) for k in range(n_inputs): order_idx[k], gamma_idx[k] = np.unravel_index(np.argmin(cost[:, :, k]), cost.shape[:-1]) # Take maximum across all regressors (if we need a high delay on input k, we better use this also # on the other inputs that try to eliminate the influence of input k). self._order = np.max(orders_all[order_idx]) # Optimal regularization parameter for each input signal. self._gammas = gammas_all[gamma_idx] if verbose > 1: print('Finished hyperparameter search!') print('Order:', self.order) print('Gammas:', self.gammas) return self
@staticmethod def _prepare_inputs(X, y=None): """ Prepare inputs X and y by casting them into arrays and subtracting their means. Args: See self.fit(). Raises errors if inputs are invalid. """ # If X is a list of (presumably) arrays, concatenate. if isinstance(X, list): X = [xi.reshape(-1, 1) for xi in X] X = np.hstack(X) # Make sure it is an array. X = np.asarray(X) # Subtract mean (not necessary?). X = X - np.nanmean(X, axis=0, keepdims=True) if y is None: # Ignore y. return X else: # Make sure it is an array. y = np.asarray(y) # Remove singleton dimensions of y. y = y.squeeze() # Subtract mean (not necessary?). y = y - np.nanmean(y) # Check shapes. if len(X) != len(y) or X.ndim != 2: raise ValueError('Invalid shape for `X`: {}. `X` should have shape (len(y), n_inputs).' .format(X.shape)) if y.ndim != 1: raise ValueError('Too many dimensions for `y`: {}. `y` must be a 1D array.' .format(y.ndim)) return X, y @staticmethod def _prepare_linear_system(A, y, p, k): """ Prepare the linear system as in Eq. 15. Args: A (np.ndarray): array containing the concatenated block Hankel matrices of input signals (Eq. 13). Has shape (n_samples, n_signals * p). y (np.ndarray): output array (Eq. 13). Has shape (n_samples, ). p (int): truncation order of the individual block Hankel matrices (order of the MA model). k (int): the signal index for which the linear contribution needs to be found (Eq. 14). The first signal has index k=0. Returns: Akp (np.ndarray): prepared lhs of linear system (Q_(k)*Ak in Eq. 15). yp (np.ndarray): prepared rhs of linear system (Q_(k)*y in Eq. 15). """ # Replace NaNs with zeros. nan_A = np.any(np.isnan(A), axis=1) nan_y = np.isnan(y) nan_mask = np.logical_or(nan_A, nan_y) A[nan_mask, :] = 0 y[nan_mask] = 0 # Extract block matrices Ak and A_k_ from A (Eq.14). Ak, A_k_ = SideObsp._split_block_matrix(A, p, k) # Compute Q_k_ (Q_(k) in Eq. 2). P_k_ = A_k_ @ np.linalg.pinv(A_k_.T @ A_k_) @ A_k_.T Q_k_ = np.eye(Ak.shape[0]) - P_k_ # Project/wct inputs and output (Eq. 15). Akp = Q_k_ @ Ak yp = Q_k_ @ y return Akp, yp @staticmethod def _split_block_matrix(A, p, k): """ Split the block Hankel expansion matrix (A) into partitions related to the kth input (Ak) and the rest (A_k_). Args: A (np.ndarray): array containing the concatenated block Hankel matrices of input signals (Eq. 13). Has shape (n_samples, n_signals * p). p (int): truncation order of the individual block Hankel matrices (order of the MA model). k (int): the signal index for which the linear contribution needs to be found (Eq. 14). The first signal has index k=0. Returns: Ak (np.ndarray): partition of A relating to the input k (Eq. 14). A_k_ (np.ndarray): partition of A relation to other inputs (Eq. 14). """ # Check inputs. if (k + 1) * p > A.shape[1]: raise ValueError('Invalid parameters p={} and k={} for A with shape {}.'.format( p, k, A.shape )) # Extract the partitions. columns_k = range(k * p, (k + 1) * p) Ak = A[:, columns_k] A_k_ = np.delete(A, columns_k, axis=1) return Ak, A_k_
[docs]def create_block_hankel(X, p): """ Create a block Hankel expansion for signals in `X` with order `p`. Args: X (np.ndarray): array containing multiple signals with shape (n_samples, n_signals). p (int): truncation order (number of columns of the Hankel matrix after which to truncate). Returns: A (np.ndarray): array containing the concatenated block Hankel matrices of each signal in `X` (Eq. 13). Has shape (n_samples - p + 1, n_signals * p). """ # Initialize. N, n_inputs = X.shape A = [] # Loop over inputs. for k_input in range(n_inputs): # Create Hankel matrix with order p. hank = scipy.linalg.hankel(X[:N - p + 1, k_input], X[N - p:, k_input]) A.append(hank) # Stack Hankel matrices. A = np.hstack(A) return A