Source code for nnsa.cwt.mothers

import psutil
import pycwt
import numpy as np
from pycwt.helpers import rect, fft, fft_kwargs
from scipy.signal import convolve2d


__all__ = [
    'Mother',
    'Morlet',
    'Paul',
    'get_wavelet',
]


[docs]class Mother(object):
[docs] def flambda(self): raise NotImplementedError
[docs] def scal2freq(self, s): f = 1 / (s * self.flambda()) return f
[docs] def freq2scal(self, f): s = 1 / (f * self.flambda()) return s
[docs] def get_J(self, s0, dj, min_freq=None): if s0 <= 0: raise ValueError('Minimum scale s0 should be positive.') if min_freq == 0: J = -1 else: # Determines min frequency together with s0 and ds. J = np.floor(np.log2(1 / (min_freq * s0 * self.flambda())) / dj) return int(J)
[docs] @staticmethod def get_dj(max_freq, min_freq, J): dj = np.log2(max_freq / min_freq) / J return dj
[docs] def smooth(self, *args, **kwargs): raise NotImplementedError
[docs] def coi(self, *args, **kwargs): raise NotImplementedError
[docs] def get_min_freq(self, dt, n0): # Return min frequency still outside the COI. fs = 1/dt return fs / (self.flambda() * self.coi() * (n0 - 1) / 2)
[docs] def get_max_scale(self, dt, n0): # Return min frequency still outside the COI. return self.freq2scal(self.get_min_freq(dt, n0))
[docs] def get_min_scale(self, dt): # Return minimum scale (Nyqvist). fs = 1/dt fmax = fs/2 return self.freq2scal(fmax)
[docs] def get_s0(self, dt): # Alias for self.get_min_scale(). return self.get_min_scale(dt)
[docs]class Morlet(pycwt.Morlet, Mother): """Implements the Morlet wavelet class. Note that the input parameters f and f0 are angular frequencies. f0 should be more than 0.8 for this function to be correct, its default value is f0 = 6. """ def __repr__(self): return '{}({})'.format(self.__class__.__name__, self.f0)
[docs] def psi_ft(self, f): """Fourier transform of the approximate Morlet wavelet.""" return (np.pi ** -0.25) * np.exp(-0.5 * (f - self.f0) ** 2) * (f > 0)
def _set_f0(self, f0): # Sets the Morlet wave number, the degrees of freedom and the # empirically derived factors for the wavelet bases C_{\delta}, # \gamma, \delta j_0 (Torrence and Compo, 1998, Table 2) self.f0 = f0 # Wave number self.dofmin = 2 # Minimum degrees of freedom if self.f0 == 6: self.cdelta = 0.776 # Reconstruction factor self.gamma = 2.32 # Decorrelation factor for time averaging self.deltaj0 = 0.60 # Factor for scale averaging else: self.cdelta = 1 self.gamma = 1 self.deltaj0 = 1 if self.f0 == 5: # http://mark-bishop.net/signals/CWTReconstructionFactors.pdf self.cdelta = 0.9484 elif self.f0 == 7: self.cdelta = 0.6616 elif self.f0 == 8: self.cdelta = 0.5758 elif self.f0 == 10: self.cdelta = 0.4579 elif self.f0 == 20: self.cdelta = 0.2272
[docs] def smooth(self, W, dt, dj, scales, time=True, scale=True, scale_window=1, time_window=1): """Smoothing function used in coherence analysis. The scale_window is a dimensionless parameters that scales the default window for scale averaging. Idem for time_window. """ # The smoothing is performed by using a filter given by the absolute # value of the wavelet function at each scale, normalized to have a # total weight of unity, according to suggestions by Torrence & # Webster (1999) and by Grinsted et al. (2004). isreal = np.isreal(W).all() m, n = W.shape # Filter in time. if time: k = 2 * np.pi * fft.fftfreq(fft_kwargs(W[0, :])['n']) k2 = k ** 2 snorm = scales / dt * time_window # Smoothing by Gaussian window (absolute value of wavelet function) # using the convolution theorem: multiplication by Gaussian curve in # Fourier domain for each scale, outer product of scale and frequency F = np.exp(-0.5 * (snorm[:, np.newaxis] ** 2) * k2) # Outer product W = fft.ifft(F * fft.fft(W, axis=1, **fft_kwargs(W[0, :])), axis=1, # Along Fourier frequencies **fft_kwargs(W[0, :], overwrite_x=True)) W = W[:, :n] # Remove possibly padded region due to FFT if isreal: W = W.real # Filter in scale. if scale: wsize = np.int(np.round(1/dj * scale_window)) # Take the number of scales in a octave (like Matlab). wsize = max([1, wsize]) win = np.ones(wsize)/wsize W = convolve2d(W, win[:, np.newaxis], 'same') # Scales are "vertical" return W
[docs]class Paul(pycwt.Paul, Mother): """Implements the Paul wavelet class. Note that the input parameter f is the angular frequency and that the default order for this wavelet is m=4. """
[docs] def psi(self, t): """Paul wavelet as described in Torrence and Compo (1998).""" return (2 ** self.m * 1j ** self.m * np.prod(range(2, self.m + 1)) / np.sqrt(np.pi * 2*np.prod(range(2, 2 * self.m + 1))) * (1 - 1j * t) ** (-(self.m + 1)))
def _set_m(self, m): # Sets the m derivative of a Gaussian, the degrees of freedom and the # empirically derived factors for the wavelet bases C_{\delta}, # \gamma, \delta j_0 (Torrence and Compo, 1998, Table 2) self.m = m # Wavelet order self.dofmin = 2 # Minimum degrees of freedom if self.m == 4: self.cdelta = 1.132 # Reconstruction factor self.gamma = 1.17 # Decorrelation factor for time averaging self.deltaj0 = 1.50 # Factor for scale averaging else: self.cdelta = 1 self.gamma = 1 self.deltaj0 = 1 if self.m == 5: # http://mark-bishop.net/signals/CWTReconstructionFactors.pdf self.cdelta = -0.9065j elif self.m == 6: self.cdelta = -1.1879 elif self.m == 7: self.cdelta = -1.2327j elif self.m == 8: self.cdelta = 1.2731 elif self.m == 10: self.cdelta = -1.3441 elif self.m == 20: self.cdelta = 1.5934
[docs]def get_wavelet(wavelet): """ Return a Mother wavelet object. Args: wavelet (str or Wavelet): e.g. 'Morlet(6)'. Returns: w (Mother): wavelet object. """ if isinstance(wavelet, str): # Convert string to corresponding wavelet object. wavelet = wavelet.lower() if wavelet[:6] == 'morlet': f0 = float(wavelet[7:-1]) w = Morlet(f0=f0) else: return NotImplementedError('get_wavelet not implemented for "{}".'.format(wavelet)) else: # Assume wavelet already is a wavelet object. w = wavelet return w