Source code for nnsa.utils.segmentation

import warnings

import numpy as np


# Names exported by `from <this module> import *`; each one is a function
# defined further down in this file.
__all__ = [
    'compute_n_segments',
    'get_all_segments',
    'get_segment_times',
    'segment_generator',
]


def compute_n_segments(x, segment_length, overlap=0, fs=1, axis=0):
    """
    The total number of segments that segment_generator() will generate.

    Args:
        x (np.ndarray): see segment_generator().
        segment_length (float): see segment_generator(). If None, the entire signal length is used.
        overlap (float, optional): see segment_generator().
        fs (float, optional): see segment_generator().
        axis (int, optional): see segment_generator().

    Returns:
        n_segments (int): number of segments that segment_generator() will generate. Returns 0 if one
            segment is longer than the signal.

    Raises:
        ValueError: if `overlap` is not smaller than `segment_length` (the step between segments would
            be zero or negative).
    """
    n_samples = x.shape[axis]

    if segment_length is None:
        # Use entire signal.
        segment_length = n_samples / fs

    # Check input. Round to 8 decimals (same tolerance as segment_generator()) so that binary floating
    # point noise in segment_length*fs does not make a segment that fits exactly look too long.
    if round(segment_length * fs, 8) > n_samples:
        return 0

    # Step size in seconds.
    step = segment_length - overlap
    if step <= 0:
        raise ValueError('Overlap ({} s) must be smaller than the segment length ({} s).'.format(
            overlap, segment_length))

    # Round before flooring: a ratio like 0.3/0.1 evaluates to 2.9999999999999996 and would otherwise
    # be floored to 2 instead of 3.
    ratio = (n_samples / fs - overlap) / step
    n_segments = int(np.floor(round(ratio, 8)))

    return n_segments
def get_all_segments(x, segment_length, overlap=0, fs=1, axis=0):
    """
    Segment the data in x along the specified axis and return an array with all segments.

    Without overlap, and when the segment length corresponds to a whole number of samples, the segments
    are obtained by reshaping (fast) for the following layouts: 1D input, N-D input segmented along the
    first axis, and 2D input segmented along the last axis. In all other cases segment_generator() is
    used.

    Args:
        x (np.ndarray): see segment_generator().
        segment_length (float): see segment_generator(). If None, the entire signal length is used.
        overlap (float, optional): see segment_generator().
        fs (float, optional): see segment_generator().
        axis (int, optional): see segment_generator().

    Returns:
        all_segments (np.ndarray): array with all segments, where the first axis corresponds to the
            segments.
    """
    if segment_length is None:
        # Use entire signal (same convention as segment_generator()).
        segment_length = x.shape[axis] / fs

    reshape_possible = False
    if overlap == 0:
        # Number of samples per segment.
        num_samples_float = segment_length * fs
        num_samples = int(round(num_samples_float))

        if abs(num_samples_float - num_samples) > 1e-8:
            # Warn if the specified segment length is not exactly realizable with the sample frequency.
            warnings.warn('\nSignals with sample frequency of {} Hz cannot be reshaped in segments of {} s. '
                          'Doing slower segmentation without reshaping.'.format(fs, segment_length))
        elif num_samples > 0:
            # A non-positive sample count cannot be reshaped (it would divide by zero below); leave it
            # to segment_generator(), which rejects it with a ValueError.
            reshape_possible = True
            num_segments = x.shape[axis] // num_samples
            # Number of samples that fill a whole number of segments (the remainder is dropped).
            num_used = num_segments * num_samples

    if reshape_possible:
        seg_shape = (num_segments, num_samples)
        if x.ndim == 1:
            # Reshaping is fastest.
            return x[:num_used].reshape(seg_shape)
        if axis == 0 or axis == -x.ndim:
            # Segment along the first axis: trailing axes are carried along unchanged.
            return x[:num_used].reshape(seg_shape + x.shape[1:], order='C')
        if x.ndim == 2 and (axis == 1 or axis == -1):
            # Segment along the last axis of a 2D array: move that axis to the front, reshape, and swap
            # the last two axes back so that each segment has the same layout as a slice of x.
            x_t = x.T
            return np.swapaxes(x_t[:num_used].reshape(seg_shape + x_t.shape[1:], order='C'), 1, 2)

    # Use a generator (this one is the safest option, but may be slower for short segment_length).
    seg_generator = segment_generator(x, segment_length=segment_length, overlap=overlap, fs=fs, axis=axis)
    all_segments = np.array(list(seg_generator))

    return all_segments
