"""
Author: Tim Hermans (tim-hermans@hotmail.com).
"""
import warnings
import numpy as np
from pycwt.wavelet import _check_parameter_wavelet
from nnsa.cwt.config import DEFAULT_WAVELET
from nnsa.cwt.transforms import cwt
from nnsa.preprocessing.detrending import detrend_poly
from nnsa.utils.event_detections import time_threshold
__all__ = [
'autoscale_rec',
'compute_coi',
'compute_moi',
'compute_power_cwt',
'compute_reconstruction',
'cwt_power',
'cwt_total_power',
'reconstruct_x',
]
[docs]def autoscale_rec(x_rec, x):
# Find scaling factor that ensures the lowest (least-squares) error
# between original and fully reconstructed signal.
a = np.nansum(x*x_rec) / np.nansum(x_rec**2) # Solution of min_a{sum((x - a*x_rec)**2)}.
return a*x_rec
[docs]def compute_coi(n0, dt, freqs, wavelet):
"""
Compute the cone of influence (COI).
The COI is a mask with True values at locations where the wavelet transform
is significantly influenced by artefacts (hence these regions should not be
included in the interpretation of the result).
Args:
n0 (int): number of time points of the transform.
dt (float): sampling period (1/fs).
freqs (np.ndarray): frequency array of the transform.
wavelet (Wavelet): wavelet object.
Returns:
insidecoi (np.ndarray): boolean array with shape (n0, len(freqs)).
Contains True at locations inside the COI, False otherwise.
"""
coi = (n0 / 2 - np.abs(np.arange(0, n0) - (n0 - 1) / 2))
coi = wavelet.flambda() * wavelet.coi() * dt * coi
period = np.ones([1, n0]) / freqs[:, None]
coi = np.ones([len(freqs), 1]) * coi[None, :]
insidecoi = (period > coi)
return insidecoi
[docs]def compute_moi(nan_mask, scales, dt, dj, wavelet,
threshold=0.07864960, ignore_short_nans=True):
"""
Compute the mask of influence (MOI) in the wavelet (time-frequency) domain, given the nan mask in time domain.
Where the cone of influence (COI) only considers edge effects, the mask of influence (MOI) extends this idea to
edge effects and nan effects (e.g. due to missing samples in the time series prior to a wavelet transform).
Args:
nan_mask (np.ndarray): 1D boolean mask for time dimension that is True at locations of
missing values/artefacts, i.e. at values where the input signal cannot be trusted.
scales, dt, dj, wavelet: parameters used by the wavelet decomposition.
threshold (float or None): the relative contribution of nan samples to the wavelet transform.
By default, is taken as the area under the Gaussian after the e-folding time
(equivalent to the definition of cone of influence for a Morlet wavelet according to Torrence et al. 1998).
If None, does not apply a threshold and the result of convolution is returned.
ignore_short_nans (bool): if True, nans with a length shorter than the Nyquist period are ignored.
This is determined per wavelet scale.
Returns:
moi (np.ndarray): boolean array same size as `C` containing True at locations where edge effects or missing
data (nan) effects cannot be ignored.
"""
# Create a mask in the time-frequency domain.
shape = (len(scales), len(nan_mask))
mask_tf = np.zeros(shape, int)
mask_tf[:, nan_mask] = 1
# Pad with ones at the edges.
npad = int(np.sqrt(2) * 1.1 * scales[-1] / dt) # Approx the support width of the largest wavelet.
mask_tf = np.hstack([np.ones((mask_tf.shape[0], npad), int),
mask_tf,
np.ones((mask_tf.shape[0], npad), int)])
moi = np.zeros(shape, bool if threshold is not None else float)
power_cutoff = 0.5 # Power drop where the cutoff frequency is defined (e.g. 0.999, 0.5 (=-3dB) or e^-2).
for i, (s, mask_i) in enumerate(zip(scales, mask_tf)):
# Remove nans if the length is shorter than the equivalent Fourier period of each scale.
if ignore_short_nans:
# High cutoff frequency of wavelet bandpass filter: freq where Morlet wavelet power equals `power_cutoff`.
# Result of setting (ψ∧0(sω))^2 in table 1 of Torrence and Compo 1998 to power_cutoff without normalization
# constant and solving for omega.
a = s ** 2
b = -2 * s * wavelet.f0
c = wavelet.f0 ** 2 + np.log(power_cutoff)
D = b ** 2 - 4 * a * c
fc = (-b + np.sqrt(D)) / (2 * a) / 2 / np.pi
# Maximum sampling period according to Nyquist.
T_max = 1 / (fc * 2) # fc * 2 is the minimum sampling rate for signal with maximum frequency fc.
max_nan_length = T_max / dt
# Remove short nans.
mask_i = time_threshold(mask_i, min_duration=max_nan_length+1)
# Convolve with Gaussian (do it per scale to not overload memory),
mask_i = mask_i.reshape(1, -1)
conv = wavelet.smooth(mask_i, dt, dj, scales[i:i + 1], time=True, scale=False,
scale_window=1, time_window=1)
# Remove padded zeros.
conv = conv[:, npad:-npad]
if threshold is not None:
# Where convolution if greater than the threshold, there are significant effects.
moi[i] = conv >= threshold
else:
moi[i] = conv
return moi
[docs]def compute_power_cwt(x, dt, freq_low=None, freq_high=None, dj=1/10, wavelet=None,
max_nan=0.5, **kwargs):
"""
Compute power in specified band.
For values in the cone of influence (COI), time averaged values are taken.
If at a given point in time, more than `max_nan`*100 % frequencies are in the COI,
nan is returned at that time point.
Args:
x (np.ndarray): 1D signal array.
dt (float): sample period of x (1/fs).
freq_low (float, optional): lower frequency cutoff for bandpower. If None, takes lowest possible.
freq_high (float, optional): upper frequency cutoff for bandpower. If None, takes highest possible.
dj (float, optional): scale spacing (in log-space).
wavelet (mothers.Mother, optional): wavelet to use. Defaults to DEFAULT_WAVELET.
max_nan (float, optional): maximum fraction of frequencies in the specified bands that can be nan
at a given time (e.g. due to COI). If the number of nans if greater, nan is returned at that time point.
If None, does not insert any nans in the output.
**kwargs (optional): optional keyword arguments for cwt().
Returns:
power (np.ndarray): array with same size as `x` containing the wavelet power in the specified band.
"""
if wavelet is None:
wavelet = DEFAULT_WAVELET
# Determine limits of the wavelet scales.
if kwargs.get('check_reconstruction', False) or kwargs.get('auto_detrend', False):
# We need the entire wavelet transform.
s0 = -1
J = -1
else:
if freq_high is not None:
s0 = wavelet.freq2scal(freq_high)
else:
s0 = wavelet.get_min_scale(dt)
if freq_low is not None:
J = wavelet.get_J(s0, dj, min_freq=freq_low)
else:
J = -1
# Do CWT.
W, scales, freqs, insidecoi = cwt(
x=x, dt=dt, dj=dj, s0=s0, J=J, wavelet=wavelet, **kwargs)
# Create frequency mask.
if freq_low is None:
freq_low = 0
if freq_high is None:
freq_high = np.inf
freq_mask = np.logical_and(freqs >= freq_low, freqs < freq_high)
# Apply mask.
if not np.all(freq_mask):
W = W[freq_mask]
scales = scales[freq_mask]
freqs = freqs[freq_mask]
insidecoi = insidecoi[freq_mask]
# Compute power of freq band (multiply by 1/s to remove bias of low frequencies (Liu et al. 2007)).
W = (np.abs(W)**2)/scales.reshape(-1, 1)
# Replace coi with NaN.
W[insidecoi] = np.nan
# Replace samples in COI with time average.
# I expect to see "RuntimeWarning: Mean of empty slice" in this block
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
W_mean = np.nanmean(W, axis=-1, keepdims=True)
W[insidecoi] = (W_mean * insidecoi)[insidecoi]
# Average power across computed frequencies.
power = np.nanmean(W, axis=0)
# Replace power by nan is too many values fell inside the COI.
if max_nan is not None:
nan_frac = insidecoi.mean(axis=0)
power[nan_frac > max_nan] = np.nan
return power
[docs]def compute_reconstruction(Wx, scales, x, **cwt_kwargs):
"""
For backward compatibility.
"""
x_rec, error = reconstruct_x(Wx, x, scales, cwt_kwargs)
return x_rec
[docs]def cwt_power(W, scales, dt, dj, s_min=None, s_max=None, wavelet=None, cdelta=None):
"""
Averages the instantaneous power in a specific scale band (s_min, s_max).
Args:
W (np.ndarray): 2D array containing the coefficients of the wavelet transform.
scales (np.ndarray): array with scales.
dt, dj, wavelet: parameters of the wavelet transform.
cdelta (float): reconstruction factor. Scaling factor for the power
(is a contsant across all scales, just scales the output).
If not given, takes the empirically defined cdelta from the wavelet (if exists).
Alternatively, specify 1 if you don't care about scaling.
s_min (float): minimum scale to include in the band. Defaults to the lowest scale.
s_max (float): max scale in the band. Defaults to largest scale.
Returns:
pow (np.ndarray):
"""
if cdelta is None:
wavelet = _check_parameter_wavelet(wavelet)
cdelta = wavelet.cdelta
# Defaults.
if s_min is None:
s_min = scales[0]
if s_max is None:
s_max = scales[-1]
# Mask W in the scale region.
mask = np.logical_and(scales >= s_min, scales <= s_max)
W = W[mask, :]
scales = scales[mask]
# As of Torrence and Compo (1998), eq. (24)
Wp = np.abs(W)**2
pow = dj * dt / (cdelta) \
* (Wp / scales.reshape(-1, 1)).sum(axis=0)
return pow
[docs]def cwt_total_power(W, scales, dt, dj, wavelet=None, cdelta=None):
"""
Total power over all scales, averaged over time.
Args:
see cwt_power().
"""
# As of Torrence and Compo (1998), eq. (14)
pow = np.mean(cwt_power(W, scales, dt, dj, wavelet=wavelet, cdelta=cdelta))
return pow
[docs]def reconstruct_x(Wx, x, scales, cwt_kwargs):
"""
Helper function to reconstuct x from its CWT.
"""
from nnsa.cwt.transforms import icwt
# Extract options used for CWT.
detrend_order = cwt_kwargs['detrend_order']
normalize = cwt_kwargs['normalize']
remove_outliers = cwt_kwargs['remove_outliers']
dt = cwt_kwargs['dt']
dj = cwt_kwargs['dj']
wavelet = cwt_kwargs['wavelet']
# Preprocess: detrend and normalize.
x = np.asarray(x)
x_old = x.copy()
if detrend_order is not None:
x = detrend_poly(x, order=detrend_order)
poly = x_old - x # For reconstruction.
else:
poly = 0
x_mean = np.nanmean(x)
if normalize:
x_std = np.nanstd(x)
if remove_outliers:
std_cut_off = 5
x_n = x[np.logical_and(x >= (x_mean - std_cut_off * x_std), x <= (x_mean + std_cut_off * x_std))]
x_mean = np.nanmean(x_n)
x_std = np.nanstd(x_n)
x = (x - x_mean) / x_std
else:
x_std = None
x_rec = icwt(Wx, scales, dt, dj, wavelet)
x_rec = autoscale_rec(x_rec, x) # Find best scaling.
# Undo normalization and detrending.
if x_std is not None:
x_rec *= x_std
x_rec += x_mean # Always add mean back.
x_rec += poly # Add back polynomial.
error = np.nanmean((x_old - x_rec) ** 2) / np.nanvar(x_old) * 100
return x_rec, error