import warnings
import numpy as np
# Public API of this module: the names exported by a star-import.
__all__ = [
'compute_n_segments',
'get_all_segments',
'get_segment_times',
'segment_generator',
]
def compute_n_segments(x, segment_length, overlap=0, fs=1, axis=0):
    """
    The total number of segments that segment_generator() will generate.

    Args:
        x (np.ndarray): see segment_generator().
        segment_length (float): see segment_generator(). If None, the entire signal length is used.
        overlap (float, optional): see segment_generator().
        fs (float, optional): see segment_generator().
        axis (int, optional): see segment_generator().

    Returns:
        n_segments (int): number of segments that segment_generator() will generate. This is 0 if a single
            segment is longer than the signal.

    Raises:
        ValueError: if `overlap` is not smaller than `segment_length` (the step between segments would be
            zero or negative).
    """
    num_samples_tot = x.shape[axis]

    if segment_length is None:
        # Use entire signal.
        segment_length = num_samples_tot / fs

    # A segment longer than the signal cannot be extracted. The product is rounded to 8 decimals so that
    # floating point noise in `segment_length * fs` does not wrongly reject a segment that fits exactly.
    if round(segment_length * fs, 8) > num_samples_tot:
        return 0

    # Step size in seconds.
    step = segment_length - overlap
    if step <= 0:
        raise ValueError('`overlap` must be smaller than `segment_length`.')

    # Number of steps that fit in the signal. The ratio is rounded to 8 decimals before flooring, because
    # representation errors (e.g. 0.3 / 0.1 == 2.9999999999999996) would otherwise drop a segment.
    ratio = (num_samples_tot / fs - overlap) / step
    n_segments = int(np.floor(round(ratio, 8)))
    return n_segments
def get_all_segments(x, segment_length, overlap=0, fs=1, axis=0):
    """
    Segment the data in x along the specified axis and return an array with all segments.

    Without overlap, and when the segment length corresponds to a whole number of samples, the segments are
    obtained by reshaping (fast). In all other cases the segments are collected from segment_generator().

    Args:
        x (np.ndarray): see segment_generator().
        segment_length (float): see segment_generator(). If None, the entire signal length is used.
        overlap (float, optional): see segment_generator().
        fs (float, optional): see segment_generator().
        axis (int, optional): see segment_generator().

    Returns:
        all_segments (np.ndarray): array with all segments, where the first axis corresponds to the segments.

    Raises:
        ValueError: if the segment length corresponds to less than one sample.
    """
    if segment_length is None:
        # Same convention as segment_generator(): use the entire signal as one segment.
        segment_length = x.shape[axis] / fs

    if overlap == 0:
        # Number of samples per segment.
        num_samples_float = segment_length * fs
        num_samples = int(round(num_samples_float))

        if abs(num_samples_float - num_samples) > 1e-8:
            # Warn if the specified segment length is not exactly realizable with the sample frequency.
            warnings.warn('\nSignals with sample frequency of {} Hz cannot be reshaped in segments of {} s. '
                          'Doing slower segmentation without reshaping.'.format(fs, segment_length))
        else:
            if num_samples <= 0:
                # Avoid an obscure ZeroDivisionError below.
                raise ValueError('Segmentation window is too small.')

            # Reshaping is fastest. Samples that do not fill a complete segment are discarded.
            num_samples_tot = x.shape[axis]
            num_segments = num_samples_tot // num_samples
            num_samples_used = num_segments * num_samples

            # Bring the segmentation axis to the front, so that a C-order reshape splits it in
            # (segment, sample) while leaving the other axes untouched.
            x_front = np.moveaxis(x, axis, 0)
            blocks = x_front[:num_samples_used].reshape((num_segments, num_samples) + x_front.shape[1:])

            # Put the sample axis back at its original position (shifted by one for the new segment axis).
            return np.moveaxis(blocks, 1, axis % x.ndim + 1)

    # Use a generator (this is the safest option, but may be slower for short segment_length).
    # No error_mode is passed, so segment_generator() applies its own default.
    seg_generator = segment_generator(x, segment_length=segment_length,
                                      overlap=overlap, fs=fs, axis=axis)
    all_segments = np.array(list(seg_generator))
    return all_segments
