import numpy as np
__all__ = [
'delay_correlation',
]
[docs]def delay_correlation(x, y, delays):
"""
Compute the correlation between signals x and y, for specifc delays of x wrt y.
Does not include zero-padding, instead cuts off a piece of the signals to apply the delay.
For each delay, the same part of x is used.
Ignores nan values.
Args:
x (array-like): 1D array.
y (array-like): 1D array.
delays (array-like): 1D array with delays to apply.
Returns:
corr (np.ndarray): correlation values corresponding to `delays`.
Examples:
>>> t = np.linspace(6, 100)
>>> x = np.sin(t)
>>> y = np.sin(t)
>>> delays = np.arange(-15, 8)
>>> corr = delay_correlation(x, y, delays)
>>> best_delay = delays[np.argmax(corr)]
>>> print(best_delay)
0
"""
x = np.asarray(x).squeeze()
y = np.asarray(y).squeeze()
if x.ndim != 1 or y.ndim != 1:
ValueError('Both `x` and `y` must be 1-dimensional.')
# y should be larger than x in this code.
if len(y) < len(x):
raise ValueError('`y` should be at least as long as `x`.')
delays = delays.astype(int)
isort = np.argsort(delays)
min_delay = delays[isort[0]]
max_delay = delays[isort[-1]]
# Take subsegment of x that will be shifted over y.
if min_delay < 0:
x = x[-min_delay:]
if max_delay > 0:
x = x[:-max_delay]
# Precompute.
nx = len(x)
x = x - np.nanmean(x)
std_x = np.nanstd(x)
# Compute correlation for each delay.
corr = np.zeros((len(delays)))
for i, d in enumerate(delays):
y_seg = y[d - min_delay: d - min_delay + nx]
cov = np.nanmean((y_seg - np.nanmean(y_seg)) * x)
rho = cov / (std_x * np.nanstd(y_seg))
corr[i] = rho
return corr