Source code for nnsa.models.conformal_prediction

import sys
import warnings
from collections import defaultdict

import numpy as np
import pyprind


[docs]def build_prediction_interval(y_pred, q_hat, score='absolute_error', scale=None): """ Build regression prediction intervals using conformal prediction. References: https://www.youtube.com/watch?v=nql000Lu_iE Args: y_pred (np.ndarray): 1D array with predictions. q_hat (float): threshold obtained from conformal scores on the calibration dataset. score (str): which (non-)conformal score to compute. See code for latest options. Returns: y_intervals (np.ndarray): 2D array shape (len(y_pred), 2), containing lower and upper values of prediction intervals. """ score = score.lower() y_pred = np.asarray(y_pred).squeeze() # Check shape. if y_pred.ndim != 1: raise ValueError('`y_pred` should be 1D. Got array with shape {}.'.format(y_pred.shape)) if scale is not None: scale = np.asarray(scale).squeeze() if scale.shape != y_pred.shape: raise ValueError('`scale` should have same shape as `y_pred`. Got arrays with shape {} and {}.' .format(scale.shape, y_pred.shape)) else: scale = np.ones_like(y_pred) # Loop over samples and build prediction intervals. y_intervals = [] for i, (yp, sc) in enumerate(zip(y_pred, scale)): if score in ['absolute_error', 'ae']: y_low = yp - sc*q_hat y_high = yp + sc*q_hat else: raise ValueError('Unknow conformal score.') y_intervals.append([y_low, y_high]) y_intervals = np.asarray(y_intervals) return y_intervals
[docs]def build_prediction_set(p_pred, q_hat, score='probability', signal_quality=None): """ Build classification prediction sets using conformal prediction. References: https://www.youtube.com/watch?v=nql000Lu_iE Args: p_pred (np.ndarray): 2D array with shape (n_samples, n_classes) containing probabilities for each class. I.e., p_pred[i, j] is the probability of the jth class for sample i. q_hat (float): threshold obtained from conformal scores on the calibration dataset. score (str): which (non-)conformal score to compute. See code for latest options. Returns: y_pred (np.ndarray): 2D array same shape as `p_pred`, containing 0s and 1s, indicating the prediction sets. I.e., if y_pred[i, j] == 1, then class j is included in the prediction set for sample i. """ score = score.lower() # Check shape. if p_pred.shape[0] < p_pred.shape[1]: msg = '\nNumber of classes ({}) is larger than number of observations ({}). ' \ 'Maybe you meant to pass the transpose of `p_pred`?'.format(p_pred.shape[0], p_pred.shape[1]) warnings.warn(msg) # Loop over samples and build prediction set. y_pred = np.zeros(p_pred.shape) for i, p in enumerate(p_pred): if score == 'mass': # Sort in descending order. idx_sort = np.argsort(p)[::-1] p_sorted = p[idx_sort] # Cumulative probability. p_cum = np.cumsum(p_sorted) # Include all until exceeding q_hat. y_pred[i, idx_sort[p_cum <= q_hat]] = 1 elif score == 'probability': # Include all whose score are lower or equal to the threshold. score_i = 1 - p y_pred[i, score_i <= q_hat] = 1 elif score == 'rel_margin': # The difference of the probability of the true class to the (next) highest probability. # Larger score means more uncertainty. for j, p_j in enumerate(p): p_max = np.nanmax(p[np.arange(len(p)) != j]) score_i = (p_max - p_j)/(p_max + p_j) if score_i <= q_hat: y_pred[i, j] = 1 elif score == 'prob_margin': # The difference of the probability of the true class to the (next) highest probability times (1 - prob). # Larger score means more uncertainty. for j, p_j in enumerate(p): p_max = np.nanmax(p[np.arange(len(p)) != j]) score_i = (p_max - p_j)*(1 - p_j) if score_i <= q_hat: y_pred[i, j] = 1 elif score == 'margin': # Include all classes whose score are lower or equal to the threshold. for j, p_j in enumerate(p): p_max = np.nanmax(p[np.arange(len(p)) != j]) score_i = p_max - p_j if score_i <= q_hat: y_pred[i, j] = 1 elif score == 'margin_2': for j, p_j in enumerate(p): p_max = np.nansum(p[np.arange(len(p)) != j]) score_i = p_max - p_j if score_i <= q_hat: y_pred[i, j] = 1 elif score == 'probability_quality': score_i = (1 - p) * (2 - signal_quality[i]) y_pred[i, score_i <= q_hat] = 1 elif score == 'margin_quality': for j, p_j in enumerate(p): p_max = np.nanmax(p[np.arange(len(p)) != j]) score_i = (p_max - p_j) * (2 - signal_quality[i]) if score_i <= q_hat: y_pred[i, j] = 1 else: raise ValueError('Unknow conformal score.') return y_pred
[docs]def class_conformal_score(y_true, p_pred, score='probability', signal_quality=None): """ Compute classification (non-)conformal scores. Args: y_true (np.ndarray): 1D array with true labels for N classes (coded by 0, 1, ..., N), or the equavalent 2D array with one-hot encodings with shape (n_samples, n_classes). p_pred (np.ndarray): 2D array with same length as y_true containing probabilities for each class, where p_pred[i, j] is the probability of the jth class for sample i. score (str): which score to compute. See code for latest options. References: https://www.youtube.com/watch?v=nql000Lu_iE Returns: scores (np.ndarray): 1D array with (non-)conformal scores. """ from tensorflow.python.keras.utils.np_utils import to_categorical y_true = np.asarray(y_true).squeeze() p_pred = np.asarray(p_pred).squeeze() score = score.lower() # Check dims. if y_true.ndim not in [1, 2]: raise ValueError('`y_true` should be 1D or 2D. Got array with shape {}.'.format(y_true.shape)) if p_pred.ndim != 2: raise ValueError('`p_pred` should be 2D. Got array with shape {}.'.format(p_pred.shape)) # Check lengths. if len(p_pred) != len(y_true): # Try transpose. p_pred = p_pred.T if len(p_pred) != len(y_true): raise ValueError('Length of `y_true` ({}) does not match length of `p_pred` ({}).' .format(len(y_true), len(p_pred))) if y_true.ndim == 1: # To one-hot encodings. y_true = to_categorical(y_true).astype(bool) else: y_true = y_true.astype(bool) # Compute scores for each sample. scores = [] for i, (p, y) in enumerate(zip(p_pred, y_true)): # Probability of true class. p_true = np.nanmax(p[y]) # Maximum p, except for p_true. p_max = np.nanmax(p[~y]) if score == 'mass': # Sum of probability until true class is included, starting from largest to smallest. # Larger score means more uncertainty. score_i = np.sum(p[p >= p_true]) elif score == 'probability': # The distance of the probability of the true class to 1. # Larger score means more uncertainty. score_i = 1 - p_true elif score == 'rel_margin': # The reciprocal of the probability of the true class to the (next) highest probability. # Larger score means more uncertainty. score_i = (p_max - p_true)/(p_max + p_true) elif score == 'prob_margin': # The difference of the probability of the true class to the (next) highest probability times (1 - prob). score_i = (p_max - p_true) * (1 - p_true) elif score == 'margin': # The distance of the probability of the true class to the (next) highest probability. # Larger score means more uncertainty. score_i = p_max - p_true elif score == 'margin_2': p_true = np.nansum(p[y]) p_max = np.nansum(p[~y]) score_i = p_max - p_true elif score == 'probability_quality': # Larger score means more uncertainty. score_i = (1 - p_true) * (2 - signal_quality[i]) elif score == 'margin_quality': # Larger score means more uncertainty. score_i = (p_max - p_true) * (2 - signal_quality[i]) else: raise ValueError('Invalid conformal score "{}".'.format(score)) scores.append(score_i) scores = np.array(scores) return scores
[docs]def regr_conformal_score(y_true, y_pred, score='absolute_error', scale=None): """ Compute regression (non-)conformal scores. Args: y_true (np.ndarray): 1D array with true values. y_pred (np.ndarray): 1D array with predicted values (same lentgh as y_true). score (str): which score to compute. See code for latest options. References: https://www.youtube.com/watch?v=nql000Lu_iE Returns: scores (np.ndarray): 1D array with (non-)conformal scores. """ y_true = np.asarray(y_true).squeeze() y_pred = np.asarray(y_pred).squeeze() score = score.lower() # Check dims. if y_true.ndim != 1: raise ValueError('`y_true` should be 1D. Got array with shape {}.'.format(y_true.shape)) if y_pred.shape != y_true.shape: raise ValueError('`y_pred` should have same shape as `y_true`. Got arrays with shape {} and {}.' .format(y_pred.shape, y_true.shape)) if scale is not None: scale = np.asarray(scale).squeeze() if scale.shape != y_true.shape: raise ValueError('`scale` should have same shape as `y_true`. Got arrays with shape {} and {}.' .format(scale.shape, y_true.shape)) else: scale = np.ones_like(y_true) # Compute scores for each sample. scores = [] for i, (yp, yt, sc) in enumerate(zip(y_pred, y_true, scale)): if score in ['absolute_error', 'ae']: score_i = np.abs(yt - yp)/sc else: raise ValueError('Invalid conformal score "{}".'.format(score)) scores.append(score_i) scores = np.array(scores) return scores
[docs]def compute_coverage(y_true, y_pred): """ Compute coverage for classification prediction sets. Args: y_true (np.ndarray): 1D array with true labels for N classes (coded by 0, 1, ..., N), or the equavalent 2D array with one-hot encodings with shape (n_samples, n_classes). y_pred (np.ndarray): 2D array same length `y_true`, containing 0s and 1s, indicating the prediction sets. I.e., if y_pred[i, j] == 1, then class j is included in the prediction set for sample i. Returns: coverage (float): fraction of prediction sets that contain true label. """ from tensorflow.python.keras.utils.np_utils import to_categorical # Check shapes. if len(y_pred) != len(y_true): # Try transpose. y_pred = y_pred.T if len(y_pred) != len(y_true): raise ValueError('Length of `y_true` (shape {}) does not match `y_pred` (shape {}).' .format(y_true.shape, y_pred.shape)) # Check values. if np.nanmax(y_pred) > 1: raise ValueError('Invalid values in y_pred. Should only contain zeros and ones.') if np.nanmin(y_pred) < 0: raise ValueError('Invalid values in y_pred. Should only contain zeros and ones.') if y_true.ndim == 1: # To one-hot encodings. y_true = to_categorical(y_true, num_classes=y_pred.shape[1]).astype(int) else: y_true = y_true.astype(int) # For each sample determine whether the prediction set includes the true label. num_match = np.sum(np.any(y_true * y_pred, axis=-1)) num_true = np.sum(np.any(y_true, axis=-1)) coverage = num_match / num_true return coverage
[docs]def compute_score(y_true, y_pred, fun): """ Compute a performance score from predictions sets, including only the sets with size 1. Args: y_true (np.ndarray): 1D array with true labels for N classes (coded by 0, 1, ..., N), or the equavalent 2D array with one-hot encodings with shape (n_samples, n_classes). y_pred (np.ndarray): 2D array same length `y_true`, containing 0s and 1s, indicating the prediction sets. I.e., if y_pred[i, j] == 1, then class j is included in the prediction set for sample i. fun (function): function to apply to y_true, y_pred (both their 1D formats with integers specifying the classes) to compute the score, e.g. sklearns classification score functions. Returns: score (float): score. drop_rate (float): fraction of samples dropped (emtpy prediction set or more than 1 prediction in set). """ from tensorflow.python.keras.utils.np_utils import to_categorical # Check shapes. if len(y_pred) != len(y_true): # Try transpose. y_pred = y_pred.T if len(y_pred) != len(y_true): raise ValueError('Length of `y_true` (shape {}) does not match `y_pred` (shape {}).' .format(y_true.shape, y_pred.shape)) # Check values. if np.nanmax(y_pred) > 1: raise ValueError('Invalid values in y_pred. Should only contain zeros and ones.') if np.nanmin(y_pred) < 0: raise ValueError('Invalid values in y_pred. Should only contain zeros and ones.') if y_true.ndim == 1: # To one-hot encodings. y_true = to_categorical(y_true, num_classes=y_pred.shape[1]).astype(int) else: y_true = y_true.astype(int) # Select only samples that have a single conformal predictions (or true value). select_mask = (np.sum(y_true, axis=-1) == 1) & (np.sum(y_pred, axis=-1) == 1) score = fun(np.argmax(y_true, axis=-1)[select_mask], np.argmax(y_pred, axis=-1)[select_mask]) drop_rate = 1 - np.mean(select_mask) return score, drop_rate
[docs]def compute_q_hat(scores, alpha): """ Compute the threshold q_hat for conformal prediction. q_hat is the (1 - alpha) quantile (with minor sample size correction) of `scores`. Values in `scores` should measure the uncertainty of the predictions. Returns: q_hat (float): threshold to use for building new prediction sets. """ # (1 - alpha) quantile. n = len(scores) q = np.ceil((n + 1) * (1 - alpha)) / n q = np.clip(q, 0, 1) # Cannot compute quantiles <0 or > 1. q_hat = np.nanquantile(scores, q) return q_hat
[docs]def conformal_diagnostics(y_true, p_pred, score, sample_id=None, cali_split=0.5, alpha=0.1, n_iter=50, metrics_fun=None, seed=None, verbose=1): """ Compute conformal diagnostics to test the conformal prediction. References: https://www.youtube.com/watch?v=TRx4a2u-j7M Args: y_true (np.ndarray): 1D array with true labels for N classes (coded by 0, 1, ..., N), or the equavalent 2D array with one-hot encodings with shape (n_samples, n_classes). p_pred (np.ndarray): 2D array with same length as y_true containing probabilities for each class, where p_pred[i, j] is the probability of the jth class for sample i. score (str): which score to compute. See code for latest options. sample_id (list or np.ndarray or None): if provided, this should have length n_samples, where each entry contains the ID of the corresponding sample. This is used when randomly dividing the data into calibration and test folds: all samples with the same ID are put in the same fold, which may be useful if samples are not independent. If not provided, each sample gets a unique ID and thus is considered independent. cali_split (float): the fraction of data to put in the calibration set. alpha (float): alpha level (e.g. 0.1). metrics_fun (dict): dict with functions that compute any user-defined metrics that accept y_true and y_pred, both boolean arrays with shape (n_samples, n_classes). E.g. metrics={'coverage': lambda y_true, y_pred: np.sum(np.any(y_true*y_pred, axis=-1))/np.sum(np.any(y_true, axis=-1))} n_iter (int): number of iterations (random splits). seed (int): seed for random generator. verbose (int): verbosity level. Returns: coverage (np.ndarray): array with shape (n_iter,) containing the coverage of each iteration. set_sizes (np.ndarray): array with shape (n_iter*n_samples) containing the size of the prediction set for each sample per iteration. stratified_coverage (np.ndarray): array with shape (n_iter, n_classes) containing the coverage per iteration per class. metrics (dict): dict with arrays for each metrics with shapes (n_iter,) containing the metrics computed on the test fold per iteration. """ from tensorflow.python.keras.utils.np_utils import to_categorical # Check shapes. y_true = np.asarray(y_true) p_pred = np.asarray(p_pred) if len(y_true) != len(p_pred): raise ValueError('Length of y_true should be the same as p_pred. Got lengths {} and {}.' .format(len(y_true), len(p_pred))) if y_true.ndim == 1: # To one-hot encodings. y_true = to_categorical(y_true, num_classes=p_pred.shape[1]).astype(bool) else: y_true = y_true.astype(bool) # Number of samples. n_samples, n_classes = p_pred.shape if sample_id is None: # Treat each sample as independent. sample_id = np.arange(len(y_true)) else: sample_id = np.asarray(sample_id) if len(sample_id) != n_samples: raise ValueError('Length of sample_id should be the number of samples ({}). Got length {}.' .format(n_samples, len(sample_id))) if metrics_fun is None: metrics_fun = {} # Number of unique sample IDs. unique_ids = np.unique(sample_id) # Number of unique IDs in calibration set. n_cali = int(np.round(len(unique_ids) * cali_split)) # Seed random generator. rng = np.random.default_rng(seed) # Loop over iterations. coverage_all, set_sizes_all, stratified_coverage_all = [], [], [] metrics_all = defaultdict(list) bar = pyprind.ProgBar(n_iter, stream=sys.stdout) for i in range(n_iter): # Shuffle unique IDS. rng.shuffle(unique_ids) # IDs for calibration set. ids_cali = unique_ids[:n_cali] # Create boolean masks for selecting samples in the calibration and test set. mask_cali = np.array([ii in ids_cali for ii in sample_id]) mask_test = ~mask_cali # Check split size. if abs(np.mean(mask_cali) - cali_split) > 0.2: msg = '\nFraction of samples in calibration split ({:.2f}) is different from specified fraction ({:.2f}).' \ .format(np.mean(mask_cali), cali_split) warnings.warn(msg) # Make the split. y_true_cali = y_true[mask_cali] p_pred_cali = p_pred[mask_cali] y_true_test = y_true[mask_test] p_pred_test = p_pred[mask_test] # Compute conformal scores. conf_scores_cali = class_conformal_score(y_true=y_true_cali, p_pred=p_pred_cali, score=score) # Get quantile. q_hat = compute_q_hat(scores=conf_scores_cali, alpha=alpha) # Get prediction sets for test data. y_pred_test = build_prediction_set(p_pred=p_pred_test, q_hat=q_hat, score=score) # Compute coverage. coverage = compute_coverage(y_true=y_true_test, y_pred=y_pred_test) # Compute set sizes. set_sizes = np.sum(y_pred_test, axis=-1) # Compute stratified coverage. stratified_coverage = [compute_coverage(y_true=y_true_test[y_true_test[:, i]], y_pred=y_pred_test[y_true_test[:, i]]) for i in range(n_classes)] for name, fun in metrics_fun.items(): m = fun(y_true_test, y_pred_test) metrics_all[name].append(m) # Collect. coverage_all.append(coverage) set_sizes_all.append(set_sizes) stratified_coverage_all.append(stratified_coverage) if verbose: bar.update() # To arrays. coverage = np.array(coverage_all) set_sizes = np.concatenate(set_sizes_all) stratified_coverage = np.array(stratified_coverage_all) metrics = dict() for name, m in metrics_all.items(): metrics[name] = np.array(m) return coverage, set_sizes, stratified_coverage, metrics