"""
Author: Tim Hermans (tim-hermans@hotmail.com).
"""
import warnings
import pandas as pd
import pyprind
from scipy import stats
from scipy.stats import pearsonr, spearmanr, kendalltau, mannwhitneyu
from scipy.stats._stats_py import SignificanceResult
from nnsa.utils.arrays import mirror_boundaries, remove_nans
from nnsa.utils.segmentation import segment_generator
import numpy as np
import statsmodels.formula.api as smf
__all__ = [
'compute_mad',
]
[docs]def compute_mad(x, k=1.4826, n=None, axis=-1, keepdims=False):
"""
Compute the (running) median absolute deviation.
Ignores nan values.
Args:
x (np.ndarray): input array.
k (float): scale factor for MAD. Defaults to 1.4826 to approximate SD of normal distribution.
See also: https://en.wikipedia.org/wiki/Median_absolute_deviation.
n (int): Deprecated. Do not use. This is only here to provide Deprecation warnings.
axis (int): axis along which to compute the MAD.
keepdims (bool): whether to keep dims or not (see np.median()).
Returns:
mad (float or np.ndarray): MAD value(s) along the specified axis.
"""
if n is not None:
deprc_msg = 'Use moving_mad() instead of compute_mad()[0].'
raise DeprecationWarning(deprc_msg)
# To array.
x = np.asarray(x)
# Median.
med = np.nanmedian(x, axis=axis, keepdims=True)
# Median absolute deviation.
mad = k * np.nanmedian(np.abs(x - med), axis=axis, keepdims=keepdims)
return mad
def pairwise_multiple_regression(x_columns, y_columns, df, formula='{y}~{x}', verbose=1):
"""
Compute R2-adjusted scores and p-values for pairwise y~x regressions.
Args:
x_columns (list): list of columns to use for x-variables (one at a time).
y_columns (list): list of columns to use for y-variables (one at a time).
df (pd.DataFrame): dataframe with the data.
formula (str): formula in which to insert the x and y variables.
Indicate where the variables should be placed using {x} and {y}, e.g. {y} ~ {x} + covariate_column
verbose (int): verbosity level (show a progress bar (1) or not (0)).
Returns:
df_results (pd.DataFrame): dataframe with complex values, where the real part is the R2-adjusted score,
and the imaginary part the p-value for the contribution of {x} on the model for {y}.
"""
# Check inputs.
if '{x}' not in formula:
raise ValueError('"{x}" missing in formula.')
if '{y}' not in formula:
raise ValueError('"{y}" missing in formula.')
# Loop over x and y columns.
output = np.zeros((len(x_columns), len(y_columns)), np.complex)
bar = pyprind.ProgBar(len(x_columns) * len(y_columns))
for ix, x in enumerate(x_columns):
for iy, y in enumerate(y_columns):
# Create formula.
formula_i = formula.format(x=x, y=y)
# Fit multiple linear regression.
model = smf.ols(formula=formula_i, data=df, missing='drop')
results = model.fit()
# Score and p-value.
score = results.rsquared_adj
pvalue = results.pvalues[x]
# Save in output (pvalue as imaginary part).
output[ix, iy] = score + 1j * pvalue
if verbose:
bar.update()
df_results = pd.DataFrame(data=output, index=x_columns, columns=y_columns)
return df_results
def partial_corr(x, y, z, method='pearson'):
"""
Calculate the partial correlation coefficient and its p-value
between x and y, controlling for the effects of z.
Args:
x : array-like, shape (n_samples,)
First variable of interest.
y : array-like, shape (n_samples,)
Second variable of interest.
z : array-like, shape (n_samples,)
Control variable.
method : str, optional
Correlation method to use: 'pearson', 'spearman', or 'kendall'.
Returns:
res : SignificanceResult containing the partial correlation coefficient (statistic)
between x and y, controlling for z, and its two-tailed pvalue.
Examples:
# Create x and y with confounfing z.
>>> rng = np.random.RandomState(43)
>>> z = rng.random(100)
>>> x = z + 0.2*rng.random(100)
>>> y = z + 0.2*rng.random(100)
>>> pearsonr(x, y)
PearsonRResult(statistic=0.9617349300921725, pvalue=6.584320485068616e-57)
>>> spearmanr(x, y)
SignificanceResult(statistic=0.9613921392139213, pvalue=1.0109548429778636e-56)
>>> partial_corr(x, y, z, method='pearson')
SignificanceResult(statistic=0.04490137980266458, pvalue=0.6589872916398769)
>>> partial_corr(x, y, z, method='spearman')
SignificanceResult(statistic=0.12897318724509949, pvalue=0.20326996612845738)
"""
x = np.asarray(x)
y = np.asarray(y)
z = np.asarray(z)
method = method.lower()
# Force z to be 2D.
if x.ndim != 1 or y.ndim != 1 or z.ndim != 1:
raise ValueError('All input arrays (x, y, z) must be 1-dimensional.')
# Calculate correlation matrices.
if method == 'pearson':
r_XY = np.corrcoef(x, y)[0, 1]
r_XZ = np.corrcoef(x, z)[0, 1]
r_YZ = np.corrcoef(y, z)[0, 1]
elif method == 'spearman':
r_XY = spearmanr(x, y)[0]
r_XZ = spearmanr(x, z)[0]
r_YZ = spearmanr(y, z)[0]
elif method == 'kendall':
r_XY = kendalltau(x, y)[0]
r_XZ = kendalltau(x, z)[0]
r_YZ = kendalltau(y, z)[0]
else:
raise ValueError(f"Invalid method '{method}'. Choose 'pearson', 'spearman', or 'kendall'.")
# Compute partial correlation coefficients.
r_XY_Z = (r_XY - r_XZ * r_YZ) / np.sqrt((1 - r_XZ ** 2) * (1 - r_YZ ** 2))
# Compute degrees of freedom.
n_samples = x.shape[0]
n_controls = 1
df = n_samples - n_controls - 2
# Compute p-value.
t = r_XY_Z * np.sqrt(df / (1 - r_XY_Z ** 2))
p = 2 * (1 - stats.t.cdf(abs(t), df))
# Named tuple, like the result of stats.spearmanr.
res = SignificanceResult(r_XY_Z, p)
return res
def test_groups_pairwise(x, group, data=None, test_fun=mannwhitneyu):
"""
Do pairwise test between unique groups.
"""
# Test between each group.
if data is not None and isinstance(x, str):
x = data[x]
if data is not None and isinstance(group, str):
group = data[group]
x = np.asarray(x)
group = np.asarray(group)
nan_mask = np.isnan(x)
x = x[~nan_mask]
group = group[~nan_mask]
unique_groups = np.unique(group)
stat_matrix = np.full((len(unique_groups), len(unique_groups)), fill_value=np.nan)
pval_matrix = np.full_like(stat_matrix, fill_value=np.nan)
for ii, group_i in enumerate(unique_groups):
mask_i = group == group_i
a_i = x[mask_i]
for jj, group_j in enumerate(unique_groups):
if jj <= ii:
continue
mask_j = group == group_j
b_j = x[mask_j]
stat, pval = test_fun(a_i, b_j)
stat_matrix[ii, jj] = stat
pval_matrix[ii, jj] = pval
# Symmetric.
stat_matrix[jj, ii] = stat
pval_matrix[jj, ii] = pval
df_stat = pd.DataFrame(stat_matrix, columns=unique_groups, index=unique_groups)
df_pval = pd.DataFrame(pval_matrix, columns=unique_groups, index=unique_groups)
return df_stat, df_pval