# Source code for nnsa.utils.event_detections



import numpy as np

# Names exported by a star-import of this module.
# NOTE(review): `join_events` is defined in this module but is not listed
# here - confirm whether leaving it out of the public API is intentional.
__all__ = [
    'get_onsets_offsets',
    'time_threshold',
]


def get_onsets_offsets(detected, fs=None):
    """
    Compute the onsets and offsets of events in detected.

    Treats nans as not detected.

    Args:
        detected (np.ndarray or array-like): 1D array with 1s (detected) and 0s (not-detected).
            Lists and tuples are accepted and converted to an array.
        fs (float, optional): sampling frequency. If given, the returned onsets and offsets
            are in seconds. If not given, in samples.

    Returns:
        onsets (np.ndarray): array with indices (or times) of the first sample of each event.
        offsets (np.ndarray): array with indices (or times) of the first sample after each event
            (i.e. the offsets are exclusive, so that offset - onset is the event duration).

    Raises:
        ValueError: if `detected` is not one-dimensional.

    Examples:
        >>> onsets, offsets = get_onsets_offsets(np.array([1, 1, 1, 0, 0, 1, 1, 0]))
        >>> onsets.tolist(), offsets.tolist()
        ([0, 5], [3, 7])

        >>> onsets, offsets = get_onsets_offsets([0, 1, 1, 1, 0, 1, 1, 1])
        >>> onsets.tolist(), offsets.tolist()
        ([1, 5], [4, 8])

        >>> onsets, offsets = get_onsets_offsets(np.array([np.nan, 0, 1, np.nan, 1, 0]))
        >>> onsets.tolist(), offsets.tolist()
        ([2, 4], [3, 5])

    """
    # Accept any array-like (list, tuple, ...); this is a no-op for ndarrays.
    detected = np.asanyarray(detected)

    if detected.ndim != 1:
        raise ValueError('`detected` should be a 1D array. Got an array with shape {}.'
                         .format(detected.shape))

    # Replace NaNs with zeros.
    detected = np.nan_to_num(detected)

    # Find state transitions. Padding with a zero on either side makes events that
    # touch the start or the end of the array produce a transition as well.
    onsets = np.where(np.diff(detected, prepend=0) == 1)[0]
    offsets = np.where(np.diff(detected, append=0) == -1)[0] + 1

    if fs is not None:
        # Convert sample indices to seconds.
        onsets = onsets / fs
        offsets = offsets / fs

    if len(onsets) != len(offsets):
        raise AssertionError('This should never happen.')

    return onsets, offsets