def segment_generator(x, segment_length, overlap=0, fs=1, axis=0, error_mode='raise'):
    """
    Return a generator that segments the data in x along the specified axis.

    Only complete segments are yielded: samples at the end of the signal that do not fill an entire
    segment are dropped.

    Args:
        x (np.ndarray): array to be segmented.
        segment_length (float): length of the segment in seconds (specify `fs`).
            If None, uses entire signal length (will yield 1 segment).
        overlap (float, optional): overlap between successive segments in seconds (specify `fs`).
            Must be smaller than `segment_length`.
            Defaults to 0.
        fs (float, optional): sample frequency. By default fs is 1, meaning that the segment_length and
            overlap can be given as number of samples.
            Defaults to 1.
        axis (int, optional): the axis along which to segment the data.
            Defaults to 0.
        error_mode (str, optional): what to do when `segment_length` does not correspond to a whole
            number of samples: 'raise' raises a ValueError, 'warn' issues a warning and uses the nearest
            whole number of samples instead.
            Defaults to 'raise'.

    Yields:
        (np.ndarray): the next segment.

    Raises:
        ValueError: if the segment length is not realizable and `error_mode` is 'raise', if `error_mode`
            is not a valid option, if the segment contains no samples, or if `overlap` is not smaller
            than `segment_length`.

    Examples:
        >>> x = np.arange(11)
        >>> seg_gen = segment_generator(x, segment_length=3, overlap=1, fs=1)
        >>> np.asarray(list(seg_gen))
        array([[ 0,  1,  2],
               [ 2,  3,  4],
               [ 4,  5,  6],
               [ 6,  7,  8],
               [ 8,  9, 10]])
    """
    total_samples = x.shape[axis]

    if segment_length is None:
        # Use entire signal.
        segment_length = total_samples / fs

    # Number of samples per segment (rounded to 8 decimals to absorb floating point noise).
    num_samples_float = round(segment_length * fs, 8)
    num_samples = int(round(num_samples_float))

    # Raise error or warn if the specified segment length is not exactly realizable with the sample frequency.
    if abs(num_samples_float - num_samples) > 1e-8:
        if error_mode == 'raise':
            msg = '\nDesired segment length of {} seconds cannot be realized with sample frequency of {} Hz. ' \
                  'Choose a different segment length, resample the signal, or set `error_mode` to "warn" to ' \
                  'automatically change the segment length.'.format(segment_length, fs)
            raise ValueError(msg)
        elif error_mode == 'warn':
            msg = '\nDesired segment length of {} seconds cannot be realized with sample frequency of {} Hz. ' \
                  'Instead a segment length of {} seconds is used.'.format(segment_length, fs, num_samples / fs)
            warnings.warn(msg)
        else:
            raise ValueError('Invalid option error_mode="{}".'.format(error_mode))

    if num_samples <= 0:
        raise ValueError('Segmentation window is too small.')

    # Step size in seconds.
    step = segment_length - overlap
    if step <= 0:
        raise ValueError('Overlap ({} s) must be smaller than the segment length ({} s).'.format(
            overlap, segment_length))

    # Loop over segments. The stop criterion is evaluated on integer sample indices rather than on
    # float times: accumulated floating point error in the times could otherwise produce one start
    # time too many, resulting in a truncated last segment.
    slc = [slice(None)] * len(x.shape)
    seg_idx = 0
    while True:
        # Compute start idx in samples (start time in seconds is seg_idx * step).
        start_idx = int(round(seg_idx * step * fs))
        stop_idx = start_idx + num_samples
        if stop_idx > total_samples:
            # No complete segment left.
            break

        # Yield segment along the desired axis.
        slc[axis] = slice(start_idx, stop_idx)
        yield x[tuple(slc)]
        seg_idx += 1
def get_segment_times(num_segments, segment_length, overlap, offset=None):
    """
    Build the time axis that belongs to a sequence of (possibly overlapping) segments.

    Args:
        num_segments (int): total number of segments.
        segment_length (float): length of one segment (in seconds).
        overlap (float): overlap between successive segments (in seconds).
        offset (float, optional): time assigned to the first segment. If None, half the segment length
            is used, so that every time falls in the middle of its segment.
            Defaults to None.

    Returns:
        segment_times (np.ndarray): time (in seconds) of each segment, one entry per segment.
    """
    # Time between the starts of two successive segments.
    hop = segment_length - overlap

    # Without an explicit offset the times are centered in each segment.
    first_time = segment_length / 2 if offset is None else offset

    return first_time + np.arange(num_segments) * hop