"""
Code for evaluation of classifiers.
"""
import itertools
import sys
import warnings
from collections import defaultdict
import numpy as np
import sklearn
from joblib import Parallel, delayed
from pyprind import ProgBar
from sklearn import metrics
from sklearn.metrics import confusion_matrix, silhouette_score, precision_recall_curve, accuracy_score, \
cohen_kappa_score
from sklearn.preprocessing import label_binarize
from sklearn.utils import shuffle
from sklearn.utils.multiclass import unique_labels
import pandas as pd
import scipy.stats
from nnsa.utils.arrays import remove_nans
from nnsa.utils.objects import basic_repr
from nnsa.utils.other import enumerate_label
__all__ = [
'ConfusionMatrix',
'average_silhouette',
'binary_classification_scores',
'cluster_inertia',
'compute_separation_scores',
'fscore_carolina',
'separation_score',
'roc_auc_score',
]
[docs]class ConfusionMatrix(object):
"""
High-level interface for manipulating the results of the confusion matrix.
Samples with np.nan values in y_true or y_pred are ignored upon initialization of the confusion matrix.
Upon initializing a ConfusionMatrix object, the confusion matrix is computed. Subsequently, by calling methods
on the object, all sorts of metrics derived from the confusion matrix can be quickly computed (e.g. accuracy,
sensitivity, specificity). This works for binary classification as well as multiclass classification. In the case
of multiclass classification, a one-vs-all method is applied if neccessary.
A nice overview of most of thos metrics can be found on Wikipedia:
https://en.wikipedia.org/wiki/Confusion_matrix
Args:
y_true (np.ndarray): 1D array with true class numbers 0:N-1, with N the number of classes.
y_pred (np.ndarray): 1D array with predicted class numbers 0:N-1.
class_mapping (dict, optional): dictionary that maps a class label to a class number. If None,
default class labels are created for the class numbers.
Defaults to None.
Raises:
ValueError: if length of y_true and y_pred are not equal.
ValueError: if not all detected class numbers are linked to a label in the specified class_mapping.
"""
def __init__(self, y_true, y_pred, class_mapping=None):
# Input check.
if len(y_true) != len(y_pred):
raise ValueError('Length of y_true ({}) is not equal to length of predictions ({}).'
.format(len(y_true), len(y_pred)))
# Remove samples in which either y_true or y_pred is nan.
mask_nan = np.logical_or(np.isnan(y_true), np.isnan(y_pred))
mask_keep = np.logical_not(mask_nan)
y_true = y_true[mask_keep]
y_pred = y_pred[mask_keep]
# Store the unique class numbers (sorted).
unique_classes = unique_labels(y_true, y_pred)
# Default class_mapping.
if class_mapping is None:
class_labels = enumerate_label(len(unique_classes), label='Class')
class_mapping = dict(zip(class_labels, unique_classes))
# Check if all class numbers have a class label.
if not all((class_number in class_mapping.values() for class_number in unique_classes)):
raise ValueError('Not all class numbers ({}) have a label in class_mapping ({})'
.format(unique_classes, class_mapping))
self.matrix = confusion_matrix(y_true=y_true, y_pred=y_pred, labels=unique_classes)
self.class_mapping = class_mapping
self.unique_classes = unique_classes
def __repr__(self):
return basic_repr(self)
[docs] def accuracy(self):
"""
Compute accuracy from the confusion matrix.
Returns:
(float): accuracy.
"""
cm = self.matrix
return np.trace(cm) / np.sum(cm)
[docs] def average_accuracy(self, class_label=None):
"""
Compute average accuracy from the confusion matrix (which is fairer than accuracy for imbalanced data sets).
Returns:
(float): average accuracy.
"""
tpr = self.sensitivity(class_label=class_label)
tnr = self.specificity(class_label=class_label)
return (tpr + tnr)/2
[docs] def f1(self, class_label=None):
ppv = self.precision(class_label=class_label)
tpr = self.sensitivity(class_label=class_label)
return 2*ppv*tpr/(ppv + tpr)
[docs] def false_negatives(self, class_label=None):
"""
Return the number of false negatives.
Args:
class_label (str, optional): class label that represents the positive outcome.
If None, the last class is taken
.
Defaults to None.
Returns:
(int): the number of false negatives.
"""
idx_positive = self._get_class_idx(class_label=class_label)
cm = self.matrix
false_negatives = np.sum(cm[idx_positive, :]) - cm[idx_positive, idx_positive]
return false_negatives
[docs] def false_positives(self, class_label=None):
"""
Return the number of false positives.
Args:
class_label (str, optional): class label that represents the positive outcome.
If None, the last class is taken.
Defaults to None.
Returns:
(int): the number of false positives.
"""
idx_positive = self._get_class_idx(class_label=class_label)
cm = self.matrix
false_positives = np.sum(cm[:, idx_positive]) - cm[idx_positive, idx_positive]
return false_positives
[docs] def kappa(self):
"""
Compute Cohen's kappa, i.e., inter-rater agreement (accuracy), corrected for chance.
Returns:
(float): Cohen's kappa.
"""
# Probability of agreement (accuracy).
p_o = self.accuracy()
# Probability of agreement by chance.
cm = self.matrix
p_e = 1/np.sum(cm)**2 * np.dot(np.sum(cm, axis=0), np.sum(cm, axis=1))
return (p_o - p_e)/(1 - p_e)
[docs] def precision(self, class_label=None):
"""
Compute precision from confusion matrix.
Args:
class_label (str, optional): class label that represents the positive outcome.
If None, the last class is taken.
Defaults to None.
Returns:
(float): precision.
"""
true_positives = self.true_positives(class_label=class_label)
false_positives = self.false_positives(class_label=class_label)
return true_positives / (true_positives + false_positives)
[docs] def scores_general(self):
"""
Return some general evaluation metrics (independent of which class is the positive outcome).
Returns:
scores (dict): dictionary with scores.
"""
scores = {
'accuracy': self.accuracy(),
'kappa': self.kappa(),
}
return scores
[docs] def scores_per_class(self):
"""
Return some evaluation metrics per class (considering one class as the positive outcome at a time).
Returns:
df (pd.DataFrame): pandas DataFrame with scores.
"""
# Collect all scores in a dictionary, consisting of lists of scores.
scores = defaultdict(list)
scores['class_label'] = list(self.class_mapping.keys())
for label in scores['class_label']:
scores['average_accuracy'].append(self.average_accuracy(class_label=label))
scores['f1'].append(self.f1(class_label=label))
scores['precision'].append(self.precision(class_label=label))
scores['sensitivity'].append(self.sensitivity(class_label=label))
scores['specificity'].append(self.specificity(class_label=label))
scores['TP'].append(self.true_positives(class_label=label))
scores['TN'].append(self.true_negatives(class_label=label))
scores['FP'].append(self.false_positives(class_label=label))
scores['FN'].append(self.false_negatives(class_label=label))
# Convert the dictionary to a DataFrame.
df = pd.DataFrame(scores)
return df
[docs] def sensitivity(self, class_label=None):
"""
Compute sensitivity from confusion matrix.
Args:
class_label (str, optional): class label that represents the positive outcome.
If None, the last class is taken.
Defaults to None.
Returns:
(float): sensitivity.
"""
true_positives = self.true_positives(class_label=class_label)
false_negatives = self.false_negatives(class_label=class_label)
return true_positives / (true_positives + false_negatives)
[docs] def specificity(self, class_label=None):
"""
Compute specificity from confusion matrix.
Args:
class_label (str, optional): class label that represents the positive outcome.
If None, the last class is taken.
Defaults to None.
Returns:
(float): specificity.
"""
true_negatives = self.true_negatives(class_label=class_label)
false_positives = self.false_positives(class_label=class_label)
return true_negatives / (true_negatives + false_positives)
[docs] def support(self, class_label=None):
"""
Return the number of samples classified as the specified class_label in y_true.
Args:
class_label (str, optional): class label to count the support of.
If None, the last class is taken.
Defaults to None.
Returns:
(int): the support.
"""
idx_class = self._get_class_idx(class_label=class_label)
cm = self.matrix
return int(np.sum(cm[idx_class, :]))
[docs] def true_negatives(self, class_label=None):
"""
Return the number of true negatives.
Args:
class_label (str, optional): class label that represents the positive outcome.
If None, the last class is taken.
Defaults to None.
Returns:
(int): the number of true negatives.
"""
idx_positive = self._get_class_idx(class_label=class_label)
cm = self.matrix
n_tot = np.sum(cm)
# Determine true negatives (one vs all).
true_negatives = n_tot \
- np.sum(cm[idx_positive, :]) \
- np.sum(cm[:, idx_positive]) \
+ cm[idx_positive, idx_positive]
return true_negatives
[docs] def true_positives(self, class_label=None):
"""
Return the number of true positives.
Args:
class_label (str, optional): class label that represents the positive outcome.
If None, the last class is taken.
Defaults to None.
Returns:
(int): the number of true positives.
"""
idx_positive = self._get_class_idx(class_label=class_label)
cm = self.matrix
true_positives = cm[idx_positive, idx_positive]
return true_positives
def _check_class_label(self, class_label):
"""
Check if class_label is in self.class_mapping.
Args:
class_label (str): class label to check.
Raises:
ValueError: if class_label is invalid.
"""
if class_label not in self.class_mapping:
raise ValueError('Invalid input argument "{}" for class_label. Choose from: {}.'
.format(class_label, '"' + '", "'.join(self.class_mapping.keys()) + '"'))
def _get_class_idx(self, class_label):
"""
Return the row/column index of the class corresponding to class_label in the confusion matrix.
Args:
class_label (str, optional): class label to get the index of in the confusion matrix.
If None, the last class is taken.
Defaults to None.
Returns:
idx (int): row/column index of the class corresponding to class_label in the confusion matrix.
"""
if class_label is None:
# The default.
idx = -1
else:
self._check_class_label(class_label=class_label)
class_number = self.class_mapping[class_label]
if class_number not in self.unique_classes:
raise ValueError('Class "{}" does not appear in confusion matrix (possibly because no samples belong '
'to this class).'.format(class_label))
idx = np.where(self.unique_classes == class_number)[0][0]
return idx
[docs]def average_silhouette(x, y):
"""
Return the average silhouette of clustered points.
Compares the average distance of a point to all points within its cluster to the distance of the point to the
center of the closest other cluster.
Args:
x (np.ndarray): (n, m) array of n samples with m features.
y (np.ndarray): (n, ) array of n samples with cluster/class numbers. May not contain np.nan values.
Returns:
silhouette (float): silhouette.
"""
warnings.warn('DeprecationWarning: Use sklearn.metrics.silhouette_score')
x = np.asarray(x)
y = np.asarray(y)
def dissimilairy(x1, x2):
return np.sqrt(np.nansum((x1 - x2) ** 2, axis=1))
# Compute cluster centers.
cluster_numbers = np.unique(y)
cluster_centra = np.zeros((len(cluster_numbers), x.shape[1]))
for idx, c in enumerate(cluster_numbers):
x_cluster = x[y == c]
cluster_centra[idx, :] = np.nanmean(x_cluster, axis=0)
# For each observation i, calculate the average dissimilarity ai between i and all other points
# of the cluster to which i belongs.
sil = np.empty(len(x))
sil.fill(np.nan)
for i, xi, yi in zip(range(len(x)), x, y):
cluster_points = x[y == yi]
diss_a = dissimilairy(xi, cluster_points)
ai = np.nanmean(diss_a)
# Compute the dissimilarity between i and its “neighbor” cluster, i.e., the nearest one to which
# it does not belong.
other_clusters = cluster_centra[cluster_numbers != yi, :]
diss_b = dissimilairy(xi, other_clusters)
bi = np.min(diss_b)
# Compute silhouette of point:
sil[i] = (bi - ai) / max(ai, bi)
return np.nanmean(sil)
[docs]def binary_classification_scores(y_true, y_pred, y_score=None):
"""
Compute some scores for a binary classification algorithm by comparing predicted output to true labels.
Args:
y_true (np.ndarray): 1D array with true class numbers (0 or 1).
y_pred (np.ndarray, None): 1D array with predicted class numbers (0 or 1). If None and y_score is given,
y_pred is determined from y_score by applying a threshold that maximizes F1 score.
y_score (np.ndarray): predicted scores/probabilities, see sklearn.metrics.roc_auc_score().
Returns:
scores_merged (dict): dictionary containing some scores for the classification.
"""
if y_pred is None:
raise ValueError('y_pred` cannot be None.')
# Area under ROC curve.
if y_score is not None:
auc = roc_auc_score(y_true, y_score)
else:
auc = np.nan
# Scores that can be computed from a confusion matrix.
cm = ConfusionMatrix(y_true, y_pred, class_mapping={'negative_class': 0, 'positive_class': 1})
scores_general = cm.scores_general()
scores_per_class = cm.scores_per_class()
# Only select the scores of the positive class.
scores_pos = scores_per_class[scores_per_class.class_label == 'positive_class'].drop(
columns='class_label').squeeze().to_dict()
# Merge all scores into one dictionary.
scores_merged = {'auc': auc, **scores_general, **scores_pos}
return scores_merged
[docs]def cluster_inertia(x, y):
"""
Return the inertia of clustered points.
Args:
x (np.ndarray): (n, m) array of n samples with m features. May not contain nan.
y (np.ndarray): (n, ) array of n samples with cluster/class numbers. May not contain np.nan values.
Returns:
inertia (float): inertia.
"""
x = np.asarray(x)
y = np.asarray(y)
cluster_numbers = np.unique(y)
# Sum squared distance of each sample to its cluster centroid.
inertia = 0
for idx, c in enumerate(cluster_numbers):
x_cluster = x[y == c]
center = np.mean(x_cluster, axis=0)
diff = x_cluster - center
sum_squared_diff = np.sum(diff ** 2)
inertia += sum_squared_diff
inertia /= x.shape[0]
return inertia
def drop_samples_score(y_true, y_pred, score_fun, drop_rates=np.linspace(0, 1, 20), niter=1):
"""
Return classification scores after droppping some samples (drop_rates) in a random and optimal fashion.
Args:
y_true, y_pred: classifications.
score_fun (function): function that returns a score (should take two inputs: y_true, y_pred).
drop_rates (np.ndarray or list): fractions of sampes to drop (between 0 and 1).
niter (int): how many times to repeat the procedure, shuffling samples each iteration.
Returns:
scores_rand (np.ndarray): (niter, len(drop_rates)) array with scores for each iteration and drop rate,
with random dropping.
scores_best (np.ndarray): (niter, len(drop_rates)) array with scores for each iteration and drop rate,
with optimal dropping (dropping misclassifications).
"""
n = len(y_true)
scores_rand = []
scores_best = []
for _ in range(niter):
# Shuffle to maintain randomness after sorting.
y_true_, y_pred_ = shuffle(y_true, y_pred)
match = y_true_ == y_pred_
idx_sort = np.argsort(match)
scores_rand_per_p = []
scores_best_per_p = []
for p in drop_rates:
# Number of samples to drop.
n_drop = int(p * n)
# Randomly drop a fraction of samples.
y_true_rand, y_pred_rand = y_true_[n_drop:], y_pred_[n_drop:]
# Drop misclassifications.
y_true_best, y_pred_best = y_true_[idx_sort[n_drop:]], y_pred_[idx_sort[n_drop:]]
# Compute metric.
score_rand = score_fun(y_true_rand, y_pred_rand)
score_best = score_fun(y_true_best, y_pred_best)
# Collet.
scores_rand_per_p.append(score_rand)
scores_best_per_p.append(score_best)
# Collect.
scores_rand.append(scores_rand_per_p)
scores_best.append(scores_best_per_p)
# To arrays (niter, len(drop_rates)).
scores_rand = np.asarray(scores_rand)
scores_best = np.asarray(scores_best)
return scores_rand, scores_best
[docs]def fscore_carolina(y_true, y_score):
"""
Compute the F-score (TODO ask Mario for a reference).
Implemented for binary classification using 1 feature.
This measure characterizes the distance of the feature values between the two clusters.
The distance metric is normalized by the sum of the variance in each class.
So intuitively, it is a ratio of the separation over the variance.
Args:
y_true (np.ndarray): array containing ones and zeros specifying the class per sample.
y_score (np.ndarray): array containing the feature value of each sample.
Returns:
f_score (float): the F-score.
Examples:
>>> np.random.seed(0)
>>> f0 = np.random.normal(loc=4, scale=1, size=100000)
>>> f1 = np.random.normal(loc=2, scale=1, size=100000)
>>> y0 = np.zeros(len(f0))
>>> y1 = np.ones(len(f1))
>>> y_true = np.concatenate((y0, y1))
>>> y_score = np.concatenate((f0, f1))
>>> fscore = fscore_carolina(y_true, y_score)
>>> print(fscore) # Expected score is 1.
1.0004474426175411
"""
y_true = np.asarray(y_true)
y_score = np.asarray(y_score)
# Make the features have zero mean.
features = y_score - np.nanmean(y_score)
# Extract features per class.
features_0 = features[y_true == 0]
features_1 = features[y_true == 1]
# Compute the mean per class. Note that if one mean is positive, the other mean must be negative in order for
# the mean of all features together to be zero.
mean_0 = np.mean(features_0)
mean_1 = np.mean(features_1)
# A measure for the separation is the sum of the squared averages per class.
# If we would have added the absolute values (which is similar to taking the square values), we would be computing
# the distance between the 2 cluster centers. So adding the square averages is also a metric for the separation.
separation = mean_0**2 + mean_1**2
# Large distance between cluster averages are meaningless if the variances of the two clusters are relatively high.
# Therefore, we normalize the separation by the sum of the variances.
var_0 = np.var(features_0, ddof=1)
var_1 = np.var(features_1, ddof=1)
sum_variance = (var_0 + var_1)
f_score = separation/sum_variance
return f_score
def find_y_pred(y_true, y_score, **kwargs):
"""
Find threshold for y_score to determine y_pred, by maximizing F1-score.
Args:
y_true (np.ndarray): true binary scores consisting of 0s and 1s (see also sklearn.metric.precision_recall_curve()).
y_score (np.ndarray): predicted probability scores (see also sklearn.metric.precision_recall_curve()).
**kwargs: for sklearn.metric.precision_recall_curve().
Returns:
y_pred (np.ndarray): predictions that maximize F1 score.
Examples:
>>> y_score = np.random.rand(100,)
>>> y_true = y_score < 0.43
>>> y_pred = find_y_pred(y_true, y_score)
>>> np.mean(y_pred == y_true)
1.0
"""
# Compute precision recall curve for both possible positive labels.
precision_1, recall_1, thresholds_1 = precision_recall_curve(y_true, y_score, pos_label=0, **kwargs)
precision_2, recall_2, thresholds_2 = precision_recall_curve(y_true, y_score, pos_label=1, **kwargs)
# F1-score.
f1_scores_1 = 2 * recall_1 * precision_1 / (recall_1 + precision_1)
f1_scores_2 = 2 * recall_2 * precision_2 / (recall_2 + precision_2)
# Choose the best threshold.
if np.nanmax(f1_scores_1) > np.nanmax(f1_scores_2):
threshold = thresholds_1[np.nanargmax(f1_scores_1)]
y_pred = y_score < threshold
else:
threshold = thresholds_2[np.nanargmax(f1_scores_2)]
y_pred = y_score >= threshold
return y_pred
def find_threshold_f1(y_true, y_score, **kwargs):
"""
Find threshold for y_score to determine y_pred, by maximizing F1-score.
Args:
y_true (np.ndarray): true binary scores consisting of 0s and 1s (see also sklearn.metric.precision_recall_curve()).
y_score (np.ndarray): predicted probability scores (see also sklearn.metric.precision_recall_curve()).
**kwargs: for sklearn.metric.precision_recall_curve().
Returns:
y_pred (np.ndarray): predictions that maximize F1 score.
Examples:
>>> np.random.seed(43)
>>> y_score = np.random.rand(100,)
>>> y_true = y_score > 0.43
>>> threshold = find_threshold_f1(y_true, y_score)
>>> print(np.round(threshold, 2))
0.43
"""
# Compute precision recall curve for both possible positive labels.
precision, recall, thresholds = precision_recall_curve(y_true, y_score, **kwargs)
# F1-score.
f1_scores = 2 * recall * precision / (recall + precision)
# Choose the best threshold.
threshold = thresholds[np.nanargmax(f1_scores)]
return threshold
def mann_whitney_u(y_true, y_score, **kwargs):
"""
Compute the Mann-Whitney U statistic and p-value for two classes.
Wrapper of the scipy.stats.mannwhithneyu() function that accepts the input arguments in a different format and
can handle NaN values.
Use the Mann-Whitney U test to determine whether two independent samples were selected from populations having the
same distribution (https://en.wikipedia.org/wiki/Mann%E2%80%93Whitney_U_test).
Args:
y_true (np.ndarray): true binary labels (2 unique classes).
y_score (np.ndarray): predicted scores/probabilities. Must have same size as y_true.
**kwargs (optional): keyword arguments for scipy.stats.mannwhithneyu().
Returns:
statistic (float): the test statistic under the large-sample approximation that the rank sum statistic is
normally distributed, see scipy.stats.mannwhithneyu().
pvalue (float): the two-sided p-value of the test, see scipy.stats.mannwhithneyu().
"""
# Remove NaNs.
y_true, y_score = _remove_nans(y_true, y_score)
# Extract unique classes.
y_unique = np.unique(y_true)
if len(y_unique) != 2:
raise ValueError('Expected 2 classes. Instead got {} classes: {}. This function works for 2 classes only.'
.format(len(y_unique), y_unique))
# Extract feature values per class.
x = y_score[y_true == y_unique[0]]
y = y_score[y_true == y_unique[1]]
try:
statistic, pvalue = scipy.stats.mannwhitneyu(x=x, y=y, **kwargs)
except ValueError:
# The test raises a ValueError if all numbers are identical. Return np.nan instead of raising an error.
statistic, pvalue = np.nan, np.nan
return statistic, pvalue
def multiple_classification_scores(y_true, y_pred):
metrics = dict()
# Remove possible nans.
not_nan_mask = np.logical_and(~np.isnan(y_true), ~np.isnan(y_pred))
y_true = y_true[not_nan_mask]
y_pred = y_pred[not_nan_mask]
metrics['accuracy'] = accuracy_score(y_true, y_pred)
metrics['kappa'] = cohen_kappa_score(y_true, y_pred)
return metrics
def ranksums(y_true, y_score):
"""
Compute the Wilcoxon rank-sum statistic and p-value for two classes.
Wrapper of the scipy.stats.ranksums() function that accepts the input arguments in a different format and
can handle NaN values.
Use the Wilcoxon rank-sum test to determine whether two samples were selected from populations having
the same distribution (https://en.wikipedia.org/wiki/Wilcoxon_signed-rank_test).
Args:
y_true (np.ndarray): true binary labels (2 unique classes) coded as integers in a 1D array.
y_score (np.ndarray): predicted scores/probabilities. Must have same size as y_true.
Returns:
statistic (float): the test statistic under the large-sample approximation that the rank sum statistic is
normally distributed, see scipy.stats.ranksums().
pvalue (float): the two-sided p-value of the test, see scipy.stats.ranksums().
"""
# Remove NaNs.
y_true, y_score = _remove_nans(y_true, y_score)
# Extract unique classes.
y_unique = np.unique(y_true)
if len(y_unique) != 2:
raise ValueError('Expected 2 classes. Instead got {} classes: {}. This function works for 2 classes only.'
.format(len(y_unique), y_unique))
# Extract feature values per class.
x = y_score[y_true == y_unique[0]]
y = y_score[y_true == y_unique[1]]
statistic, pvalue = scipy.stats.ranksums(x=x, y=y)
return statistic, pvalue
[docs]def roc_auc_score(y_true, y_score, remove_nans=True, **kwargs):
"""
Compute Area Under the Receiver Operating Characteristic Curve (ROC AUC) from prediction scores.
Wrapper of the sklearn.metrics.roc_auc_score() function that can handle NaN values and negative relation between
y_true and y_score.
Args:
y_true (np.ndarray): true binary labels or binary label indicators, see sklearn.metrics.roc_auc_score().
y_score (np.ndarray): predicted scores/probabilities, see sklearn.metrics.roc_auc_score().
**kwargs (optional): keyword arguments for sklearn.metrics.roc_auc_score().
Returns:
auc (float): area under the ROC curve, or np.nan if this area is not defined.
"""
y_true = np.asarray(y_true)
y_score = np.asarray(y_score)
# Remove NaNs.
if remove_nans:
y_true, y_score = _remove_nans(y_true, y_score)
if len(np.unique(y_true)) < 2:
# AUC not defined if no samples (left) in y_true or if only one class (left) in y_true.
auc = np.nan
else:
# Compute and return auc.
auc_1 = metrics.roc_auc_score(y_true=y_true, y_score=y_score, **kwargs)
# If we would multiply the feature with -1, we get the score for the negative relation between y and x.
# This is equal to 1 - score_1.
auc_2 = 1 - auc_1
# Take the maximum value of the positive and negative relation.
auc = max([auc_1, auc_2])
return auc
[docs]def separation_score(f1, f2):
"""
Compute a score that captures how much two sets/distributions are separated.
It is 0.5 * the squared difference between the means of the two sets, normalized by the sum of their variances.
Ignores nan values.
It is equivalent to fscore_coralina if the sizes of both sets are equal.
For unequal set sized, this metric is not dependend on the size of the sets, unlike fscore_carolina().
I.e., if two sets have a certain mean and std, increasing the numeber of samples in one of the sets does not
significantly affect the score (apart from the influence on the computation of the std).
Args:
f1 (np.ndarray): array with values of first set (e.g. feature values).
f2 (np.ndarray): array with values of second set (e.g. feature values).
Returns:
score (float): normalized score that increases if the separation of the two sets is larger.
Examples:
>>> np.random.seed(43)
>>> f1 = np.random.normal(loc=4, scale=1, size=1000)
>>> f2 = np.random.normal(loc=2, scale=1, size=100000)
>>> separation_score(f1, f2) # Expected score is 1.
1.0086296863621815
"""
mu1 = np.nanmean(f1)
mu2 = np.nanmean(f2)
var1 = np.nanvar(f1, ddof=1)
var2 = np.nanvar(f2, ddof=1)
# Use an arbitrary scaling factor of 0.5 to be equivalent to fscore_carolina for equal set sizes.
score = 0.5*((mu1 - mu2)**2/(var1 + var2))
return score
def _remove_nans(x, y):
"""
Remove entries in x and y where x or y is nan.
Args:
x (np.ndarray): data array.
y (np.ndarray): data array with same shape as x.
Returns:
x_out (np.ndarray): copy of `x` without nan values.
y_out (np.ndarray): copy of `y` without nan values.
"""
x_out, y_out = remove_nans(x, y)
return x_out, y_out
[docs]def compute_separation_scores(df, y='y', n_combine=1, subsets=None, normalize=True, score_fun=None, dropna=True,
n_permutations=99, min_score=-np.inf, n_jobs=None, verbose=1, seed=None):
"""
Compute scores and pvalues quantifying how well each combination of features in df separate predefined classes.
The predefined classes are determined by the class label, which is column y in df.
The pvalue is determined using permutation test. More specifically, it implements Test 1 in:
Ojala and Garriga. Permutation Tests for Studying Classifier Performance.
The Journal of Machine Learning Research (2010) vol. 11
Args:
df (pd.DataFrame): DataFrame with shape (n_samples, n_features + 1), containing the features and
class label (`y`).
y (str, optional): column in df containing the class labels.
n_combine (int, optional): number of features to consider when making combinations.
subsets (list, optional): list with length n_combine. Determines which features are combined.
Elements are column names (list of strings) in the df. combinations are made by taking one features out of
every list with column names. If specified, n_combine parameter is ignored.
normalize (bool, optional): normalize the features (zero mean, unit SD).
score_fun (function, optional): function that takes as input the feature vector X and the class labels y,
i.e.: fun(X, y). It should return one number (e.g. how well features in X separate classes y).
Higher score computed by score_fun must relate to better clustering.
dropna (bool, optional): whether to drop samples with nan values.
(NOTE: may be different for different combinations of features since
nan values may occur only in certain features).
n_permutations (int, optional): number of permutations to do for computing the pvalues.
If 0 or None, no pvalues are computed.
min_score (float, optional): minimum score for computing pvalue (pruning).
If score is lower than min_score, pvalue is not computed and set to NaN.
n_jobs (int, optional): Number of jobs to run in parallel when computing the pvalue using permutations.
verbose (int, optional): verbose level. Choices: 0, 1, 2.
seed (int, optional): seed for random generator for repeatability of permutation test.
Returns:
df_scores (pd.DataFrame): dataframe with scores for each possible combination of features,
sorted by score (from high to low).
The computed score depends on `score_fun`.
Examples:
>>> # Create toy dataset.
>>> np.random.seed(43)
>>> df = pd.DataFrame()
>>> for i in range(5): df['x{}'.format(i+1)] = np.random.rand(20)
>>> df['y'] = df.x3 > df.x2 # x3 and x2 are explaining features for class y.
>>> # Compute scores.
>>> n_combine = 2 # Number of features to combine (number of predictors).
>>> y = 'y' # Column with class labels.
>>> df_scores = compute_separation_scores(df, y=y, n_combine=n_combine, verbose=0) # Scores are sorted in decreasing order.
>>> print(df_scores) # Note that the feature combination with highest score is features x2 and x3.
score pvalue labels
0 0.365212 0.01 [x2, x3]
1 0.279842 0.01 [x1, x2]
2 0.232275 0.01 [x2, x4]
3 0.216710 0.02 [x2, x5]
4 0.106424 0.08 [x1, x3]
5 0.095038 0.03 [x3, x5]
6 0.094238 0.14 [x3, x4]
7 -0.002126 0.36 [x1, x4]
8 -0.031383 0.76 [x1, x5]
9 -0.059352 0.90 [x4, x5]
"""
# Check inputs.
if y not in df:
raise ValueError('y="{}" is not a column in `df`.'.format(y))
# Default score function.
if score_fun is None:
# Use clustering metric silhouette.
score_fun = silhouette_score
# Remove samples for which the y label is nan.
if dropna:
df = df.dropna(subset=[y])
# Split features from y labels.
X = df.drop(columns=y)
y = df[y]
n_samples, n_features = X.shape
if subsets is not None:
n_combine = len(subsets)
if n_combine > n_features:
raise ValueError('n_select ({}) must be smaller or equal to the total number of features ({}).'
.format(n_combine, n_features))
# Normalize.
if normalize:
X = (X - X.mean()) / X.std()
# Seed random generator.
if seed is not None:
np.random.seed(seed)
# Get all possible combinations of features.
if subsets is None:
indices = list(itertools.combinations(range(n_features), n_combine))
else:
# Convert column names to indices.
subsets_indices = []
for set_i in subsets:
subsets_indices.append([df.columns.get_loc(c) for c in set_i])
indices = list(itertools.product(*subsets_indices))
# Initialize progress bar.
bar = ProgBar(len(indices), stream=sys.stdout)
# Loop over feature combinations.
out_val = []
out_pval = []
out_labels = []
for idx in indices:
X_i = X.iloc[:, list(idx)]
# Drop samples that have nan in any feature.
if dropna:
mask = ~X_i.isna().any(axis=1)
X_i = X_i[mask]
y_i = y[mask]
else:
y_i = y
if len(y_i) < n_samples/2:
# Too little samples -> skip.
continue
# Compute score.
score_i = score_fun(X_i, y_i)
if score_i >= min_score:
# Compute p-value using permutation test (Ojala et Garriga 2010).
par = Parallel(n_jobs=n_jobs, verbose=verbose == 2) # For parallel computing.
if par.n_jobs > 1:
# Parallelization possible -> parallel computing is faster.
permutation_scores = par(delayed(score_fun)(X_i, np.random.permutation(y_i)) for _ in range(n_permutations))
else:
# No parallelization possible -> list comprehension is faster.
permutation_scores = np.array([score_fun(X_i, np.random.permutation(y_i)) for _ in range(n_permutations)])
# Compute pvalue (definition 1 in Ojala et Garriga 2010).
pval_i = (np.sum(permutation_scores >= score_i) + 1.0) / (n_permutations + 1)
else:
pval_i = np.nan
# Collect score, pvalue and feature names for current combination.
out_val.append(score_i)
out_pval.append(pval_i)
out_labels.append(X_i.columns.to_list())
# Update progress bar.
if verbose > 0:
bar.update()
# Return scores as a DataFrame.
df_scores = pd.DataFrame(data={
'score': out_val,
'pvalue': out_pval,
'labels': out_labels
}).sort_values(by='score', ascending=False, ignore_index=True)
if n_permutations == 0:
# No permutations computed, set pvalue to nan.
df_scores['pvalue'] = np.nan
return df_scores