Source code for nnsa.models.conformal_prediction
import sys
import warnings
from collections import defaultdict
import numpy as np
import pyprind
[docs]def build_prediction_interval(y_pred, q_hat, score='absolute_error', scale=None):
"""
Build regression prediction intervals using conformal prediction.
References:
https://www.youtube.com/watch?v=nql000Lu_iE
Args:
y_pred (np.ndarray): 1D array with predictions.
q_hat (float): threshold obtained from conformal scores on the calibration dataset.
score (str): which (non-)conformal score to compute. See code for latest options.
Returns:
y_intervals (np.ndarray): 2D array shape (len(y_pred), 2), containing lower and upper values of
prediction intervals.
"""
score = score.lower()
y_pred = np.asarray(y_pred).squeeze()
# Check shape.
if y_pred.ndim != 1:
raise ValueError('`y_pred` should be 1D. Got array with shape {}.'.format(y_pred.shape))
if scale is not None:
scale = np.asarray(scale).squeeze()
if scale.shape != y_pred.shape:
raise ValueError('`scale` should have same shape as `y_pred`. Got arrays with shape {} and {}.'
.format(scale.shape, y_pred.shape))
else:
scale = np.ones_like(y_pred)
# Loop over samples and build prediction intervals.
y_intervals = []
for i, (yp, sc) in enumerate(zip(y_pred, scale)):
if score in ['absolute_error', 'ae']:
y_low = yp - sc*q_hat
y_high = yp + sc*q_hat
else:
raise ValueError('Unknow conformal score.')
y_intervals.append([y_low, y_high])
y_intervals = np.asarray(y_intervals)
return y_intervals
[docs]def build_prediction_set(p_pred, q_hat, score='probability', signal_quality=None):
"""
Build classification prediction sets using conformal prediction.
References:
https://www.youtube.com/watch?v=nql000Lu_iE
Args:
p_pred (np.ndarray): 2D array with shape (n_samples, n_classes) containing probabilities
for each class. I.e., p_pred[i, j] is the probability of the jth class for sample i.
q_hat (float): threshold obtained from conformal scores on the calibration dataset.
score (str): which (non-)conformal score to compute. See code for latest options.
Returns:
y_pred (np.ndarray): 2D array same shape as `p_pred`, containing 0s and 1s, indicating the
prediction sets. I.e., if y_pred[i, j] == 1, then class j is included in the prediction
set for sample i.
"""
score = score.lower()
# Check shape.
if p_pred.shape[0] < p_pred.shape[1]:
msg = '\nNumber of classes ({}) is larger than number of observations ({}). ' \
'Maybe you meant to pass the transpose of `p_pred`?'.format(p_pred.shape[0], p_pred.shape[1])
warnings.warn(msg)
# Loop over samples and build prediction set.
y_pred = np.zeros(p_pred.shape)
for i, p in enumerate(p_pred):
if score == 'mass':
# Sort in descending order.
idx_sort = np.argsort(p)[::-1]
p_sorted = p[idx_sort]
# Cumulative probability.
p_cum = np.cumsum(p_sorted)
# Include all until exceeding q_hat.
y_pred[i, idx_sort[p_cum <= q_hat]] = 1
elif score == 'probability':
# Include all whose score are lower or equal to the threshold.
score_i = 1 - p
y_pred[i, score_i <= q_hat] = 1
elif score == 'rel_margin':
# The difference of the probability of the true class to the (next) highest probability.
# Larger score means more uncertainty.
for j, p_j in enumerate(p):
p_max = np.nanmax(p[np.arange(len(p)) != j])
score_i = (p_max - p_j)/(p_max + p_j)
if score_i <= q_hat:
y_pred[i, j] = 1
elif score == 'prob_margin':
# The difference of the probability of the true class to the (next) highest probability times (1 - prob).
# Larger score means more uncertainty.
for j, p_j in enumerate(p):
p_max = np.nanmax(p[np.arange(len(p)) != j])
score_i = (p_max - p_j)*(1 - p_j)
if score_i <= q_hat:
y_pred[i, j] = 1
elif score == 'margin':
# Include all classes whose score are lower or equal to the threshold.
for j, p_j in enumerate(p):
p_max = np.nanmax(p[np.arange(len(p)) != j])
score_i = p_max - p_j
if score_i <= q_hat:
y_pred[i, j] = 1
elif score == 'margin_2':
for j, p_j in enumerate(p):
p_max = np.nansum(p[np.arange(len(p)) != j])
score_i = p_max - p_j
if score_i <= q_hat:
y_pred[i, j] = 1
elif score == 'probability_quality':
score_i = (1 - p) * (2 - signal_quality[i])
y_pred[i, score_i <= q_hat] = 1
elif score == 'margin_quality':
for j, p_j in enumerate(p):
p_max = np.nanmax(p[np.arange(len(p)) != j])
score_i = (p_max - p_j) * (2 - signal_quality[i])
if score_i <= q_hat:
y_pred[i, j] = 1
else:
raise ValueError('Unknow conformal score.')
return y_pred
[docs]def class_conformal_score(y_true, p_pred, score='probability', signal_quality=None):
"""
Compute classification (non-)conformal scores.
Args:
y_true (np.ndarray): 1D array with true labels for N classes (coded by 0, 1, ..., N),
or the equavalent 2D array with one-hot encodings with shape (n_samples, n_classes).
p_pred (np.ndarray): 2D array with same length as y_true containing probabilities for each class,
where p_pred[i, j] is the probability of the jth class for sample i.
score (str): which score to compute. See code for latest options.
References:
https://www.youtube.com/watch?v=nql000Lu_iE
Returns:
scores (np.ndarray): 1D array with (non-)conformal scores.
"""
from tensorflow.python.keras.utils.np_utils import to_categorical
y_true = np.asarray(y_true).squeeze()
p_pred = np.asarray(p_pred).squeeze()
score = score.lower()
# Check dims.
if y_true.ndim not in [1, 2]:
raise ValueError('`y_true` should be 1D or 2D. Got array with shape {}.'.format(y_true.shape))
if p_pred.ndim != 2:
raise ValueError('`p_pred` should be 2D. Got array with shape {}.'.format(p_pred.shape))
# Check lengths.
if len(p_pred) != len(y_true):
# Try transpose.
p_pred = p_pred.T
if len(p_pred) != len(y_true):
raise ValueError('Length of `y_true` ({}) does not match length of `p_pred` ({}).'
.format(len(y_true), len(p_pred)))
if y_true.ndim == 1:
# To one-hot encodings.
y_true = to_categorical(y_true).astype(bool)
else:
y_true = y_true.astype(bool)
# Compute scores for each sample.
scores = []
for i, (p, y) in enumerate(zip(p_pred, y_true)):
# Probability of true class.
p_true = np.nanmax(p[y])
# Maximum p, except for p_true.
p_max = np.nanmax(p[~y])
if score == 'mass':
# Sum of probability until true class is included, starting from largest to smallest.
# Larger score means more uncertainty.
score_i = np.sum(p[p >= p_true])
elif score == 'probability':
# The distance of the probability of the true class to 1.
# Larger score means more uncertainty.
score_i = 1 - p_true
elif score == 'rel_margin':
# The reciprocal of the probability of the true class to the (next) highest probability.
# Larger score means more uncertainty.
score_i = (p_max - p_true)/(p_max + p_true)
elif score == 'prob_margin':
# The difference of the probability of the true class to the (next) highest probability times (1 - prob).
score_i = (p_max - p_true) * (1 - p_true)
elif score == 'margin':
# The distance of the probability of the true class to the (next) highest probability.
# Larger score means more uncertainty.
score_i = p_max - p_true
elif score == 'margin_2':
p_true = np.nansum(p[y])
p_max = np.nansum(p[~y])
score_i = p_max - p_true
elif score == 'probability_quality':
# Larger score means more uncertainty.
score_i = (1 - p_true) * (2 - signal_quality[i])
elif score == 'margin_quality':
# Larger score means more uncertainty.
score_i = (p_max - p_true) * (2 - signal_quality[i])
else:
raise ValueError('Invalid conformal score "{}".'.format(score))
scores.append(score_i)
scores = np.array(scores)
return scores
[docs]def regr_conformal_score(y_true, y_pred, score='absolute_error', scale=None):
"""
Compute regression (non-)conformal scores.
Args:
y_true (np.ndarray): 1D array with true values.
y_pred (np.ndarray): 1D array with predicted values (same lentgh as y_true).
score (str): which score to compute. See code for latest options.
References:
https://www.youtube.com/watch?v=nql000Lu_iE
Returns:
scores (np.ndarray): 1D array with (non-)conformal scores.
"""
y_true = np.asarray(y_true).squeeze()
y_pred = np.asarray(y_pred).squeeze()
score = score.lower()
# Check dims.
if y_true.ndim != 1:
raise ValueError('`y_true` should be 1D. Got array with shape {}.'.format(y_true.shape))
if y_pred.shape != y_true.shape:
raise ValueError('`y_pred` should have same shape as `y_true`. Got arrays with shape {} and {}.'
.format(y_pred.shape, y_true.shape))
if scale is not None:
scale = np.asarray(scale).squeeze()
if scale.shape != y_true.shape:
raise ValueError('`scale` should have same shape as `y_true`. Got arrays with shape {} and {}.'
.format(scale.shape, y_true.shape))
else:
scale = np.ones_like(y_true)
# Compute scores for each sample.
scores = []
for i, (yp, yt, sc) in enumerate(zip(y_pred, y_true, scale)):
if score in ['absolute_error', 'ae']:
score_i = np.abs(yt - yp)/sc
else:
raise ValueError('Invalid conformal score "{}".'.format(score))
scores.append(score_i)
scores = np.array(scores)
return scores
[docs]def compute_coverage(y_true, y_pred):
"""
Compute coverage for classification prediction sets.
Args:
y_true (np.ndarray): 1D array with true labels for N classes (coded by 0, 1, ..., N),
or the equavalent 2D array with one-hot encodings with shape (n_samples, n_classes).
y_pred (np.ndarray): 2D array same length `y_true`, containing 0s and 1s, indicating the
prediction sets. I.e., if y_pred[i, j] == 1, then class j is included in the prediction
set for sample i.
Returns:
coverage (float): fraction of prediction sets that contain true label.
"""
from tensorflow.python.keras.utils.np_utils import to_categorical
# Check shapes.
if len(y_pred) != len(y_true):
# Try transpose.
y_pred = y_pred.T
if len(y_pred) != len(y_true):
raise ValueError('Length of `y_true` (shape {}) does not match `y_pred` (shape {}).'
.format(y_true.shape, y_pred.shape))
# Check values.
if np.nanmax(y_pred) > 1:
raise ValueError('Invalid values in y_pred. Should only contain zeros and ones.')
if np.nanmin(y_pred) < 0:
raise ValueError('Invalid values in y_pred. Should only contain zeros and ones.')
if y_true.ndim == 1:
# To one-hot encodings.
y_true = to_categorical(y_true, num_classes=y_pred.shape[1]).astype(int)
else:
y_true = y_true.astype(int)
# For each sample determine whether the prediction set includes the true label.
num_match = np.sum(np.any(y_true * y_pred, axis=-1))
num_true = np.sum(np.any(y_true, axis=-1))
coverage = num_match / num_true
return coverage
[docs]def compute_score(y_true, y_pred, fun):
"""
Compute a performance score from predictions sets, including only the sets with size 1.
Args:
y_true (np.ndarray): 1D array with true labels for N classes (coded by 0, 1, ..., N),
or the equavalent 2D array with one-hot encodings with shape (n_samples, n_classes).
y_pred (np.ndarray): 2D array same length `y_true`, containing 0s and 1s, indicating the
prediction sets. I.e., if y_pred[i, j] == 1, then class j is included in the prediction
set for sample i.
fun (function): function to apply to y_true, y_pred (both their 1D formats with integers
specifying the classes) to compute the score, e.g. sklearns classification score functions.
Returns:
score (float): score.
drop_rate (float): fraction of samples dropped (emtpy prediction set or more than 1 prediction in set).
"""
from tensorflow.python.keras.utils.np_utils import to_categorical
# Check shapes.
if len(y_pred) != len(y_true):
# Try transpose.
y_pred = y_pred.T
if len(y_pred) != len(y_true):
raise ValueError('Length of `y_true` (shape {}) does not match `y_pred` (shape {}).'
.format(y_true.shape, y_pred.shape))
# Check values.
if np.nanmax(y_pred) > 1:
raise ValueError('Invalid values in y_pred. Should only contain zeros and ones.')
if np.nanmin(y_pred) < 0:
raise ValueError('Invalid values in y_pred. Should only contain zeros and ones.')
if y_true.ndim == 1:
# To one-hot encodings.
y_true = to_categorical(y_true, num_classes=y_pred.shape[1]).astype(int)
else:
y_true = y_true.astype(int)
# Select only samples that have a single conformal predictions (or true value).
select_mask = (np.sum(y_true, axis=-1) == 1) & (np.sum(y_pred, axis=-1) == 1)
score = fun(np.argmax(y_true, axis=-1)[select_mask], np.argmax(y_pred, axis=-1)[select_mask])
drop_rate = 1 - np.mean(select_mask)
return score, drop_rate
[docs]def compute_q_hat(scores, alpha):
"""
Compute the threshold q_hat for conformal prediction.
q_hat is the (1 - alpha) quantile (with minor sample size correction) of `scores`.
Values in `scores` should measure the uncertainty of the predictions.
Returns:
q_hat (float): threshold to use for building new prediction sets.
"""
# (1 - alpha) quantile.
n = len(scores)
q = np.ceil((n + 1) * (1 - alpha)) / n
q = np.clip(q, 0, 1) # Cannot compute quantiles <0 or > 1.
q_hat = np.nanquantile(scores, q)
return q_hat
[docs]def conformal_diagnostics(y_true, p_pred, score, sample_id=None,
cali_split=0.5, alpha=0.1, n_iter=50,
metrics_fun=None, seed=None, verbose=1):
"""
Compute conformal diagnostics to test the conformal prediction.
References:
https://www.youtube.com/watch?v=TRx4a2u-j7M
Args:
y_true (np.ndarray): 1D array with true labels for N classes (coded by 0, 1, ..., N),
or the equavalent 2D array with one-hot encodings with shape (n_samples, n_classes).
p_pred (np.ndarray): 2D array with same length as y_true containing probabilities for each class,
where p_pred[i, j] is the probability of the jth class for sample i.
score (str): which score to compute. See code for latest options.
sample_id (list or np.ndarray or None): if provided, this should have length n_samples, where each
entry contains the ID of the corresponding sample. This is used when randomly dividing the data
into calibration and test folds: all samples with the same ID are put in the same fold, which
may be useful if samples are not independent.
If not provided, each sample gets a unique ID and thus is considered independent.
cali_split (float): the fraction of data to put in the calibration set.
alpha (float): alpha level (e.g. 0.1).
metrics_fun (dict): dict with functions that compute any user-defined metrics that accept y_true and y_pred,
both boolean arrays with shape (n_samples, n_classes). E.g.
metrics={'coverage': lambda y_true, y_pred: np.sum(np.any(y_true*y_pred, axis=-1))/np.sum(np.any(y_true, axis=-1))}
n_iter (int): number of iterations (random splits).
seed (int): seed for random generator.
verbose (int): verbosity level.
Returns:
coverage (np.ndarray): array with shape (n_iter,) containing the coverage of each iteration.
set_sizes (np.ndarray): array with shape (n_iter*n_samples) containing the size of the prediction set for
each sample per iteration.
stratified_coverage (np.ndarray): array with shape (n_iter, n_classes) containing the coverage
per iteration per class.
metrics (dict): dict with arrays for each metrics with shapes (n_iter,) containing the metrics computed
on the test fold per iteration.
"""
from tensorflow.python.keras.utils.np_utils import to_categorical
# Check shapes.
y_true = np.asarray(y_true)
p_pred = np.asarray(p_pred)
if len(y_true) != len(p_pred):
raise ValueError('Length of y_true should be the same as p_pred. Got lengths {} and {}.'
.format(len(y_true), len(p_pred)))
if y_true.ndim == 1:
# To one-hot encodings.
y_true = to_categorical(y_true, num_classes=p_pred.shape[1]).astype(bool)
else:
y_true = y_true.astype(bool)
# Number of samples.
n_samples, n_classes = p_pred.shape
if sample_id is None:
# Treat each sample as independent.
sample_id = np.arange(len(y_true))
else:
sample_id = np.asarray(sample_id)
if len(sample_id) != n_samples:
raise ValueError('Length of sample_id should be the number of samples ({}). Got length {}.'
.format(n_samples, len(sample_id)))
if metrics_fun is None:
metrics_fun = {}
# Number of unique sample IDs.
unique_ids = np.unique(sample_id)
# Number of unique IDs in calibration set.
n_cali = int(np.round(len(unique_ids) * cali_split))
# Seed random generator.
rng = np.random.default_rng(seed)
# Loop over iterations.
coverage_all, set_sizes_all, stratified_coverage_all = [], [], []
metrics_all = defaultdict(list)
bar = pyprind.ProgBar(n_iter, stream=sys.stdout)
for i in range(n_iter):
# Shuffle unique IDS.
rng.shuffle(unique_ids)
# IDs for calibration set.
ids_cali = unique_ids[:n_cali]
# Create boolean masks for selecting samples in the calibration and test set.
mask_cali = np.array([ii in ids_cali for ii in sample_id])
mask_test = ~mask_cali
# Check split size.
if abs(np.mean(mask_cali) - cali_split) > 0.2:
msg = '\nFraction of samples in calibration split ({:.2f}) is different from specified fraction ({:.2f}).' \
.format(np.mean(mask_cali), cali_split)
warnings.warn(msg)
# Make the split.
y_true_cali = y_true[mask_cali]
p_pred_cali = p_pred[mask_cali]
y_true_test = y_true[mask_test]
p_pred_test = p_pred[mask_test]
# Compute conformal scores.
conf_scores_cali = class_conformal_score(y_true=y_true_cali, p_pred=p_pred_cali,
score=score)
# Get quantile.
q_hat = compute_q_hat(scores=conf_scores_cali, alpha=alpha)
# Get prediction sets for test data.
y_pred_test = build_prediction_set(p_pred=p_pred_test, q_hat=q_hat, score=score)
# Compute coverage.
coverage = compute_coverage(y_true=y_true_test, y_pred=y_pred_test)
# Compute set sizes.
set_sizes = np.sum(y_pred_test, axis=-1)
# Compute stratified coverage.
stratified_coverage = [compute_coverage(y_true=y_true_test[y_true_test[:, i]],
y_pred=y_pred_test[y_true_test[:, i]])
for i in range(n_classes)]
for name, fun in metrics_fun.items():
m = fun(y_true_test, y_pred_test)
metrics_all[name].append(m)
# Collect.
coverage_all.append(coverage)
set_sizes_all.append(set_sizes)
stratified_coverage_all.append(stratified_coverage)
if verbose:
bar.update()
# To arrays.
coverage = np.array(coverage_all)
set_sizes = np.concatenate(set_sizes_all)
stratified_coverage = np.array(stratified_coverage_all)
metrics = dict()
for name, m in metrics_all.items():
metrics[name] = np.array(m)
return coverage, set_sizes, stratified_coverage, metrics