def segment_generator(x, segment_length, overlap=0, fs=1, axis=0, error_mode='raise'):
    """
    Return a generator that segments the data in x along the specified axis.

    Only complete segments are yielded: samples at the end of the signal that do not fill an entire segment
    are skipped.

    Args:
        x (np.ndarray): array to be segmented.
        segment_length (float): length of the segment in seconds (specify `fs`). If None, uses entire signal length
            (will yield 1 segment).
        overlap (float, optional): overlap between successive segments in seconds (specify `fs`).
            Defaults to 0.
        fs (float, optional): sample frequency. By default fs is 1, meaning that the segment_length and overlap can be
            given as number of samples.
            Defaults to 1.
        axis (int, optional): the axis along which to segment the data.
            Defaults to 0.
        error_mode (str, optional): what to do if `segment_length` does not correspond to a whole number of
            samples. If 'raise', a ValueError is raised. If 'warn', a warning is issued and the segment length
            is rounded to the nearest whole number of samples.
            Defaults to 'raise'.

    Yields:
        (np.ndarray): the next segment.

    Raises:
        ValueError: if the segment length is not realizable and `error_mode` is 'raise', if `error_mode` is
            invalid, if the segment length corresponds to less than one sample, or if `overlap` is not smaller
            than the segment length. As this is a generator, errors are raised when the first segment is
            requested.

    Examples:
        >>> x = np.arange(11)
        >>> seg_gen = segment_generator(x, segment_length=3, overlap=1, fs=1)
        >>> np.asarray(list(seg_gen))
        array([[ 0,  1,  2],
               [ 2,  3,  4],
               [ 4,  5,  6],
               [ 6,  7,  8],
               [ 8,  9, 10]])
    """
    num_samples_tot = x.shape[axis]

    if segment_length is None:
        # Use entire signal.
        segment_length = num_samples_tot / fs

    # Number of samples per segment.
    num_samples_float = round(segment_length * fs, 8)
    num_samples = int(round(num_samples_float))

    # Raise error or warn if the specified segment length is not exactly realizable with the sample frequency.
    if abs(num_samples_float - num_samples) > 1e-8:
        if error_mode == 'raise':
            msg = '\nDesired segment length of {} seconds cannot be realized with sample frequency of {} Hz. ' \
                  'Choose a different segment length, resample the signal, or set `error_mode` to "warn" to ' \
                  'automatically change the segment length.'.format(
                      segment_length, fs)
            raise ValueError(msg)
        elif error_mode == 'warn':
            msg = '\nDesired segment length of {} seconds cannot be realized with sample frequency of {} Hz. ' \
                  'Instead a segment length of {} seconds is used.'.format(
                      segment_length, fs, num_samples / fs)
            warnings.warn(msg)
            # Actually use the announced segment length, so that the step size and the stop criterion
            # below agree with the number of samples per segment.
            segment_length = num_samples / fs
        else:
            raise ValueError('Invalid option error_mode="{}".'.format(error_mode))

    if num_samples <= 0:
        raise ValueError('Segmentation window is too small.')

    # Start of segmentation (seconds).
    start = 0

    # Step size in seconds.
    step = segment_length - overlap
    if step <= 0:
        raise ValueError('`overlap` must be smaller than the segment length.')

    # Compute when to stop iterating (seconds). The extra sample (1/fs) makes the last valid start time fall
    # inside the half-open interval of np.arange.
    stop = num_samples_tot / fs - segment_length + 1 / fs

    # Index template selecting everything; only the entry for `axis` is replaced per segment.
    slc = [slice(None)] * x.ndim

    # Loop over segments.
    for start_time in np.arange(start, stop, step):
        # Compute start idx in samples.
        start_idx = int(round(start_time * fs))
        stop_idx = start_idx + num_samples

        # Floating point errors in the start times must never result in an incomplete segment.
        if stop_idx > num_samples_tot:
            break

        # Yield segment along the desired axis.
        slc[axis] = slice(start_idx, stop_idx)
        yield x[tuple(slc)]
def get_segment_times(num_segments, segment_length, overlap, offset=None):
    """
    Build the time axis that belongs to a sequence of (possibly overlapping) segments.

    Args:
        num_segments (int): total number of segments.
        segment_length (float): length of a single segment (in seconds).
        overlap (float): overlap between successive segments (in seconds).
        offset (float, optional): time assigned to the first segment. If None, segment_length/2 is used,
            so that each time falls in the middle of its segment.
            Defaults to None.

    Returns:
        segment_times (np.ndarray): array with one time (in seconds) per segment.
    """
    # Successive segments are one hop (segment length minus overlap) apart.
    hop = segment_length - overlap

    # Without an explicit offset, the times are centered in each segment.
    first_time = segment_length / 2 if offset is None else offset

    return first_time + hop * np.arange(num_segments)