def time_threshold(detected, min_duration=0, max_duration=np.inf):
    """
    Remove detected events that last > max_duration or < min_duration (number of samples).

    Args:
        detected (np.ndarray): 1D array containing 1s at samples with detected events and 0s at samples without
            the event (e.g. burst mask). An empty array is returned unchanged (as an empty integer array).
        min_duration (int, optional): minimum number of samples that a detected event must last. If the duration
            of the event (in number of samples) is less than this value, the event is removed by replacing
            the 1s with 0s (convert those samples from detected to undetected).
            Defaults to 0.
        max_duration (int, optional): maximum number of samples that a detected event can last. If the duration
            of the event (in number of samples) is larger than this value, the event is removed by replacing
            the 1s with 0s (convert those samples from detected to undetected).
            Defaults to np.inf.

    Returns:
        detected_thresholded (np.ndarray): new integer array with same shape as input detected.

    Raises:
        AssertionError: if the number of onsets and offsets found differ (which can happen when `detected`
            contains values other than 0 and 1).

    Examples:
        >>> detected = np.array([1, 1, 1, 1, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 0, 1])
        >>> print(detected)
        [1 1 1 1 1 0 0 1 0 1 1 1 0 1 1 0 1 1 1 1 0 1]

        >>> detected_thresholded = time_threshold(detected, min_duration=3)
        >>> print(detected_thresholded)
        [1 1 1 1 1 0 0 0 0 1 1 1 0 0 0 0 1 1 1 1 0 0]

        >>> detected_thresholded = time_threshold(detected, max_duration=3)
        >>> print(detected_thresholded)
        [0 0 0 0 0 0 0 1 0 1 1 1 0 1 1 0 0 0 0 0 0 1]

        >>> detected_thresholded = time_threshold(detected, min_duration=3, max_duration=4)
        >>> print(detected_thresholded)
        [0 0 0 0 0 0 0 0 0 1 1 1 0 0 0 0 1 1 1 1 0 0]

    """
    # Make an integer copy of the detected array (do not mutate the input).
    # astype() always returns a new array, so no separate copy is needed.
    detected_thresholded = np.asarray(detected).astype(int)

    # Nothing to threshold in an empty array; returning early also avoids
    # indexing the first/last element below.
    if detected_thresholded.size == 0:
        return detected_thresholded

    # Find onsets and offsets.
    d = np.diff(detected_thresholded)
    idx_onsets = np.where(d == 1)[0] + 1  # First 1.
    idx_offsets = np.where(d == -1)[0]  # Last 1.

    # If begins with 1, add onset 0 (np.diff cannot see a transition there).
    if detected_thresholded[0] == 1:
        idx_onsets = np.concatenate(([0], idx_onsets))

    # If ends with 1, add an offset at the last sample.
    if detected_thresholded[-1] == 1:
        idx_offsets = np.concatenate((idx_offsets, [len(detected_thresholded) - 1]))

    # Explicit raise instead of `assert`, so the check survives `python -O`.
    if len(idx_onsets) != len(idx_offsets):
        raise AssertionError('Number of onsets and offsets differ; '
                             '`detected` should contain only 0s and 1s.')

    # Loop over onset and offset of each detected event.
    for idx_on, idx_off in zip(idx_onsets, idx_offsets):
        # Both indices are inclusive, hence the + 1.
        duration = idx_off - idx_on + 1
        if duration < min_duration or duration > max_duration:
            # Remove detected event.
            detected_thresholded[idx_on: idx_off + 1] = 0

    return detected_thresholded
def join_events(detected, min_separation):
    """
    Join periods with separation < min_separation (number of samples).

    Only gaps that lie between two detected events are considered; leading and trailing
    runs of 0s are never filled.

    Args:
        detected (np.ndarray): 1D array containing 1s at samples with detected events and 0s at samples without
            the event (e.g. burst mask). Boolean masks are supported as well.
        min_separation (int): minimum number of samples that has to be in between two detected events.
            If the separation (in number of samples) between events is lower than this value, the two events
            are merged by replacing the samples in between them with 1s (convert those samples from
            undetected to detected).

    Returns:
        detected_joined (np.ndarray): new array with same shape as input detected.

    Examples:
        >>> detected = np.array([1, 1, 0, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, 1, 0, 0, 1])
        >>> print(detected)
        [1 1 0 0 0 1 0 1 1 0 0 0 0 1 0 0 1]

        >>> detected_joined = join_events(detected, min_separation=3)
        >>> print(detected_joined)
        [1 1 0 0 0 1 1 1 1 0 0 0 0 1 1 1 1]

    """
    # Make a copy of the detected array (do not mutate the input).
    detected_joined = np.copy(detected)

    # Find state transitions on a zero-padded signal. The padding (a) makes events that
    # touch either end of the array produce a transition, (b) keeps the result well-defined
    # for empty input or input without any event, and (c) promotes boolean masks to a
    # signed integer type so that falling edges show up as -1.
    d = np.diff(detected_joined, prepend=0, append=0)
    idx_starts = np.where(d == 1)[0]  # First 1 of each event.
    idx_stops = np.where(d == -1)[0]  # First 0 after each event (exclusive end).

    # Each gap runs from the end of one event up to the start of the next one, so pair
    # every event end (except the last) with the start of the following event.
    for gap_start, gap_stop in zip(idx_stops[:-1], idx_starts[1:]):
        if gap_stop - gap_start < min_separation:
            detected_joined[gap_start: gap_stop] = 1

    return detected_joined