Source code for nnsa.utils.dataframes

import sys
import os
import warnings
from functools import partial

import numpy as np
import pyprind
import seaborn as sns
import sklearn
from matplotlib import pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
from scipy.stats import pearsonr, spearmanr, kendalltau, mannwhitneyu
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.preprocessing import StandardScaler
import pandas as pd
import scipy.stats
import statsmodels.api as sm

from nnsa.models.evaluation import fscore_carolina, roc_auc_score, ranksums, mann_whitney_u, average_silhouette
from nnsa.stats.paired import wilcoxon
from nnsa.stats.partially_paired import opt_pooled_ttest
from nnsa.stats.statistics import partial_corr, test_groups_pairwise
from nnsa.stats.utils import make_stars
from nnsa.training.utils import split_data
from nnsa.utils.other import convert_string_auto
from nnsa.utils.plotting import subplot_rows_columns, stripboxplot, heatmap, value_to_color

__all__ = [
    'boxplot_paired',
    'collect_high_cor_values',
    'compute_correlation_matrix',
    'different_regression_scores',
    'feature_correction_lin_reg',
    'pair_separation_scores',
    'pairplot',
    'pairplot_best',
    'pca',
    'plot_column_data',
    'read_dataframe',
    'regplots',
    'scatter_paired',
    'standardize',
]


def annotate_significant_box(x, y, df, test_fun=mannwhitneyu, order=None, ax=None, verbose=1):

    if order is None:
        order = [t.get_text() for t in ax.get_xticklabels()]
    df_stat, df_pval = test_groups_pairwise(x=y, group=x, data=df, test_fun=test_fun)

    xlim = ax.get_xlim()
    ymax = df[y].max()
    yrange = ymax - df[y].min()
    for ii, group_i in enumerate(order):
        for jj, group_j in enumerate(order):
            if jj <= ii:
                continue
            pval = df_pval.loc[group_i, group_j]
            xtext = (ii + jj) / 2
            ytext = ymax + (1 + ((ii + jj) == 2))*yrange*0.075
            stars = get_pstars(pval)
            if stars:
                ax.text(xtext, ytext, stars, ha='center', va='bottom')
                ax.plot([ii, jj], [ytext]*2, color='k', linewidth=0.7)
                if verbose:
                    print(f'{group_i} vs {group_j}: p={pval}')
    ax.set_xlim(xlim)


def barplot(y, data=None, color=None, x='index', annotation=None, text_inside=True, horizontal=True, ax=None, **kwargs):
    if ax is None:
        ax = plt.gca()

    if isinstance(x, str):
        if data is None:
            raise ValueError(f'`Cannot interpret x="{x}" if `data` is not given.')
        if x == 'index':
            xvalues = data.index
        else:
            xvalues = data[x]
    else:
        xvalues = np.asarray(x)

    if isinstance(y, str):
        if data is None:
            raise ValueError(f'`Cannot interpret y="{y}" if `data` is not given.')
        yvalues = data[y]
    else:
        yvalues = np.asarray(y)

    if isinstance(color, str):
        if data is not None and color in data.columns:
            color = data[color]

    if isinstance(annotation, str):
        if data is None:
            raise ValueError(f'`Cannot interpret annotation="{annotation}" if `data` is not given.')
        annotation_texts = data[annotation]
    else:
        annotation_texts = annotation

    # Plot bars.
    bar_kwargs = dict(dict(color=color), **kwargs)
    if horizontal:
        ax.barh(xvalues, yvalues, **bar_kwargs)
    else:
        ax.bar(xvalues, yvalues, **bar_kwargs)

    # Add texts at end of bars.
    if annotation_texts is not None:
        for idx, (text, yval) in enumerate(zip(annotation_texts, yvalues)):
            if not text_inside:
                # After bar.
                if horizontal:
                    ax.text(yval, idx, text, va='center',
                            ha='left' if yval >= 0 else 'right')
                else:
                    ax.text(idx, yval, text, ha='center',
                            va='bottom' if yval >= 0 else 'top')
            else:
                # In bar.
                if horizontal:
                    ax.text(yval, idx, text, va='center',
                            ha='right' if yval >= 0 else 'left')
                else:
                    ax.text(idx, yval, text, ha='center',
                            va='top' if yval >= 0 else 'bottom', rotation=90)

    if not horizontal:
        # Rotate labels on x-axis for better readability.
        plt.xticks(rotation=45, horizontalalignment='right')

    # Add axis label.
    label = x if isinstance(x, str) else ''
    if horizontal:
        ax.set_xlabel(label)
    else:
        ax.set_ylabel(label)


