Source code for nnsa.models.regressors

"""
High-level interface for several regressor models.
Implemented in accordance with sklearn models.
"""
import warnings
from functools import partial

import numpy as np
import scipy.optimize
import statsmodels.api as sm

from nnsa.models.losses import regularization_loss, regularization_der

__all__ = [
    'DualRegression',
    'LinReg',
]


[docs]class DualRegression(object): """ Class to train a model that predicts y from multiple measurements. The model: h(X, a, b) = np.dot(f(X, a), g(X, b)) = y where: X is an array with shape n_measurements * n_features. y is the value to predict that corresponds to X. f(X, a) is a linear or logistic regression function that transforms X into a vector with length n_samples such that sum(f(X)) == 1. a is a vector with parameters for the regression. g(X, b) is a linear or logistic regression function that transforms X into a vector with length n_samples. b is a vector with parameters for the regression. Note that X (n_measurements, n_features) corresponds only to one value y and that n_measurements may vary between train/test data. The model is trained on a list of X matrices and y values. Intuition: g(X, a) tries to predict the value y from each measurement in X, where y is the same for each measurement. For some good measurements, y can possibly be predicted with high accuracy. For other, bad measurements (e.g. containing artefacts), the prediction may not be reliable. Therefore, a weighted sum is taken over the predictions of all measurements, where the weight for each measurement is computed by f(X, b). This weighted sum is trainable, i.e. the weight for each measurement is simply computed by another linear/logistic regression that maps the features to a weight. TODO Examples: """ def __init__(self, intercept=True, regulizer_a=None, regulizer_b=None, lambda_a=0.001, lambda_b=0.001, p_dropout=0.0): self.intercept = intercept self.regulizer_a = regulizer_a self.regulizer_b = regulizer_b self.lambda_a = lambda_a self.lambda_b = lambda_b self.p_dropout = p_dropout # Model coefficients. self._theta = None @property def theta(self): """ Theta is a vector containing all coefficients. If split into two equal parts, the first half contains the coefficients a for f(X, a) and the second half contains the coefficients b for g(X, b). If intercept is True, the first entry of a and b is the intercept. Returns: self._theta (np.ndarray): coefficient vector. """ if self._theta is None: raise AttributeError('Model {} was not fit. Coefficient vector `theta` is not set.' .format(self.__class__.__name__)) else: return self._theta
[docs] @staticmethod def compute_normalization_pars(X_list, method='standard'): """ Compute parameters to normalize the input data. Args: X_list (list): list with length n_samples, where entries are arrays with size (n_features, n_measurements). method (bool, optional): specify the method for scaling. If 'standard', center is the mean and scale is the standard deviation. This is not recommended if you are not using an intercept since the 1/(np.sum(aX)) term in the loss may blow up. If 'range', center is the 5th percentile and scale is the 95th percentile, so that scaled data have approximately a range between 0 and 1. Returns: center (float): the value with which to center the data. scale (float): the value with which to scale the data. """ # Standardize features. X_all = np.concatenate(X_list, axis=1) fmin = np.percentile(X_all, 5, axis=1, keepdims=True) fmax = np.percentile(X_all, 95, axis=1, keepdims=True) mu = np.mean(X_all, axis=1, keepdims=True) std = np.std(X_all, axis=1, keepdims=True) if method == 'standard': center = mu scale = std elif method == 'range': # DO NOT SUBTRACT MEAN, THE 1/(np.sum(aX)) TERM IN THE LOSS MAY BLOW UP. center = fmin scale = fmax else: raise ValueError('Invalid method="{}". Choose from {}.' .format(method, ['standard', 'range'])) return center, scale
[docs] def fit(self, X_list, y_list, theta_0=None, seed=None, **kwargs): """ Fit the model. Args: X_list (list): list with train inputs for self.predict(). Shape of X_list[i] must be (n_features, n_measurements). The length of the list is the number of samples, so each entry in the list is one sample. y_list (list): list with the same lenght as X_list containing the target output values. theta_0 (np.ndarray, optional): initial values for the coefficients. Shape is 2*n_features if self.intercept is False or 2*n_features+2 if self.intercept is True. If None, coefficients are initialized randomly. Defaults to None. seed (float, optional): random seed. If None, no seed is used. Defaults to None. **kwargs (optional): optional keyword arguments for the optimizer function. Returns: self: the current updated containing the fitted coefficients. """ if self.intercept: X_list = self._add_intercept(X_list) if seed is not None: np.random.seed(seed) if theta_0 is None: theta_0 = np.random.rand(2*X_list[0].shape[0]) # Wrap functions. loss_wrapped = partial(self._loss, X_list=X_list, y_list=y_list) loss_der_wrapped = partial(self._loss_der, X_list=X_list, y_list=y_list) # Call optimization function. theta_opt = scipy.optimize.fmin_bfgs(loss_wrapped, theta_0, fprime=loss_der_wrapped, **kwargs) # Save optimal theta in the object. self._theta = theta_opt return self
[docs] def predict(self, X_list): """ Predict output `y` from input `X`. Args: X_list (list): list with inputs for self.predict(). Shape of X_list[i] must be (n_features, n_measurements). The length of the list is the number of samples, so each entry in the list is one sample. One output value is predicted per sample. Returns: y_list (list): predicted values for the samples in `X_list`. """ if self.intercept: X_list = self._add_intercept(X_list) y_list = list() for X in X_list: y_list.append(self._predict(self.theta, X)) return y_list
[docs] @staticmethod def normalize_input(X_list, center, scale): """ Normalize the input `X_list`. Args: X_list (list): list with inputs for self.predict(). Shape of X_list[i] must be (n_features, n_measurements). The length of the list is the number of samples, so each entry in the list is one sample. center (float): the value with which to center the data. scale (float): the value with which to scale the data. Returns: X_list (list): normalized data. """ X_list = X_list.copy() for i in range(len(X_list)): X_list[i] -= center X_list[i] /= scale return X_list
@staticmethod def _add_intercept(X_list): """ Add intercept term (row of ones) to input data. Args: X_list (list): input data. Returns: X_list (list): input data containing an extra row of ones. """ X_list = X_list.copy() for i in range(len(X_list)): ones_row = np.ones((1, X_list[i].shape[1])) X_list[i] = np.concatenate((ones_row, X_list[i]), axis=0) return X_list def _loss(self, theta, X_list, y_list): """ Compute the loss function for given coefficients, inputs and outputs. Args: theta (np.ndarray): coefficients vector. X_list (list): list with inputs for self._predict(). Shape of X_list[i] must be (n_features, n_measurements). The length of the list is the number of samples. y_list (list): list with outputs corresponding to `X`. Returns: loss (float): loss. """ intercept = self.intercept regulizer_a = self.regulizer_a regulizer_b = self.regulizer_b lambda_a = self.lambda_a lambda_b = self.lambda_b # Squared residual. residuals = [] for X, y in zip(X_list, y_list): residuals.append(self._predict(theta, X) - y) loss = np.dot(residuals, residuals) / 2 # Regularization. a, b = self._split_theta(theta) if intercept: # Exclude intercept from regularization. a = a[1:] b = b[1:] if regulizer_a is not None: loss += regularization_loss(a, regulizer=regulizer_a, lamb=lambda_a) if regulizer_b is not None: loss += regularization_loss(b, regulizer=regulizer_b, lamb=lambda_b) return loss / len(y_list) def _loss_der(self, theta, X_list, y_list): """ Derivative of the loss function with respect to `theta`. Args: theta (np.ndarray): coefficients vector. X_list (list): list with length n_samples, where entries are arrays with size (n_features, n_measurements). y_list (list): list with length n_samples, containing outputs corresponding to the inputs. Returns: (np.ndarray): array with same size as `theta` containing partial derivatives of the loss with respectect to each coefficient in theta. """ intercept = self.intercept regulizer_a = self.regulizer_a regulizer_b = self.regulizer_b lambda_a = self.lambda_a lambda_b = self.lambda_b p_dropout = self.p_dropout # Derivative of loss to theta. a, b = self._split_theta(theta) dJda = np.zeros_like(a) dJdb = np.zeros_like(b) for X, y in zip(X_list, y_list): res = self._predict(theta, X) - y aX = np.dot(a, X) bX = np.dot(b, X) f = np.sum(aX * bX) g = np.sum(aX) dfda = np.sum(np.repeat(bX.reshape(1, -1), X.shape[0], 0) * X, axis=1) dgda = np.sum(X, axis=1) dfdb = np.sum(np.repeat(aX.reshape(1, -1), X.shape[0], 0) * X, axis=1) dresda = 1 / g ** 2 * (dfda * g - dgda * f) dresdb = 1 / g * dfdb dJda += res * dresda dJdb += res * dresdb if intercept: # Exclude intercept from regularization. a = a[1:] b = b[1:] if regulizer_a is not None: dJda[-len(a):] += regularization_der(a, regulizer=regulizer_a, lamb=lambda_a) if regulizer_b is not None: dJdb[-len(b):] += regularization_der(b, regulizer=regulizer_b, lamb=lambda_b) dJdtheta = np.concatenate([dJda, dJdb]) # Apply dropout: do not update some of the coefficients. idx_dropout = np.random.randint(0, len(theta), int(p_dropout * len(theta))) dJdtheta[idx_dropout] = 0 return dJdtheta / len(y_list) def _predict(self, theta, X): """ Predict output `y` from input `X`. Args: theta (np.ndarray): coefficients vector. X (np.ndarray): input array with shape (n_features, n_measurements). Returns: y (float): output value. """ a, b = self._split_theta(theta) if X.shape[0] != len(a): raise ValueError('Invalid shape of `X`: {}. Shape must be (n_features, n_measurements). ' 'Expected {} features.' .format(X.shape, a.shape - 1*self.intercept)) # Prediction. aX = np.dot(a, X) bX = np.dot(b, X) y_hat = 1 / (np.sum(aX)) * np.dot(aX, bX) return y_hat @staticmethod def _split_theta(theta): """ Split the theta vector in two equal parts, containing the coefficients for the two regressions. Args: theta (np.ndarray): 1D array that contains the concatenation of the coefficients in a and b. Returns: a (np.ndarray): 1D array with coefficients for function f(X, a). b (np.ndarray): 1D array with coefficients for function f(X, b). """ split_idx = int(len(theta) / 2) a = theta[:split_idx] b = theta[split_idx:] return a, b
[docs]class LinReg(object): """ Fit a linear model with custom regularization. Args: regularizer (str or np.ndarray or None, optional): type of regularization. If None, no regularization is applied. If an array, this array is used as regularizer. If a string, a common regularizer is constructed. Options are 'smooth', 'ridge', 'ols' (see self.create_regularizer). Defaults to None. gamma (float, optional): regularization constant. """ def __init__(self, regularizer=None, gamma=None, solver='fast'): self.regularizer = regularizer self.gamma = gamma self._G = None self._coef_ = None self.solver = solver @property def coef_(self): """ Return the model coefficients. """ if self._coef_ is None: raise ValueError('Coefficients `coef_` not known. Fit the model first.') return self._coef_ @property def G(self): """ Return the regularization matrix. """ if self._G is None: raise ValueError('Regularization matrix `G` not known. Fit the model first.') return self._G
[docs] def compute_reg_cost(self): """ Compute the regularization cost. Returns: cost (float): regularization cost (L2 norm of G*coef). """ g = self.G @ self.coef_ cost = np.dot(g, g) return cost
[docs] @staticmethod def create_regularizer(regularizer, n): """ Create regularization matrix based on the specific type of regularization. Args: regularizer: see class definition. n (int, optional): size of the square regularization matrix. Returns: G (np.ndarray): regularization matrix with shape (n, n). """ if regularizer is None: # No regularization. G = np.zeros([n, n]) elif isinstance(regularizer, np.ndarray): if not regularizer.shape == (n, n): raise ValueError('`regularizer` has wrong shape ({0}). Expected shape ({1}, {1}).'.format( regularizer.shape, n)) # Use the given array as regularizer. G = regularizer elif isinstance(regularizer, str): # Construct common regularizers. regularizer = regularizer.lower() if regularizer == 'ols': # No regularization. G = np.zeros([n, n]) elif regularizer == 'smooth': # Penalize non-smoothness. G = -1 * np.eye(n) G[:, 1:] -= G[:, :-1] G[-1, -1] = 0 elif regularizer == 'ridge': # Ridge regression (L1 penalty). G = np.eye(n) else: raise ValueError('Invalid `regularizer` option "{}". Choose from {}.'.format( regularizer, ['smooth', 'ridge', 'ols'] )) else: raise TypeError('`regulizer must be None, an array or a string. Got {}.'.format(type(regularizer))) return G
[docs] def fit(self, X, y): """ Fit the model coefficients to training data. Args: X (np.ndarray): training input data with shape (n_samples, n_features). y (np.ndarray): training output data with shape (n_samples,) Returns: self: the object itself, which now contains fitted coefficients. """ X = np.asarray(X) y = np.asarray(y).squeeze() if X.ndim != 2: raise ValueError('`X` should be 2D. Got array with shape {}.'.format(X.shape)) if y.ndim != 1: raise ValueError('`y` should be 1D. Got array with shape {}.'.format(y.shape)) if len(X) != len(y): raise ValueError('`X` and `y` should have same lengths. Got shape {} and {}.'.format(X.shape, y.shape)) # Remove nans. X_nan = np.any(np.isnan(X), axis=-1) y_nan = np.isnan(y) nan_mask = X_nan | y_nan X = X[~nan_mask, :] y = y[~nan_mask] if self.gamma is not None: # Create the regularization matrix. G = self.create_regularizer(self.regularizer, X.shape[1]) self._G = G # Regularized linear system. X, y = X.T @ X + self.gamma*(G.T @ G), X.T @ y elif self.regularizer is not None: msg = '\n`regularizer` given, but `gamma` not. Ignoring regularzation.' warnings.warn(msg) solver = self.solver.lower() if solver == 'statsmodels': model = sm.OLS(y, X) results = model.fit() self._coef_ = results.params.reshape(-1, 1) self.results = results elif solver == 'fast': # Least squares solution of linear system Xh = y h = np.linalg.lstsq(X, y, rcond=None)[0] self._coef_ = h else: raise ValueError('Unknow `self.solver`="{}".'.format(self.solver)) return self
[docs] def predict(self, X): """ Apply a fitted model to input `X`. Args: X (np.ndarray): input data with shape (n_samples, n_features). Returns: y (np.ndarray): output data (predictions) with shape (n_samples,) """ return X @ self.coef_