import psutil
import pycwt
import numpy as np
from pycwt.helpers import rect, fft, fft_kwargs
from scipy.signal import convolve2d
__all__ = [
'Mother',
'Morlet',
'Paul',
'get_wavelet',
]
[docs]class Mother(object):
[docs] def flambda(self):
raise NotImplementedError
[docs] def scal2freq(self, s):
f = 1 / (s * self.flambda())
return f
[docs] def freq2scal(self, f):
s = 1 / (f * self.flambda())
return s
[docs] def get_J(self, s0, dj, min_freq=None):
if s0 <= 0:
raise ValueError('Minimum scale s0 should be positive.')
if min_freq == 0:
J = -1
else:
# Determines min frequency together with s0 and ds.
J = np.floor(np.log2(1 / (min_freq * s0 * self.flambda())) / dj)
return int(J)
[docs] @staticmethod
def get_dj(max_freq, min_freq, J):
dj = np.log2(max_freq / min_freq) / J
return dj
[docs] def smooth(self, *args, **kwargs):
raise NotImplementedError
[docs] def coi(self, *args, **kwargs):
raise NotImplementedError
[docs] def get_min_freq(self, dt, n0):
# Return min frequency still outside the COI.
fs = 1/dt
return fs / (self.flambda() * self.coi() * (n0 - 1) / 2)
[docs] def get_max_scale(self, dt, n0):
# Return min frequency still outside the COI.
return self.freq2scal(self.get_min_freq(dt, n0))
[docs] def get_min_scale(self, dt):
# Return minimum scale (Nyqvist).
fs = 1/dt
fmax = fs/2
return self.freq2scal(fmax)
[docs] def get_s0(self, dt):
# Alias for self.get_min_scale().
return self.get_min_scale(dt)
[docs]class Morlet(pycwt.Morlet, Mother):
"""Implements the Morlet wavelet class.
Note that the input parameters f and f0 are angular frequencies.
f0 should be more than 0.8 for this function to be correct, its
default value is f0 = 6.
"""
def __repr__(self):
return '{}({})'.format(self.__class__.__name__, self.f0)
[docs] def psi_ft(self, f):
"""Fourier transform of the approximate Morlet wavelet."""
return (np.pi ** -0.25) * np.exp(-0.5 * (f - self.f0) ** 2) * (f > 0)
def _set_f0(self, f0):
# Sets the Morlet wave number, the degrees of freedom and the
# empirically derived factors for the wavelet bases C_{\delta},
# \gamma, \delta j_0 (Torrence and Compo, 1998, Table 2)
self.f0 = f0 # Wave number
self.dofmin = 2 # Minimum degrees of freedom
if self.f0 == 6:
self.cdelta = 0.776 # Reconstruction factor
self.gamma = 2.32 # Decorrelation factor for time averaging
self.deltaj0 = 0.60 # Factor for scale averaging
else:
self.cdelta = 1
self.gamma = 1
self.deltaj0 = 1
if self.f0 == 5:
# http://mark-bishop.net/signals/CWTReconstructionFactors.pdf
self.cdelta = 0.9484
elif self.f0 == 7:
self.cdelta = 0.6616
elif self.f0 == 8:
self.cdelta = 0.5758
elif self.f0 == 10:
self.cdelta = 0.4579
elif self.f0 == 20:
self.cdelta = 0.2272
[docs] def smooth(self, W, dt, dj, scales, time=True,
scale=True, scale_window=1, time_window=1):
"""Smoothing function used in coherence analysis.
The scale_window is a dimensionless parameters that scales the default window for scale averaging.
Idem for time_window.
"""
# The smoothing is performed by using a filter given by the absolute
# value of the wavelet function at each scale, normalized to have a
# total weight of unity, according to suggestions by Torrence &
# Webster (1999) and by Grinsted et al. (2004).
isreal = np.isreal(W).all()
m, n = W.shape
# Filter in time.
if time:
k = 2 * np.pi * fft.fftfreq(fft_kwargs(W[0, :])['n'])
k2 = k ** 2
snorm = scales / dt * time_window
# Smoothing by Gaussian window (absolute value of wavelet function)
# using the convolution theorem: multiplication by Gaussian curve in
# Fourier domain for each scale, outer product of scale and frequency
F = np.exp(-0.5 * (snorm[:, np.newaxis] ** 2) * k2) # Outer product
W = fft.ifft(F * fft.fft(W, axis=1, **fft_kwargs(W[0, :])),
axis=1, # Along Fourier frequencies
**fft_kwargs(W[0, :], overwrite_x=True))
W = W[:, :n] # Remove possibly padded region due to FFT
if isreal:
W = W.real
# Filter in scale.
if scale:
wsize = np.int(np.round(1/dj * scale_window)) # Take the number of scales in a octave (like Matlab).
wsize = max([1, wsize])
win = np.ones(wsize)/wsize
W = convolve2d(W, win[:, np.newaxis], 'same') # Scales are "vertical"
return W
[docs]class Paul(pycwt.Paul, Mother):
"""Implements the Paul wavelet class.
Note that the input parameter f is the angular frequency and that
the default order for this wavelet is m=4.
"""
[docs] def psi(self, t):
"""Paul wavelet as described in Torrence and Compo (1998)."""
return (2 ** self.m * 1j ** self.m * np.prod(range(2, self.m + 1)) /
np.sqrt(np.pi * 2*np.prod(range(2, 2 * self.m + 1))) *
(1 - 1j * t) ** (-(self.m + 1)))
def _set_m(self, m):
# Sets the m derivative of a Gaussian, the degrees of freedom and the
# empirically derived factors for the wavelet bases C_{\delta},
# \gamma, \delta j_0 (Torrence and Compo, 1998, Table 2)
self.m = m # Wavelet order
self.dofmin = 2 # Minimum degrees of freedom
if self.m == 4:
self.cdelta = 1.132 # Reconstruction factor
self.gamma = 1.17 # Decorrelation factor for time averaging
self.deltaj0 = 1.50 # Factor for scale averaging
else:
self.cdelta = 1
self.gamma = 1
self.deltaj0 = 1
if self.m == 5:
# http://mark-bishop.net/signals/CWTReconstructionFactors.pdf
self.cdelta = -0.9065j
elif self.m == 6:
self.cdelta = -1.1879
elif self.m == 7:
self.cdelta = -1.2327j
elif self.m == 8:
self.cdelta = 1.2731
elif self.m == 10:
self.cdelta = -1.3441
elif self.m == 20:
self.cdelta = 1.5934
[docs]def get_wavelet(wavelet):
"""
Return a Mother wavelet object.
Args:
wavelet (str or Wavelet): e.g. 'Morlet(6)'.
Returns:
w (Mother): wavelet object.
"""
if isinstance(wavelet, str):
# Convert string to corresponding wavelet object.
wavelet = wavelet.lower()
if wavelet[:6] == 'morlet':
f0 = float(wavelet[7:-1])
w = Morlet(f0=f0)
else:
return NotImplementedError('get_wavelet not implemented for "{}".'.format(wavelet))
else:
# Assume wavelet already is a wavelet object.
w = wavelet
return w