[docs]def collect_high_cor_values(df_cor, min_val=0.8): """ Collect all multimodal (absolute) correlation values higher than min_val. Only considers correlations between different modalities (encoded by the first 3 characters). Args: df_cor (pd.DataFrame): correlation dataframe (df.corr()). min_val (float): vlaues higher than min_val are collected. Returns: series_report (pd.Series): series with high correlation scores. """ columns = list(df_cor.columns) series_report = pd.Series() append_list = [] for i, col in enumerate(columns[:-1]): candidates = columns[i+1:] columns_to_check = [c for c in candidates if c[:3] != col[:3]] series = df_cor.loc[col, columns_to_check] series_abs = series.abs() to_report = series[series_abs >= min_val] to_report.index += '_VS_{}'.format(col) append_list.append(to_report) series_report = series_report.append(append_list) return series_report
[docs]def compute_correlation_matrix( data, x_columns=None, y_columns=None, control_var=None, corr_method='pearson'): """ Compute correlations between `x_columns` and `y_columns` in `data`. Args: data (pd.DataFrame, np.ndarray): dataframe or arrays with columns. x_columns (list): list with columns names for x. If None, takes all numeric columns in `data`. y_columns (list): list with columns names for y. If None, `x_columns` will be used. control_var (str): variable to control for. If specified, computes partial correlation values. corr_method (str, optional): which correlation function to use ('pearson', 'spearman', 'kendall'). Returns: df_corr (pd.DataFrame): dataframe with correlation values. Columns correspond to `x_columns` and indices to `y_columns`. df_pval (pd.DataFrame): dataframe with p-values corresponding to the correlations (same shape as `df_corr`). """ if not isinstance(data, pd.DataFrame): # To dataframe. data = pd.DataFrame(data) if x_columns is None: # By default take all numeric columns in `data`. x_columns = data.select_dtypes(include=np.number).columns if y_columns is None: # By default correlate x with itself. y_columns = x_columns.copy() corr_all = [] pval_all = [] for x_col in x_columns: corr_i = corrwith_pvalue( df=data[y_columns], other=data[x_col], control_var=data[control_var] if control_var is not None else None, method=corr_method) corr = corr_i['correlation_coef'].rename(x_col) pval = corr_i['p-value'].rename(x_col) corr_all.append(corr) pval_all.append(pval) df_corr = pd.concat(corr_all, ignore_index=False, axis=1) df_pval = pd.concat(pval_all, ignore_index=False, axis=1) return df_corr, df_pval
def corrwith_pvalue(df, other, control_var=None, method='pearson'): """ Compute the Pearson correlation and p-value as returned by scipy.stats.pearsonr(). Args: df (pd.DataFrame): DataFrame with column data. other (pd.Series, str): Series with same indices as `df` or column name of the feature in `df` for which to compute the correlations. control_var (pd.Series, str): variable to control for. If specified, computes partial correlation values. method (str, fun): string specifying the correlation function ('pearson', 'spearman', 'kendall'). Or a callable function that returns a tuple with two elements (numbers): the statistic (correlation) and its pvalue. Returns: df_out (pd.DataFrame): DataFrame with the correlation and p-value of each numerical column in df with column `other`. """ # Extract numerical columns. df = df.select_dtypes(include='number') if isinstance(other, pd.Series): if other.index.is_unique: # Use index to extract the corresponding rows. other = other[df.index.tolist()] else: if len(other) != len(df): raise ValueError('`other` and `df` should have the same length if `other` ' 'contains duplicate values in index.') elif isinstance(other, str): if other not in df: raise ValueError('`other` "{}" is not a (numerical) column in DataFrame.'.format(other)) else: other = df[other] else: raise ValueError('Invalid type for `other`. Expected a pandas Series or a string (column name). Got {}.' .format(type(other))) if control_var is None: control_var = None elif isinstance(control_var, pd.Series): control_var = control_var.loc[df.index] elif isinstance(control_var, str): if control_var not in df: raise ValueError('`control_var` "{}" is not a (numerical) column in DataFrame.'.format(control_var)) else: control_var = df[control_var] else: raise ValueError('Invalid type for `control_var`. Expected a pandas Series or a string (column name). Got {}.' .format(type(control_var))) method = method.lower() if method == 'pearson': corfun = scipy.stats.pearsonr elif method == 'spearman': corfun = scipy.stats.spearmanr elif method == 'kendall': corfun = scipy.stats.kendalltau elif callable(method): corfun = method else: raise ValueError('Invalid input for `method` ({})'.format(method)) def compute_correlation(x, y, z): x = x.rename('x') y = y.rename('y') if z is None: # Normal correlation. array = pd.concat((x, y), axis=1, sort=False, ignore_index=False).dropna(how='any').values if array.shape[0] < 2: cor, pval = np.nan, np.nan else: cor, pval = corfun(array[:, 0], array[:, 1]) else: z = z.rename('z') # Partial correlation. array = pd.concat((x, y, z), axis=1, sort=False, ignore_index=False).dropna(how='any').values if array.shape[0] < 2: cor, pval = np.nan, np.nan else: cor, pval = partial_corr(x=array[:, 0], y=array[:, 1], z=array[:, 2], method=method) return cor, pval # Compute the correlation coefficient and the p-value in one go and store them temporarily as a tuple in a series. series_temp = df.apply(partial(compute_correlation, y=other, z=control_var)) series_cor = series_temp.apply(lambda x: x[0]) series_pval = series_temp.apply(lambda x: x[1]) df_out = pd.concat((series_cor, series_pval), axis=1, sort=False, ignore_index=False)\ .rename(columns={0: 'correlation_coef', 1: 'p-value'}) return df_out def plot_heatmap(df, size_scale=500, annotations=None, ax=None, **kwargs): """ Plot a heatmap showing the values in the `df` matrix. """ melted = pd.melt(df.reset_index(), id_vars='index') melted.columns = ['y', 'x', 'value'] heatmap_kwargs = dict(dict( color_range=[-1, 1], size_range=[0, 1], ), **kwargs) if annotations is not None: if callable(annotations): annotations = df.applymap(annotations) if isinstance(annotations, pd.DataFrame): # Make sure annotations is aligned with the df. annotations = annotations.loc[df.index, df.columns] annotations = pd.melt(annotations.reset_index(), id_vars='index') annotations.columns = ['y', 'x', 'annotation'] assert annotations.shape == melted.shape annotations = annotations['annotation'] else: raise ValueError('`annotations` must be a function of a pandas DataFrame. Got a {}.' .format(type(annotations))) heatmap( melted['x'], melted['y'], **dict(dict( color=melted['value'], size=melted['value'].abs(), size_scale=size_scale, x_order=df.columns, y_order=df.index[::-1], annotations=annotations, ax=ax), **heatmap_kwargs) ) def correlation_scores(df, y, columns=None): """ Compute the correlation of each feature in `columns` with feature `y`.. Args: df (pd.DataFrame): DataFrame containing the data. columns (list, optional): list of (numeric) columns that hold the features. If None, all numeric columns are used, except groupby. Defaults to None. Returns: (pd.Series): correlation scores for each feature/column in descending order. """ columns = _check_columns(columns, df) feature_scores = df[columns].corrwith(df[y]) return feature_scores.sort_values(ascending=False)
[docs]def different_regression_scores(df, predictor, condition, columns=None): """ Compute the scores for a condition having effect on the relation between each of columns and predictor in df. Returns 1-pvalue that the condition is a signficiant contribution to the linear regression model. Fits the following model and assesses whether the condition terms are significant. column ~ 1 + predictor + condition + condition*predictor. # TODO Args: df: predictor: condition: columns: Returns: """ columns = _check_columns(columns, df) # Remove condition and predictor from columns. if condition in columns: columns.remove(condition) if predictor in columns: columns.remove(predictor) # Remove samples with NaN in condition. df = df[df[condition].notna()] # Remove samples with NaN in predictor. df = df[df[predictor].notna()] # Create categorical variable (0 or 1). condition_labels = np.unique(df[condition]) if len(condition_labels) != 2: raise ValueError('Number of unique conditions in should be 2. Got {} different conditions.' .format(len(condition_labels))) condition_bool = (df[condition].values == condition_labels[0]).astype(float) # get predictor values. pred = df[predictor].values # Create feature matrix X. X = np.transpose(np.vstack((np.ones(len(condition_bool)), pred, condition_bool, condition_bool * pred))) # For loop over columns. min_p_values = np.ones((len(columns), 2)) for i, feature in enumerate(columns): # Get feature values. y = df[feature].values notnan_mask = ~np.isnan(y) y_i = y[notnan_mask] X_i = X[notnan_mask, :] # Fit. res_1 = sm.OLS(y_i, X_i).fit() # Check if intercept is independent on condition. p_values = [1, 1] p_values = res_1.pvalues[2:] # if res_1.pvalues[2] > confidence_level: # # Fit again without intercept. # res_2 = sm.OLS(y_i, X_i[:, [0, 1, 3]]).fit() # p_values[1] = res_2.pvalues[-1] # # else: # p_values[1] = res_1.pvalues[3] # # # Check if interaction is independent on condition. # if res_1.pvalues[3] > confidence_level: # # Fit again without interaction. # res_3 = sm.OLS(y_i, X_i[:, [0, 1, 2]]).fit() # p_values[0] = res_3.pvalues[-1] # # else: # p_values[0] = res_1.pvalues[2] min_p_values[i, :] = p_values scores = 1 - min_p_values return pd.DataFrame(data=scores, index=columns, columns=['intercept', 'slope'])
[docs]def feature_correction_lin_reg(x, df, columns=None, except_columns=None, groupby=None, x_ref=0, corr_pvalue_threshold=0.05, postfix=None): """ Apply feature correction by subtracting a linear regression model from the feature. Useful in the following example: you want to investigate the influence of (categorical) feature `groupby` on feature y (in `columns`), however, feature x also influences feature x and is correlated with feature `groupby`. To eliminate the influence of feature x on feature y, this function first groups the data by the `groupby` feature. For the set of samples in each group, a linear regression model is fitted that predicts y as a function of x. Subsequently, this regression model is evaluated at each sample point at the corresponding feature value x, and subtracted from feature y as a way to eliminate the influence of feature x on feature y, keeping in account that the `groupby` feature might have influence on y as well. Args: x (str): the feature (column name in df) to correct for. For this feature, a linear regression model is fitted with the target feature y. Subsequently, this linear regression model is used to subtract the influence of feature x from feature y. df (pd.DataFrame): DataFrame containing the data. columns (str or list, optional): the feature column name(s) of the feature(s) to correct. If None, all numerical features, except feature x, will be corrected. Defaults to None. except_columns (str or list, optional): the column name(s) of the features not to correct (will be removed from `columns`). If None, no columns will be removed. Defaults to None. groupby (str, optional): a categorical feature that is expected to have an additional influence on y. If None, no grouping is applied. I.e., a linear regression model is fitted based on all samples, assuming that there are no features (predictors) correlating with x. If specified, the features are corrected per group. Defaults to None. x_ref (float, optional): reference value for feature x. After subtracting the contribution of feature x to feature y, it is possible to add the expected contribution at a fixed reference value for x. Defaults to 0. corr_pvalue_threshold (float, optional): the maximum p-value of the Pearson correlation coefficient (as returned by scipy.stats.pearsonr()) in order to correct the feature. Only features that correlate significantly with `x` will be corrected, i.e. features with a correlation p-value <= corr_pvalue_threshold will be corrected. Defaults to 0.05. postfix (str, optional): a postfix to add to the feature name(s) (column name(s)) that are corrected. If None, a standerd postfix will be added indicating the correction. Defaults to None. Returns: df_out (pd.DataFrame): a new DataFrame with the same number of features/columns, but where columns y are corrected and are optionally renamed using the postfix. Examples: >>> df = pd.DataFrame(data=np.tile(np.random.rand(25).reshape(-1, 1), (1, 2)), columns=['random_1', 'random_2']) >>> df_corrected = feature_correction_lin_reg('random_1', df, columns='random_2') >>> print(np.all(df_corrected['random_2_linsub_random_1'] < 1e-15)) True """ # Default parameters. columns = _check_columns(columns, df) # Remove column x. if x in columns: columns.remove(x) # Remove columns in except columns. if except_columns is not None: if not isinstance(except_columns, list): # Make sure except_columns is a list. except_columns = [except_columns] for col in except_columns: columns.remove(col) if postfix is None: postfix = '_linsub_{}'.format(x) # Extract groups. if groupby is None: all_groups = ['this_is_a_single_dummy_group_name_so_that_we_can_loop_over_all_groups'] else: all_groups = df[groupby].unique() # Create a copy of the DataFrame. df_out = df.copy() # Initialize empty dict that will hold renaming mapping. rename_dict = dict() # Loop over groups. for jj, group in enumerate(all_groups): # Extract data. if groupby is None: # Use all data. data_mask = np.ones(len(df_out)).astype(bool) else: # Use data of current group. data_mask = df[groupby] == group # Extract x data. x_i = df[x][data_mask].values x_i_notnan = ~np.isnan(x_i) # Loop over columns (features to correct). for col in columns: # Extract y data. y_i = df[col][data_mask].values # Remove nan values before fitting. y_i_notnan = ~np.isnan(y_i) notnan_mask = np.logical_and(x_i_notnan, y_i_notnan) x_i_nn = x_i[notnan_mask].reshape((-1, 1)) # Requires dimensions (samples, features). y_i_nn = y_i[notnan_mask] # Check if the correlation is high enough, otherwise do not correct this feature. if corr_pvalue_threshold is not None: corr_pvalue = scipy.stats.pearsonr(x_i_nn[:, 0], y_i_nn)[1] if corr_pvalue > corr_pvalue_threshold: continue # Fit linear regression. model = LinearRegression().fit(x_i_nn, y_i_nn) # Extract slope. a = model.coef_ # Subtract the values of the linear model from the feature and add the reference value. y_i_cor = y_i - a*(x_i - x_ref) # Replace the values in the DataFrame. df_out.loc[data_mask, col] = y_i_cor # Add new name to rename dict. if len(all_groups) == 1: rename_dict[col] = '{}{}'.format(col, postfix) elif col not in rename_dict or postfix not in rename_dict[col]: rename_dict[col] = '{}{}_{}'.format(col, postfix, jj) else: rename_dict[col] += '_{}'.format(jj) # Rename the columns in the DataFrame. df_out.rename(columns=rename_dict, inplace=True) return df_out
def groupplots(df, x, columns=None, axes=None, pvalue='auto'): """ Plot boxplots for x for each group in columns. Args: df (pd.DataFrame): DataFrame containing the data. x (str): name of the feature to plot. columns (list or str, optional): list of columns with grouping variables. Can also be a string to plot one grouping. If None, all columns are plotted. axes (list, optional): list with Axes objects to plot in. If None, a new figure with subplots is created. Defaults to None. pvalue (function): function to compute the pvalue which will be printed in the titles. If None, no pvalue is computed/printed. If 'auto' t-test will be used for 2 groups and ANOVA for more groups. Returns: axes (list): the axes objects in which the plots are made. """ columns = _check_columns(columns, df) if axes is None: # Create subplot axes. nrows, ncols = subplot_rows_columns(len(columns), minimize='rows') axes = plt.subplots(nrows=nrows, ncols=ncols, tight_layout=True)[1] axes = np.reshape([axes], -1) # Add x to columns, x must be in df[columns]. if x not in columns: columns.append(x) # Loop over columns. for i, (ax, col) in enumerate(zip(axes, columns)): if x != col: df_i = df.dropna(subset=[x, col]) # Compute pvalues. unique_labels = df_i[col].unique() if pvalue == 'auto': if len(unique_labels) == 2: pvalue_fun = lambda a, b: scipy.stats.ttest_ind(a, b)[1] else: pvalue_fun = None else: pvalue_fun = pvalue if pvalue_fun is not None: data = df_i[x] groups = () # Collect arrays of groups. for lab in unique_labels: grp_i = data[df_i[col] == lab] groups = groups + (grp_i,) pval = pvalue_fun(*groups) # Plot. stripboxplot(data=df_i, x=col, y=x, ax=ax, stripkwargs={'jitter': True}) plt.sca(ax) plt.title('{}\np-value={:.4f}'.format(col, pval, make_stars(pval))) plt.ylabel(col.split('_')[0]) return axes
[docs]def pair_separation_scores(df, groupby, x1_columns=None, x2_columns=None, standardize_columns=True, split_train=0.75, separation_metric='roc_auc', verbose=0): """ Compute cluster quality for clustering power of every feature in x_columns together with any feature in y_column. Args: df (pd.DataFrame): DataFrame containing the data. groupby (str, optional): categorical column in df that classifies the data. x1_columns (list, optional): list of (numeric) columns that hold the first features. If None, all numeric columns are used, except groupby. Defaults to None. x2_columns (list, optional): list of (numeric) columns that hold the seconds features. If None, all numeric columns are used, except groupby. Defaults to None. standardize_columns (bool, optional): if True, first standardize the features. If False, do not standardize. Defaults to True. split_train (float or None, optional): if a float, the data will be split in train and test set, where split_train is the fraction of data to train on. Must be between 0 and 1. If None, the data is not split and training and testing is done on the entire data set. Defaults to 0.75. separation_metric (str, optional): the separation metric to use. Choose from: 'silhouette', 'roc_auc', 'accuracy'. Defaults to 'roc_auc'. verbose (int, optional): verbosity level. Defaults to 0. Returns: (pd.DataFrame): table with cluster scores for any two pairs of features. """ if verbose: print('Computing 2-feature separation scores...') x1_columns = _check_columns(x1_columns, df) x2_columns = _check_columns(x2_columns, df) if groupby in x1_columns: x1_columns.remove(groupby) if groupby in x2_columns: x2_columns.remove(groupby) if standardize_columns: df = standardize(df, columns=np.unique(x1_columns + x2_columns)) # Extract y values. y = df[groupby].values # Transform y to binary labels. lb = sklearn.preprocessing.LabelBinarizer() y = lb.fit_transform(y).squeeze() # Verify that the number of classes is 2. if len(lb.classes_) != 2: raise ValueError('Expected 2 classes. Instead got {} classes: {}. This function works for 2 classes only.' .format(len(lb.classes_), lb.classes_)) # For each pair of features (columns) compute cluster quality. m = len(x1_columns) n = len(x2_columns) scores_array = np.full((m, n), np.nan) # Initialize progress bar. bar = pyprind.ProgBar(int(m*n), stream=sys.stdout) for idx_a, col_a in enumerate(x1_columns): for idx_b, col_b in enumerate(x2_columns): if col_b in x1_columns[:idx_a] and col_a in x2_columns[:idx_b]: # Already computed this score. score = np.nan else: x = df.loc[:, [col_a, col_b]].values mask_nan = np.isnan(x).any(1) x_i = x[~mask_nan] y_i = y[~mask_nan] # Compute separation. if separation_metric == 'silhouette': score = average_silhouette(x_i, y_i) elif separation_metric in ['roc_auc', 'accuracy']: if split_train is None or split_train is False: x_train, y_train, x_test, y_test = x_i, y_i, x_i, y_i else: x_train, y_train, x_test, y_test = split_data(x_i, y_i, train_frac=split_train) # Train a logistic regression model and predict on test set. if len(np.unique(y_train)) < 2: continue clf = LogisticRegression(solver='lbfgs').fit(x_train, y_train) y_score = clf.predict_proba(x_test)[:, 1] # Compute evaluation/separation metric. if separation_metric == 'roc_auc': score = roc_auc_score(y_true=y_test, y_score=y_score, average='weighted') elif separation_metric == 'accuracy': score = sklearn.metrics.accuracy_score(y_true=y_test, y_pred=np.round(y_score)) else: raise ValueError('Invalid argument "{}" for separation_metric. See code for valid options.' .format(separation_metric)) if verbose: # Update progess bar. bar.update() scores_array[idx_a, idx_b] = score return pd.DataFrame(data=scores_array, index=x1_columns, columns=x2_columns)
[docs]def pairplot(*args, **kwargs): """ Create scatterplots of each possible pairs of (specified) columns in the dataframe. Wrapper of seaborn's pairplot. Args: *args: see sns.pairplot(). **kwargs: see sns.pairplot(). Returns: see sns.pairplot() """ return sns.pairplot(*args, **kwargs)
[docs]def pairplot_best(df, groupby, n, scores=None, **kwargs): """ Create scatterplots of n pairs of columns in df, that separate samples based on groupby best. Based on cluster quality metric returned by nnsa.classifiers.evaluation.cluster_quality(). Args: df (pd.DataFrame): DataFrame containing the data. groupby (str): categorical column in df that classifies the data. n (int): number of scatterplots/pairs to show. **kwargs (optional): keyword arguments for pair_separation_scores(). Returns: axes (list): list of Axis objects in which the data is plotted. """ if scores is None: # Compute separation scores for feature pairs. kwargs_scores = {'standardize_columns': True} kwargs_scores.update(kwargs) scores = pair_separation_scores(df, groupby=groupby, **kwargs_scores) columns = list(scores.columns) # Sort negative scores in ascending order to sort scores descendingly. values = -scores.values row_idx, col_idx = np.unravel_index(np.argsort(values, axis=None), values.shape) # For loop over subplot axis to plot each pair of features, coloured by groupby. if n == 1: axes = [plt.subplot(1, 1, 1)] else: nrows, ncols = subplot_rows_columns(n) axes = plt.subplots(nrows, ncols)[1].flatten() for row, col, ax in zip(row_idx, col_idx, axes): col_a = columns[row] col_b = columns[col] # Set axis and plot. plt.sca(ax) sns.scatterplot(x=col_a, y=col_b, data=df, hue=groupby) plt.title('score: {:.2f}'.format(scores.at[col_a, col_b]), position=(0.5, 0.9)) return axes
[docs]def pca(df, n_components, columns=None, standardize_columns=True, concatenate=True, **kwargs): """ Do a PCA on the columns of df. Args: df (pd.DataFrame): DataFrame containing the data. n_components (int): number of components for decomposition/projection. columns (list, optional): list of (numeric) columns that hold the features. If None, all numeric columns are used, except groupby. Defaults to None. standardize_columns (bool, optional): if True, first standardize the features. If False, do not standardize. Defaults to True. concatenate (bool, optional): if True, a new DataFrame consisting of the original df and the principal components is returned. If False, only a DataFrame with the principal components is returned. Defaults to True. **kwargs (optional): keyword arguments to pass to sklearn.decomposition.PCA(). Returns: principal_df (pd.DataFrame): dataframe with principal components. pca_ (sklearn.decomposition.pca.PCA): object containing info on the decomposition, see sklearn.decomposition.PCA(). """ if columns is None: # By default use all numeric columns. columns = list(df.select_dtypes(include='number').columns) elif not isinstance(columns, list): # Make sure columns is a list. columns = [columns] # Extract features from dataframe. df_x = df.loc[:, columns].copy() # Drop NaNs. df_x.dropna(how='any', inplace=True) if standardize_columns: standardize(df_x, inplace=True) x = df_x.values pca_ = PCA(n_components=n_components, **kwargs) principal_components = pca_.fit_transform(x) principal_df = pd.DataFrame(data=principal_components, index=df_x.index, columns=['principal_component_{}'.format(i+1) for i in range(n_components)]) if concatenate: principal_df = pd.concat([df, principal_df], axis=1) return principal_df, pca_
[docs]def plot_column_data(df, columns=None, groupby=None, hue=None, kind='box', sharex=True, sharey='none', all_axes=None, **kwargs): """ Create a figure and plot each specified column of a DataFrame in a subplot. Args: df (pd.DataFrame): pandas DataFrame of which to plot (specified) columns. columns (list, optional): list of (numeric) columns to plot. If None, all numeric columns are plotted. Defaults to None. groupby (str, optional): categorical column to split the data on (per subplot, a separate plot is drawn per group). hue (str, optional): categorical column to split the data on within a group, using colors. kind (str, optional): the type of plot to draw. Choose from 'strip', 'box', 'violin', 'swarm', 'line', 'dist'. Defaults to 'box'. sharex (str, optional): option to share the x-axis, see plt.subplots(). Defaults to True. sharey (str, optional): option to share the y-axis, see plt.subplots(). Defaults to 'none'. all_axes (list, optional): list with an aixs handle for each subplot/feature to plot. Must have at least the same length as `columns`. If None, a new figure with subplot axes is created. Defaults to None. **kwargs (optional): optional keyword arguments passed to the seaborn plot function. Returns: all_axes (list): list with an axis handle for each subplot. """ if columns is None: # By default plot all numeric columns. columns = list(df.select_dtypes(include='number').columns) elif not isinstance(columns, list): # Make sure columns is a list. columns = [columns] if groupby in columns: columns = columns[:] columns.remove(groupby) # Feature-wise boxplots. nrows, ncols = subplot_rows_columns(len(columns), minimize='rows') if all_axes is None: all_axes = plt.subplots(nrows=nrows, ncols=ncols, sharex=sharex, sharey=sharey)[1] all_axes = all_axes.flatten() if isinstance(all_axes, np.ndarray) else [all_axes] for i, col in enumerate(columns): # Set current axis. ax = all_axes[i] plt.sca(ax) plt.title(col) # Plot with the appropriate function. if kind == 'strip': sns.stripplot(x=groupby, y=col, hue=hue, data=df, **kwargs) elif kind == 'swarm': sns.swarmplot(x=groupby, y=col, hue=hue, data=df, **kwargs) elif kind == 'scatter': sns.scatterplot(x=groupby, y=col, hue=hue, data=df, **kwargs) elif kind == 'scatter_paired': scatter_paired(x=groupby, y=col, hue=hue, data=df, **kwargs) elif kind == 'box': sns.boxplot(x=groupby, y=col, hue=hue, data=df, **kwargs) elif kind == 'violin': sns.violinplot(x=groupby, y=col, hue=hue, data=df, **kwargs) elif kind == 'lineplot': sns.lineplot(x=groupby, y=col, hue=hue, data=df, **kwargs) elif kind == 'line': if hue is not None: unique_hue_values = df[hue].unique() for hue_val in unique_hue_values: df[df[hue] == hue_val].groupby(groupby).median().reset_index().plot( kind='line', x=groupby, y=col, ax=ax, label=hue_val, legend=False, **kwargs) plt.legend(title=hue) else: df.groupby(groupby).mean().reset_index().plot( kind='line', x=groupby, y=col, ax=ax, legend=False, **kwargs) # sns.lineplot(x=groupby, y=col, hue=hue, data=df, **kwargs) elif kind == 'dist': if groupby is None and hue is None: series = df[col].dropna() ax = sns.distplot(a=series, **kwargs) ax.axvline(series.mean(), linestyle='-', color='C1') ax.axvline(series.median(), linestyle='--', color='C2') else: raise NotImplementedError('`groupby` and `hue` argument should be None when using kind="dist". ' 'Use a violin plot instead to visualize distributions among groups.') else: raise ValueError("Invalid argument kind='{}'. Use one of {}.".format(kind, ['strip', 'box', 'violin', 'swarm', 'line', 'dist', 'scatter'])) width = 0.8 if groupby is not None and kind not in ['line', 'lineplot']: # Display the sample size of each subgroup in the tick labels. sample_sizes = df.groupby(groupby)[col].count().to_dict() locations, group_labels = plt.xticks() new_labels = [] new_locations = [] for idx_group, loc, lab in zip(range(len(locations)), locations, group_labels): text = lab.get_text() group = convert_string_auto(text) # object was converted to text in label, so need to convert back. if hue is not None: hue_values = [convert_string_auto(t.get_text()) for t in plt.gca().get_legend().get_texts()] new_locations.append(loc) new_labels.append(text) for idx_hue, hue_val in enumerate(hue_values): n = df[df[hue] == hue_val].groupby(groupby)[col].count()[group] new_locations.append(idx_group - width/2 + width/len(hue_values)*(idx_hue + 0.5)) new_labels.append('\nn={}'.format(n)) else: n = sample_sizes[group] new_locations.append(loc) new_labels.append('{}\nn={}'.format(text, n)) plt.xticks(new_locations, new_labels) plt.xlabel('') else: # Display the sample size in the title. sample_size = df[col].count() plt.title('{}\nn={}'.format(ax.get_title(), sample_size)) plt.ylabel('') # Remove empty axes in the subplot grid. for ax in all_axes[len(columns):]: plt.gcf().delaxes(ax) return all_axes
[docs]def regplots(df, x, columns=None, hue=None, axes=None): """ Plot regression plots for each feature in columns against feature x. Args: df (pd.DataFrame): DataFrame containing the data. x (str): name of the feature to compute the regression to. columns (list or str, optional): list of (numeric) columns to plot against `x`. Can also be a string to plot one column. If None, all numeric columns are plotted. Defaults to None. hue (str, optional): name of a (categorical) feature to control the coloring of the datapoints. If None, all datapoints are plotted in the same color. Defaults to None. axes (list, optional): list with Axes objects to plot in. If None, a new figure with subplots is created. Defaults to None. Returns: axes (list): the axes objects in which the plots are made. """ columns = _check_columns(columns, df) if axes is None: # Create subplot axes. nrows, ncols = subplot_rows_columns(len(columns), minimize='rows') axes = plt.subplots(nrows=nrows, ncols=ncols, sharex='all', sharey='none', tight_layout=True)[1] axes = axes.flatten() if isinstance(axes, np.ndarray) else [axes] # Add x to columns, x must be in df[columns]. if x not in columns: columns.append(x) # Compute correlations. df[columns] = df[columns].astype(float) df_cor = corrwith_pvalue(df[columns], x) # Loop over columns. for i, (ax, col) in enumerate(zip(axes, columns)): if x != col: df_i = df.dropna(subset=[x, col]) df_i[col] = df_i[col].astype(float) sns.regplot(data=df_i, x=x, y=col, scatter=True, color='0.3', ax=ax) legend = 'brief' if i == 0 else False sns.scatterplot(x=x, y=col, hue=hue, data=df_i, ax=ax, legend=legend) plt.sca(ax) plt.title('{}\n$\\rho={:.2f}$, p-value={:.4f}'.format(col, df_cor.at[col, 'correlation_coef'], df_cor.at[col, 'p-value'])) plt.title('{}\n$\\rho={:.2f}${}'.format(col, df_cor.at[col, 'correlation_coef'], make_stars(df_cor.at[col, 'p-value']))) plt.ylabel(col.split('_')[0]) # Set axis limits properly. y_values = df_i[col].values y_min = y_values.min() y_max = y_values.max() y_range = y_max - y_min dy = y_range / 5 plt.ylim([y_min - dy, y_max + dy]) for ax in axes[len(columns):]: ax.remove() # Adjust x range. x_values = df[x].dropna().values x_min = x_values.min() x_max = x_values.max() x_range = x_max - x_min dx = x_range/10 plt.xlim([x_min - dx, x_max + dx]) return axes
def scatterplots(df, x, columns=None, hue=None, axes=None): """ Plot scatter plots for each feature in columns against feature x. Args: df (pd.DataFrame): DataFrame containing the data. x (str): name of the feature to scatter with. columns (list or str, optional): list of (numeric) columns to plot against `x`. Can also be a string to plot one column. If None, all numeric columns are plotted. Defaults to None. hue (str, optional): name of a (categorical) feature to control the coloring of the datapoints. If None, all datapoints are plotted in the same color. Defaults to None. axes (list, optional): list with Axes objects to plot in. If None, a new figure with subplots is created. Defaults to None. Returns: axes (list): the axes objects in which the plots are made. """ columns = _check_columns(columns, df) if axes is None: # Create subplot axes. nrows, ncols = subplot_rows_columns(len(columns), minimize='rows') axes = plt.subplots(nrows=nrows, ncols=ncols, sharex='all', sharey='none')[1] axes = axes.flatten() if isinstance(axes, np.ndarray) else [axes] # Add x to columns, x must be in df[columns]. if x not in columns: columns.append(x) # Loop over columns. for i, (ax, col) in enumerate(zip(axes, columns)): if x != col: legend = 'brief' if i == 0 else False sns.scatterplot(x=x, y=col, hue=hue, data=df, ax=ax, legend=legend) plt.sca(ax) plt.title(col) plt.ylabel(col.split('_')[0]) # Set axis limits properly. y_values = df[col].dropna().values y_min = y_values.min() y_max = y_values.max() y_range = y_max - y_min dy = y_range / 5 plt.ylim([y_min - dy, y_max + dy]) # Adjust x range. x_values = df[x].dropna().values x_min = x_values.min() x_max = x_values.max() x_range = x_max - x_min dx = x_range/10 plt.xlim([x_min - dx, x_max + dx]) return axes
[docs]def scatter_paired(x, y, data, id, hue=None, style=None, **kwargs): """ Scatter plot of column `x` vs column `y` in data, connecting points for rows with same id. Args: x (str): name of the column in `data` for the x-axis. y (str): name of the column in `data` for the y-axis. data (pd.DataFrame): DataFrame containing the data. id (str): column name of the subject/patient/sample ID. hue (str, optional): name of the column for colouring of the scatter points. style (str, optional): name of the column for the style of the scatter points. kwargs (dict, optional): optional keyword arguments common for sns.scatterplot() and sns.lineplot(). """ sns.scatterplot(x=x, y=y, hue=hue, style=style, data=data, **kwargs) sns.lineplot(x=x, y=y, hue=hue, units=id, data=data, estimator=None, legend=False, **kwargs)
[docs]def boxplot_paired(x, y, data, id, hue=None, **kwargs): """ Boxplot of column `x` vs column `y` in data, connecting points for rows with same id. Args: x (str): name of the column in `data` for the x-axis. y (str): name of the column in `data` for the y-axis. data (pd.DataFrame): DataFrame containing the data. id (str): column name of the subject/patient/sample ID. hue (str, optional): name of the column for colouring of the boxes. kwargs (dict, optional): optional keyword arguments common for sns.boxplot() and sns.lineplot(). """ sns.boxplot(x=x, y=y, hue=hue, data=data, **kwargs) sns.lineplot(x=x, y=y, hue=hue, units=id, data=data, estimator=None, legend=False, **kwargs)
def separation_scores(df, groupby, columns=None, separation_metric='roc_auc', id=None): """ Compute a score per feature in `columns` for the binary classification task to predict `groupby`. `groupby` must be a categorical feature with 2 classes. Args: df (pd.DataFrame): DataFrame containing the data. groupby (str): categorical column in df that classifies the data as one of 2 classes. columns (list, optional): list of (numeric) columns that hold the features. If None, all numeric columns are used, except groupby. Defaults to None. separation_metric (str, optional): the separation metric to use. Choose from: 'roc_auc', 'fscore_carolina', 'mann_whitney_u', 'ranksums', 'opt_pooled_ttest', 'wilcoxon'. Defaults to 'roc_auc'. id (str, optional): column name of the subject/patient/sample ID. Required argument when separation_metric is 'opt_pooled_ttest'. Returns: (pd.Series): separation scores for each feature/column in descending order. Examples: >>> np.random.seed(0) >>> y = np.concatenate((np.zeros(10), np.ones(10))) >>> x = np.arange(len(y)) # Perfectly separates classes in y. >>> x_noisy = x + 10*np.random.rand(*x.shape) >>> x_random = np.random.rand(*x.shape) >>> df = pd.DataFrame({'y': y, 'x': x, 'x_noisy': x_noisy, 'x_random': x_random}) >>> scores = separation_scores(df, groupby='y', separation_metric='roc_auc') >>> print(scores) x 1.00 x_noisy 0.94 x_random 0.55 Name: roc_auc, dtype: float64 """ columns = _check_columns(columns, df) if separation_metric in ['opt_pooled_ttest', 'wilcoxon']: if id is not None: # Typically, id refers to a patient ID. pat_id = df[id].values else: raise ValueError('id not specified. id is required when separation_metric is "opt_pooled_ttest".') if groupby in columns: columns.remove(groupby) # Extract y values. y = df[groupby].values # Transform y to binary labels. lb = sklearn.preprocessing.LabelBinarizer() y = lb.fit_transform(y).squeeze() # Verify that the number of classes is 2. if len(lb.classes_) != 2: raise ValueError('Expected 2 classes. Instead got {} classes: {}. This function works for 2 classes only.' .format(len(lb.classes_), lb.classes_)) # For each feature (column) compute separation score. m = len(columns) scores_array = np.zeros(m) for i, col_i in enumerate(columns): x = df.loc[:, col_i].values if separation_metric == 'roc_auc': score = roc_auc_score(y_true=y, y_score=x, average='weighted') elif separation_metric == 'fscore_carolina': score = fscore_carolina(y_true=y, y_score=x) elif separation_metric in ['mann_whitney_u', 'ranksums', 'opt_pooled_ttest', 'wilcoxon']: if separation_metric == 'mann_whitney_u': p_value = mann_whitney_u(y_true=y, y_score=x, use_continuity=True, alternative='two-sided')[1] elif separation_metric == 'ranksums': p_value = ranksums(y_true=y, y_score=x)[1] elif separation_metric in ['opt_pooled_ttest', 'wilcoxon']: df_paired = extract_paired_data(y_true=y, y_score=x, id=pat_id) if separation_metric == 'opt_pooled_ttest': a = df_paired.iloc[:, 0].values b = df_paired.iloc[:, 1].values p_value = opt_pooled_ttest(a, b)[1] elif separation_metric == 'wilcoxon': # Drop incomplete measurements. df_paired.dropna(how='any', axis=0) a = df_paired.iloc[:, 0].values b = df_paired.iloc[:, 1].values p_value = wilcoxon(x=a, y=b)[1] # Compute 1 - p_value, such that low p-values have high scores. score = 1 - p_value else: raise ValueError('Invalid argument "{}" for separation_metric. See code for valid options.' .format(separation_metric)) scores_array[i] = score scores = pd.Series(data=scores_array, index=columns, name=separation_metric) return scores.sort_values(ascending=False)
[docs]def read_dataframe(filepath, **kwargs): """ Automatically detect the filetype and read the file into a DataFrame using pandas. Args: filepath (str): path to a file that can be read as a pandas DataFrame. Compatible file types/extensions are: csv xlsx **kwargs (optional): keyword arguments for the read function. Returns: df (pd.DataFrame): the content of the file as a DataFrame. """ extension = os.path.splitext(filepath)[1] if extension == '': raise ValueError('filepath "{}" is missing an extension. Cannot determine which reader to use.' .format(filepath)) elif extension == '.csv': df = pd.read_csv(filepath, **kwargs) elif extension == '.xlsx': df = pd.read_excel(filepath, **kwargs) else: raise NotImplementedError('Reading file with extension "{}" is not implemented.' .format(extension)) return df
def drop_unpaired(df, groupby, id, inplace=False): """ Remove samples (rows in the df) that do not have values for every group in the groupby column. Typically, the groups can be measurement moments, and the id can correspond to the patient ID. Args: df (pd.DataFrame): pandas DataFrame containing the data. groupby (str): name of a categrocial column in the df, specifying the groups. id (str): name of the column in the df that contains the ID of each subject/patient. inplace (bool): if True, modifies the DataFrame object inplace. If False, a copy of the df is create and the modified df copy is returned. Returns: df (pd.DataFrame): modified DataFrame object (if inplace is False). Examples: >>> np.random.seed(0) >>> df = pd.DataFrame(data=np.random.randint(low=0, high=50, size=(10, 3))) >>> df['pat_ID'] = [1, 1, 2, 2, 3, 4, 4, 5, 6, 7] >>> df['timing'] = ['pre', 'post', 'post', 'pre', 'post', 'pre', 'post', 'pre', 'post', 'pre'] >>> print(df) 0 1 2 pat_ID timing 0 44 47 0 1 pre 1 3 3 39 1 post 2 9 19 21 2 post 3 36 23 6 2 pre 4 24 24 12 3 post 5 1 38 39 4 pre 6 23 46 24 4 post 7 17 37 25 5 pre 8 13 8 9 6 post 9 20 16 5 7 pre >>> drop_unpaired(df, groupby='timing', id='pat_ID') 0 1 2 pat_ID timing 0 44 47 0 1 pre 1 3 3 39 1 post 2 9 19 21 2 post 3 36 23 6 2 pre 5 1 38 39 4 pre 6 23 46 24 4 post """ # Create a DataFrame that has the subjects on the rows and the groups on the columns, where the values are # True or False, indicating whether the subject has a value for each group. groups = df[groupby].values pat_ids = df[id].values df_paired = extract_paired_data(y_true=groups, y_score=np.ones(len(groups)), id=pat_ids).notnull() # Extract the subject IDs of the ones that do not occur in all groups. unpaired_ids = df_paired[df_paired.sum(axis=1) < df_paired.shape[1]].index.tolist() # Extract the index values in the df that correspond to the subjects that do not have all measurements unpaired_index = df[df[id].isin(unpaired_ids)].index.to_list() # Drop the unpaired rows. df = df.drop(unpaired_index, inplace=inplace) if not inplace: return df
[docs]def standardize(df, columns=None, inplace=False): """ Standardize (zero mean, unit standard deviation) columns in a DataFrame. See sklearn.preprocessing.StandardScaler() for additional info about the standardization. Args: df (pd.DataFrame): pandas DataFrame of which to standardize (specified) columns. columns (list or str, optional): list (or str) specifying the numeric column(s) to standardize. If None, all numeric columns are standardized. Defaults to None. inplace (bool, optional): if True, the values of the columns in df are replaced by the standardized values. If False, a new DataFrame is returned. Defaults to False. Returns: df_out (pd.DataFrame): DataFrame object with the standardized values (if inplace is False). """ if inplace: # Set option to ignore SettingWithCopyWarning. chained_ass = pd.options.mode.chained_assignment pd.options.mode.chained_assignment = None df_out = df else: df_out = df.copy() if columns is None: # By default plot all numeric columns. columns = list(df.select_dtypes(include='number').columns) elif isinstance(columns, str): # Convert to list. columns = [columns] # Get array of the column values. x = df.loc[:, columns].values # Standardizing the values. x = StandardScaler().fit_transform(x) # Replace the unstandardized columns with the standardized values. for idx, col in enumerate(columns): df_out.loc[:, col] = x[:, idx] if not inplace: return df_out else: # Rest the option. pd.options.mode.chained_assignment = chained_ass
def _check_columns(columns, df): """ Helper function to check the columns input for a given DataFrame, including default behaviour. Args: columns (str or list or None): either a single string of a column name, or a list of columns names, or None. If None, all numerical columns of df will be returned. Returns: columns (list): a list of column name(s). """ if columns is None: # By default use all numeric columns. columns = list(df.select_dtypes(include='number').columns) elif not isinstance(columns, list): # Make sure columns is a list. columns = [columns] # Return a copy, since lists are mutuable. return columns.copy() def extract_paired_data(y_true, y_score, id): """ Organize the data in y_true into a DataFrame where the rows correspond to a subject and the columns to a group. np.nan is used for missing values. Args: y_true (np.ndarray): array with values indicating the classes/groups (same shape as y_score and pat_id). y_score (np.ndarray): array with feature values to split into groups (same shape as y_true and pat_id). id (np.ndarray): array with values indicating the patient/subject/sample (same shape as y_true and y_score). Returns: df (pd.DataFrame): DataFrame containing feature values with the subjects/patients at the rows and the classes/groups at the columns. Examples: >>> y_true = np.array(['pre', 'post', 'pre', 'pre', 'post', 'post', 'pre']) >>> np.random.seed(0) >>> y_score = (y_true == 'post').astype(float) + np.random.rand(len(y_true))/2 >>> id = [1, 1, 2, 3, 4, 3, 5] >>> extract_paired_data(y_true, y_score, id) post pre 1 1.357595 0.274407 2 NaN 0.301382 3 1.322947 0.272442 4 1.211827 NaN 5 NaN 0.218794 """ # Unique IDs and groups (rows and columns). unique_ids = np.unique(id) unique_groups = np.unique(y_true) y_score = np.asarray(y_score) # Loop over groups and patients. data = np.zeros((len(unique_ids), len(unique_groups))) for j, group_j in enumerate(unique_groups): mask_group = y_true == group_j for i, id_i in enumerate(unique_ids): mask_pat = id == id_i value_ = y_score[np.logical_and(mask_pat, mask_group)] assert np.size(value_) <= 1 if len(value_) == 0: value = np.nan elif len(value_) == 1: value = value_[0] else: raise ValueError('patient {} has multiple values for group {}. value_ = {}' .format(id_i, group_j, value_)) data[i, j] = value df = pd.DataFrame(data=data, index=unique_ids, columns=unique_groups) return df def corrbarplot(x, y_columns, data, control_var=None, corr_method='pearson', plot_abs=True, horizontal=True, order=None, sort=True, palette=None, color_range=None, annotate=1, add_colorbar=True, ax=None): """ Plot barchart with correlation values of `x` with `y_columns`. """ df_corr = corrwith_pvalue(data[y_columns], other=data[x], control_var=data[control_var] if control_var is not None else None, method=corr_method) if ax is None: ax = plt.gca() # Range of values that will be mapped to the palette, i.e. min and max possible correlation if color_range is None: max_r = np.round(df_corr['correlation_coef'].abs().max(), 2) color_min, color_max = -max_r, max_r else: color_min, color_max = color_range if palette is None: palette = sns.diverging_palette(20, 220, n=256) # Assign color (which should indicate the sign). df_corr['color'] = df_corr['correlation_coef'].apply( lambda v: value_to_color(v, color_min=color_min, color_max=color_max, palette=palette)) # Take absolute value. if plot_abs: df_corr['correlation_coef'] = df_corr['correlation_coef'].abs() # Sort on correlation coefficient. if order is not None: df_corr = df_corr.loc[order, :] if sort: msg = '\nIgnoring `sort`=True when `order` is specified.' warnings.warn(msg) elif sort: df_corr = df_corr.sort_values('correlation_coef', ascending=horizontal) # Plot bars. bar_kwargs = dict(color=df_corr['color']) if horizontal: ax.barh(df_corr.index, df_corr['correlation_coef'], **bar_kwargs) else: ax.bar(df_corr.index, df_corr['correlation_coef'], **bar_kwargs) # Add text (asterisks and pvalues). if annotate: for idx, (_, row) in enumerate(df_corr.iterrows()): rval = row['correlation_coef'] pval = row['p-value'] # Plot stars (as text). stars = get_pstars(pval) if horizontal: ax.text(rval, idx, stars, va='center', ha='left' if rval >= 0 else 'right') else: ax.text(idx, rval, stars, ha='center', va='bottom' if rval >= 0 else 'top') if annotate > 1: ptext = '\np={:.2g} '.format(pval) if horizontal: ax.text(rval, idx, ptext, va='center', ha='right' if rval >= 0 else 'left') else: ax.text(idx, rval, ptext, ha='center', va='top' if rval >= 0 else 'bottom', rotation=90) if not horizontal: # Rotate labels on x-axis for better readability. plt.xticks(rotation=45, horizontalalignment='right') # Add axis label. if plot_abs: label = f'Absolute {corr_method.capitalize()} r (-)' else: label = f'{corr_method.capitalize()} r (-)' if horizontal: ax.set_xlabel(label) else: ax.set_ylabel(label) ax.set_title(f'Correlation with {x}') # Add color legend on the right side of the plot if add_colorbar and (color_min < color_max): divider = make_axes_locatable(ax) cax = divider.append_axes('right', size='5%', pad=0.05) col_x = [0] * len(palette) # Fixed x coordinate for the bars bar_y = np.linspace(color_min, color_max, len(palette)) # y coordinates for each of the n_colors bars bar_height = bar_y[1] - bar_y[0] cax.barh( y=bar_y, width=[5] * len(palette), # Make bars 5 units wide left=col_x, # Make bars start at 0 height=bar_height, color=palette, linewidth=0 ) cax.set_xlim(1, 2) # Bars are going from 0 to 5, so lets crop the plot somewhere in the middle cax.grid(False) # Hide grid cax.set_facecolor('white') # Make background white cax.set_xticks([]) # Remove horizontal ticks cax.set_yticks(np.linspace(min(bar_y), max(bar_y), 3)) # Show vertical ticks for min, middle and max cax.yaxis.tick_right() # Show vertical ticks on the right cax.spines['bottom'].set_visible(False) cax.spines['left'].set_visible(False) return df_corr def corrplot(data, x_columns=None, y_columns=None, control_var=None, corr_method='pearson', add_annotations=False, ax=None, **kwargs): """ Correlation plot of `x_columns` vs `y_columns`. Args: data (pd.DataFrame, np.ndarray): dataframe or arrays with columns. x_columns (list): list with columns names for x. If None, takes all numeric columns in data. y_columns (list): list with columns names for y. If None, `x_columns` will be used. control_var (str): variable to control for. If specified, computes partial correlation values. corr_method (str, optional): which correlation function to use ('pearson', 'spearman', 'kendall'). add_annotations (int): whether to annotate the plot. Options: 0: no annotations. 1: write asterisks to indicate significance. 2: write correlation values. ax (plt.Axes): axes to plot in. **kwargs: for plot_heatmap Returns: df_corr (pd.DataFrame): dataframe with correlation values. Columns correspond to `x_columns` and indices to `y_columns`. df_pval (pd.DataFrame): dataframe with p-values corresponding to the correlations (same shape as `df_corr`). See Also: compute_correlation_matrix() """ # Compute correlation matrix. df_corr, df_pval = compute_correlation_matrix( data=data, x_columns=x_columns, y_columns=y_columns, control_var=control_var, corr_method=corr_method) # Prepare annotations for pvalues. if add_annotations == 1: annotations = df_pval.applymap(get_pstars) elif add_annotations == 2: annotations = df_corr.applymap(lambda r: f'{r:.2f}') else: annotations = None if ax is None: ax = plt.gca() plot_heatmap(df_corr, annotations=annotations, ax=ax, **kwargs) ax.set_title(f'{corr_method.capitalize()} correlations') return df_corr, df_pval def get_pstars(pval): star_char = u"\u2217" # star_char = u"\u002A" if pval < 0.01: stars = star_char * 2 elif pval < 0.05: stars = star_char * 1 else: stars = star_char * 0 return stars class AnalyzerDf(object): """ Helper interface for plotting some relationships in a DataFrame. Args: df (pd.DataFrame): dataframe with one observation per row and features, outcomes, etc. as columns. x_columns (list, optional): optional list with default columns in df containing continuous features. y_columns (list, optional): optional list with default columns in df containing continuous outputs/targets. corr_method (str, optional): which default correlation function to use ('pearson', 'spearman', 'kendall'). """ def __init__(self, df, x_columns=None, y_columns=None, control_var=None, corr_method='pearson'): self.df = df numeric_columns = df.select_dtypes(include=np.number).columns.tolist() if x_columns is None: x_columns = numeric_columns if y_columns is None: y_columns = numeric_columns self.x_columns = x_columns self.y_columns = y_columns self.control_var = control_var self.corr_method = corr_method def corrbarplot(self, x, y_columns=None, control_var=None, corr_method=None, df=None, **kwargs): """ Plot barchart with correlation values of `columns` with `x`. See Also: corrbarplot() """ if df is None: df = self.df if y_columns is None: y_columns = self.y_columns if control_var is None: control_var = self.control_var if corr_method is None: corr_method = self.corr_method return corrbarplot(x=x, y_columns=y_columns, data=df, control_var=control_var, corr_method=corr_method, **kwargs) def corrplot(self, x_columns=None, y_columns=None, df=None, control_var=None, corr_method=None, add_annotations=False, ax=None, **kwargs): if df is None: df = self.df if x_columns is None: x_columns = self.x_columns if y_columns is None: y_columns = self.y_columns if control_var is None: control_var = self.control_var if corr_method is None: corr_method = self.corr_method df_corr, df_pval = corrplot( data=df, x_columns=x_columns, y_columns=y_columns, control_var=control_var, corr_method=corr_method, add_annotations=add_annotations, ax=ax, **kwargs) return df_corr, df_pval def regplot(self, x, y, control_var=None, df=None, corr_method=None, ax=None): """ Simple regression plot for x and y (with correlation in the title). """ if df is None: df = self.df if control_var is None: control_var = self.control_var if corr_method is None: corr_method = self.corr_method if ax is None: ax = plt.gca() sns.regplot( x=x, y=y, data=df, ax=ax) # Compute correlation and put it in the title. if control_var is None: a, b = df[[x, y]].dropna().values.T if corr_method == 'pearson': corrfun = pearsonr elif corr_method == 'spearman': corrfun = spearmanr elif corr_method == 'kendall': corrfun = kendalltau else: raise ValueError(f'Invalid option corr_method="{corr_method}".') stat, pval = corrfun(a, b) else: a, b, c = df[[x, y, control_var]].dropna().values.T stat, pval = partial_corr(a, b, c, method=corr_method) ax.set_title(f'r={stat:.2g}\n(p={pval:.1g})')#, n={len(a)})')