"""
Code related to annotation sets.
"""
import copy
import operator
from collections import Counter
import numpy as np
import pandas as pd
from nnsa.annotations.annotation import Annotation
from nnsa.annotations.config import STANDARD_ANNOTATIONS, SLEEP_LABELS, NO_LABEL
from nnsa.annotations.sleep_stages import standardize_annotation
from nnsa.utils.plotting import shade_axis
from nnsa.utils.conversions import convert_time_scale
__all__ = [
'AnnotationSet',
]
[docs]class AnnotationSet(object):
"""
Holds a collection of annotations (e.g. all annotations from one file/investigator).
Args:
annotations (iterable, optional): iterable collection with Annotation objects (e.g. list, generator).
label (str, optional): a label for this AnnotationSet (e.g. code/name of the observer).
"""
def __init__(self, annotations=None, label='annotations'):
self.label = label
self._annotations = []
# Append annotations given in annotations_list to the annotation set.
if annotations is not None:
self.extend(annotations, inplace=True)
def __getitem__(self, item):
"""
Return annotation at index `item`.
"""
return self.annotations[item]
def __iter__(self):
"""
Return iterator that iterates over Annotation objects in this AnnotationSet.
This makes it possible to iterate over the annotations list by iterating over the AnnotationSet directly
instead of having to iterate over AnnotationSet.annotations.
Returns:
(list_iterator): iterator that iterates over Annotation objects in this AnnotationSet.
"""
# Return the iterator of the list that holds the Annotation objects.
return self._annotations.__iter__()
def __len__(self):
"""
Return the number of the annotations in the set.
Returns:
(int): length of annotations list.
"""
return len(self.annotations)
def __repr__(self):
"""
Return a comprehensive info string about this object.
Returns:
(str): a comprehensive info string about this object.
"""
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
return '{} with label: "{}" and annotations:\n{}'.format(self.__class__.__name__, self.label,
self.to_dataframe())
@property
def annotations(self):
"""
Return the list with annotations.
Returns:
(list): list with edfreadpy.Annotation objects.
"""
return self._annotations
[docs] def add_postfix(self, postfix, inplace=False):
"""
Add postfix to all annotations.
Args:
prefix (str): string to add after each Annotation's text.
inplace (bool, optional): if True, modify the annotation in place.
If False, make a copy and modify the copy.
Returns:
annotation_set (edfreadpy.AnnotationSet): new AnnotationSet object with the modified annotations
(if inplace is False).
"""
if inplace:
annotation_set = self
else:
# Create a copy of the annotation set (to preserve the original one).
annotation_set = copy.deepcopy(self)
# Iterate over the to be added annotations.
for annot in annotation_set:
annot.text = annot.text + postfix
if not inplace:
return annotation_set
[docs] def add_prefix(self, prefix, inplace=False):
"""
Add prefix to all annotations.
Args:
prefix (str): string to add in front of each Annotation's text.
inplace (bool, optional): if True, modify the annotation in place.
If False, make a copy and modify the copy.
Returns:
annotation_set (edfreadpy.AnnotationSet): new AnnotationSet object with the modified annotations
(if inplace is False).
"""
if inplace:
annotation_set = self
else:
# Create a copy of the annotation set (to preserve the original one).
annotation_set = copy.deepcopy(self)
# Iterate over the to be added annotations.
for annot in annotation_set:
annot.text = prefix + annot.text
if not inplace:
return annotation_set
[docs] def append(self, annotation, inplace=False):
"""
Safely append an annotation to the annotation set.
Args:
annotation (edfreadpy.Annotation): Annotation object.
inplace (bool, optional): if True, appends the annotation in place. If False, a new AnnotationSet object
with the appended annotation is returned.
Defaults to False.
Returns:
annotation_set (edfreadpy.AnnotationSet): new AnnotationSet object with the appended annotation
(if inplace is False).
"""
# Check whether to be appended object is an annotation object.
if not isinstance(annotation, Annotation):
raise ValueError('Invalid object of type "{}" to append to {}. Object should be "Annotation"'
.format(type(annotation).__name__, type(self).__name__))
if inplace:
# Append annotation to annotation list.
self._annotations.append(annotation)
else:
# Create a copy of the annotation set (to preserve the original one).
annotation_set = copy.deepcopy(self)
annotation_set._annotations.append(annotation)
return annotation_set
[docs] def clip_overlaps(self, how='first', inplace=False):
"""
Clip annotations if they overlap.
Args:
how (str, optional): specifies which annotation is clipped. Choose from 'first', 'second'.
For two overlapping annotations, either the first one can be shortered ('first') by
decreasing its duration or the second one ('second') can be shortened by increasing its onset.
inplace (bool, optional): if True, clips the annotations in place. If False, a new AnnotationSet
object with the clipped annotations is returned.
Defaults to False.
Returns:
as_out (nnsa.AnnotationSet): AnnotationSet object with clipped annotations
(returned only if inplace is False).
"""
# Sort.
if inplace:
self.sort(inplace=True)
as_out = self
else:
as_out = self.sort(inplace=False)
# Find gaps in between annotations.
tol = 1e-3 # in seconds.
durations = as_out.durations()
onsets = as_out.onsets()
endings = onsets + durations
gaps = onsets[1:] - endings[:-1]
overlap_idx = np.where(gaps < -tol)[0] + 1
for idx in overlap_idx:
if how == 'first':
as_out.annotations[idx - 1].duration = onsets[idx] - onsets[idx - 1]
elif how == 'second':
as_out.annotations[idx].onset = onsets[idx - 1] + durations[idx - 1]
else:
raise ValueError('Invalid parameter how="{}". Choose from {}.'
.format(how, ['first', 'second']))
if not inplace:
return as_out
[docs] def compute_nan_durations(self, total_duration=None, inplace=False):
"""
Compute duration from inter-annotation interval for annotations with duration np.nan.
Args:
total_duration (float): total duration of the annotated recording (needed to determine the duration of the
last annotation).
inplace (bool, optional): apply inplcace (True) or return a new object (False).
Defaults to False.
Returns:
annotations (nnsa.AnnotationSet): new AnnotationSet with Annotations (if inplace is False).
"""
if not inplace:
# Create a copy of the annotation set (to preserve the original ones).
annotations = copy.deepcopy(self)
else:
annotations = self
onsets = self.onsets()
order_onsets = np.argsort(onsets)
onsets_sorted = onsets[order_onsets]
# Check end time.
if total_duration is None:
total_duration = onsets_sorted[-1]
elif total_duration < onsets_sorted[-1]:
raise ValueError('Specified total duration ({} s) is higher than onset of last annotation ({} s).'.format(
total_duration, onsets_sorted[-1]))
# Add end time.
onsets_sorted = np.append(onsets_sorted, total_duration)
# Compute inter-annotation intervals.
intervals = np.diff(onsets_sorted)
for i, a in enumerate(annotations):
if np.isnan(a.duration):
a.duration = intervals[order_onsets[i]]
if not inplace:
return annotations
[docs] def count_neighbors(self, text):
"""
Count the number of times a certain text is a neighbour (in time) of a specified text.
Assumes that the annotations are sorted by onset.
Args:
text (str): annotation text of which the neighbours should be counted.
Returns:
(pd.DataFrame): dataframe with counts where the index represents the left neighbour and the column
represents the right neighbour
"""
all_texts = self.texts()
unique_texts = np.unique(all_texts).tolist()
idx_target = np.where(all_texts == text)[0]
count = np.zeros((len(unique_texts) + 1, len(unique_texts) + 1), dtype=int)
for i in idx_target:
row_idx = unique_texts.index(all_texts[i - 1]) if i > 0 else len(unique_texts)
col_idx = unique_texts.index(all_texts[i + 1]) if i < len(all_texts) - 1 else len(unique_texts)
count[row_idx, col_idx] += 1
unique_texts.append('no_neighbor')
return pd.DataFrame(data=count, index=unique_texts, columns=unique_texts)
[docs] def durations(self):
"""
Return the durations of all annotations in the set.
Returns:
(np.ndarray): the durations (in seconds) of all annotations in the set.
"""
return np.array([a.duration for a in self.annotations])
[docs] def end_time(self):
"""
Return the time at which the last annotation ends.
Returns:
(float): the time (in seconds) at which the last annotation ends.
"""
last_annot = self.sort().annotations[-1]
return last_annot.onset + last_annot.duration
[docs] def extend(self, annotations, inplace=False):
"""
Safely extend the list with annotation with another annotations from an iterable collection.
Args:
annotations (collection): iterable collection with Annotation objects (e.g. list, generator).
inplace (bool, optional): if True, extends the annotations in place. If False, a new AnnotationSet object
extended with the annotations is returned.
Defaults to False.
Returns:
annotation_set (edfreadpy.AnnotationSet): new AnnotationSet object with the appended annotations
(if inplace is False).
"""
if inplace:
annotation_set = self
else:
# Create a copy of the annotation set (to preserve the original one).
annotation_set = copy.deepcopy(self)
# Iterate over the to be added annotations and append them.
for annot in annotations:
annotation_set.append(annot, inplace=True)
if not inplace:
return annotation_set
[docs] def fill_unlabeled_periods(self, label_to_insert=None, begin=None, end=None, inplace=False):
"""
Insert Annotation objects into the AnnotationSet to make the annotations continuous.
An Annotation with label `label_to_insert` is inserted between two succesive annotations if they are
not continuous, i.e. the start of the second annotation does not coincide with the end of the first one.
Args:
label_to_insert (str, optional): the text that is used for the Annotations that are inserted in the
unlabeled periods. If None, NO_LABEL is used.
Defaults to None.
begin (flaot, optional): begin time of period to fill. Cannot be greater than the first onset.
If None, the first annotation in the set marks the beginning.
Defaults to None.
end (float, optional): end time of period to fill. Cannot be smaller than the end time of the last
annotation. If None, the last annotation marks the end.
Defaults to None.
inplace (bool, optional): if True, fills the annotations in place. If False, a new AnnotationSet object
with the filled annotations is returned.
Defaults to False.
Returns:
annotation_set (nnsa.AnnotationSet): new AnnotationSet object that has continuous labels
(if inplace is False).
"""
def insert_annotation(text, onset, duration, begin, end, annotation_set, insert_idx):
# Check if the Annotation falls inside the range to be filled.
if begin <= onset <= end:
# Clip the duration if it falls outside the range.
max_duration = end - onset
duration = min([duration, max_duration])
annotation_set.annotations.insert(insert_idx,
Annotation(text=text,
onset=onset,
duration=duration))
if label_to_insert is None:
# Default label to insert is NO_LABEL (see the config.py file for the exact characters that are used).
label_to_insert = NO_LABEL
# Sort.
if inplace:
self.sort(inplace=True)
annotation_set = self
else:
annotation_set = self.sort(inplace=False)
# Find gaps in between annotations.
tol = 1e-2 # in seconds.
durations = annotation_set.durations()
onsets = annotation_set.onsets()
endings = onsets + durations
# Check begin and end.
if begin is None:
begin = onsets[0]
if end is None:
end = endings[-1]
# Find gaps (unlabbeled_periods) to fill.
gaps = onsets[1:] - endings[:-1]
gaps_idx = np.where(gaps > tol)[0]
# Check for overlapping sleep stage annotations.
if any(gaps < -tol):
raise RuntimeError('Overlapping sleep stage annotations at onset(s) {}.'.format(onsets[1:][[gaps < -tol]]))
# Insert a new annotation in the gaps.
for num_additions, idx in enumerate(gaps_idx):
onset = endings[idx]
duration = gaps[idx]
insert_idx = idx + 1 + num_additions
insert_annotation(text=label_to_insert, onset=onset, duration=duration, begin=begin, end=end,
annotation_set=annotation_set, insert_idx=insert_idx)
# Check if we need to put an Annotation before the first one.
if onsets[0] > begin:
onset = begin
duration = onsets[0] - onset
insert_idx = 0
insert_annotation(text=label_to_insert, onset=onset, duration=duration, begin=begin, end=end,
annotation_set=annotation_set, insert_idx=insert_idx)
# Check if we need to put an Annotation after the last one.
if endings[-1] < end:
onset = endings[-1]
duration = end - onset
insert_idx = len(annotation_set) # Insert at the end of the list.
insert_annotation(text=label_to_insert, onset=onset, duration=duration, begin=begin, end=end,
annotation_set=annotation_set, insert_idx=insert_idx)
if not inplace:
return annotation_set
[docs] def filter(self, patterns, how='keep', case_sensitive=True, inplace=False):
"""
Filter the annotation by looking for matching patterns in the texts.
Args:
patterns (str or list): pattern(s) to find matches for.
how (str, optional): whether to keep or remove the annotations that match the patterns.
Options:
- 'keep'
- 'remove'
Defaults to 'keep'.
case_sensitive (bool, optional): whether the pattern matching should be case sensitive.
Defaults to True.
inplace (bool, optional): if True, filters the annotations in place. If False, a new AnnotationSet
object with the filtered annotations is returned.
Defaults to False.
Returns:
as_out (edfreadpy.AnnotationSet): new AnnotationSet with a subset of the annotations.
Examples:
>>> annot_set = AnnotationSet()
>>> annot_set.append(Annotation(onset=10.25, duration=100, text='QS'), inplace=True)
>>> annot_set.append(Annotation(110.25, 200, 'AS'), inplace=True)
>>> annot_set.append(Annotation(500, 300, 'QS'), inplace=True)
>>> annot_set.append(Annotation(800, 10, 'AS'), inplace=True)
>>> annot_set.print_all_annotations()
onset duration text
0 10.25 100 QS
1 110.25 200 AS
2 500.00 300 QS
3 800.00 10 AS
>>> as_new = annot_set.filter('QS', how='keep')
>>> as_new.print_all_annotations()
onset duration text
0 10.25 100.0 QS
1 500.00 300.0 QS
>>> as_new = annot_set.filter('QS', how='remove')
>>> as_new.print_all_annotations()
onset duration text
0 110.25 200 AS
1 800.00 10 AS
"""
if not isinstance(patterns, list):
patterns = [patterns]
def fun_keep(text):
# Function that returns True if any pattern in text, else False.
keep = False
for pat in patterns:
if case_sensitive:
keep = pat in text
else:
keep = pat.lower() in text.lower()
if keep:
break # Exit loop.
return keep
def fun_remove(text):
# Function that returns True if None of the pattern in text, else False.
keep = True
for pat in patterns:
if case_sensitive:
keep = pat not in text
else:
keep = pat.lower() not in text.lower()
if not keep:
break
return keep
if how.lower() == 'keep':
return self.filter_fun(fun=fun_keep, pass_text=True, inplace=inplace)
elif how.lower() == 'remove':
return self.filter_fun(fun=fun_remove, pass_text=True, inplace=inplace)
else:
raise ValueError('Invalid option for `how`: "{}". Choose from: {}.'.format(
how, ['keep', 'remove']))
[docs] def filter_fun(self, fun, pass_text=True, inplace=False):
"""
Filter the annotation keeping only annotations that yield True after evaluating fun on them.
Args:
fun (function): function that takes in a string or Annotation object (depending on `pass_text`)
and returns True or False. Annotations that yield False are removed.
pass_text (bool): if Ture, the annotation text (str) is passed to `fun`, otherwise the Annotation
object is passed.
inplace (bool, optional): if True, filters the annotations in place. If False, a new AnnotationSet
object with the filtered annotations is returned.
Defaults to False.
Returns:
as_out (edfreadpy.AnnotationSet): new AnnotationSet with annotations that match the specified
pattern (only retruned if inplace is False).
"""
if inplace:
as_out = self
else:
# Create a copy of the annotation set (to preserve the original one).
as_out = copy.deepcopy(self)
# Create new set and append each annotation that matches the pattern.
idx = 0
while idx < len(as_out.annotations):
an_i = as_out.annotations[idx]
if pass_text:
# Only use the text.
an_i = an_i.text
# Check if any pattern matches the text.
keep = fun(an_i)
# Check if we should keep the annotation.
if not keep:
as_out.annotations.pop(idx)
# Do not increment idx, since we just popped one out of the list.
else:
# Go the next one by incrementing idx.
idx += 1
if not inplace:
return as_out
[docs] def from_df(self, df, onset='onset', duration='duration', text='text', **kwargs):
"""
Convert a DataFrame to an AnnotationSet.
Args:
df (pd.DataFrame): a DataFrame.
onset (str, optional): column name in df that corresponds to the onset times (in seconds).
duration: (str, optional): column name in df that corresponds to the duration times (in seconds).
text: (str, optional): column name in df that corresponds to the annotation text.
**kwargs (optional): optional keyword arguments for AnnotationSet().
Returns:
annotation_set (AnnotationSet): AnnotationSet.
"""
annotation_set = self.from_lists(df[onset], df[duration], df[text])
return annotation_set
[docs] def from_lists(self, onsets, durations, texts, **kwargs):
"""
Convert a DataFrame to an AnnotationSet.
Args:
onsets (list, np.ndarray): list with the onset times (in seconds).
durations (list, np.ndarray): list with the duration times (in seconds).
texts (list, np.ndarray): list with the annotation texts.
**kwargs (optional): optional keyword arguments for AnnotationSet().
Returns:
annotation_set (AnnotationSet): AnnotationSet.
"""
# Initialize empty annotation set.
annotation_set = self.__class__(**kwargs)
# Go over entries in df.
for (on, dur, t) in zip(onsets, durations, texts):
annotation_set.append(Annotation(on, dur, t), inplace=True)
return annotation_set
[docs] def from_mask(self, mask, time=None, label_mapping=None, label=None):
"""
Convert a mask with arbitrary number of classes to an annotation set.
Args:
mask (np.ndarray): 1D mask array containing discrete values.
time (np.ndarray): time array corresponding to starttimes of `mask`.
If None, the onset and durations in the returned AnnotationSet are
in samples.
label_mapping (dict, optional): dictionary mapping a value in the mask to a label (str).
Those labels will become the texts in the annotation set.
If None, the mask values will be the labels.
label (str, optional): label for the annotation set.
Defaults to None.
Returns:
annotation_set (AnnotationSet): annotation set with onsets, durations and texts.
Examples:
>>> time = np.arange(10)
>>> mask = [0, 0, 1, 1, 1, 0, 0, 2, 2, 2]
>>> AnnotationSet().from_mask(time, mask)
AnnotationSet with label: "None" and annotations:
onset duration text
0 0 2 0
1 2 3 1
2 5 2 0
3 7 3 2
>>> AnnotationSet().from_mask(time, mask, label_mapping={0: 'null', 1: 'one', 2: 'two'})
AnnotationSet with label: "None" and annotations:
onset duration text
0 0 2 null
1 2 3 one
2 5 2 null
3 7 3 two
"""
# Check input.
mask = np.asarray(mask).squeeze()
if time is None:
time = np.arange(len(mask))
else:
time = np.asarray(time).squeeze()
if time.shape != mask.shape:
raise ValueError('`time` and `mask` should have the same shape.')
if time.ndim != 1:
raise ValueError('`time` and `mask` should be 1-dimensional arrays.')
if label_mapping is None:
label_mapping = dict()
# Initialize empty annotation set.
annotation_set = self.__class__(label=label)
# Extrapolate the time vector with one sample.
time = np.append(time, 2*time[-1] - time[-2])
# Add epochs with same mask value to annotation set.
transition_idx = np.append(np.nonzero(np.diff(mask))[0], len(mask) - 1)
t_onset = time[0]
for idx in transition_idx:
# Compute duration.
duration = time[idx + 1] - t_onset
# Extract text label.
mask_value = mask[idx]
label = label_mapping.get(mask_value, str(mask_value))
# Add annotation.
annotation = Annotation(onset=t_onset, duration=duration, text=label)
annotation_set.append(annotation, inplace=True)
t_onset = time[idx + 1]
return annotation_set
[docs] def get_occurrence_count(self):
"""
Return the unqiue annotation texts and their number of occurences (sorted).
Returns:
sorted_text_count (dict): dictionary that maps unique annotations in the annotation set to the
frequency/counts of that annotation in the set.
"""
# Collect all annotation texts.
all_texts = []
for a in self:
all_texts.append(a.text)
# Print annotation texts and their occurrences.
texts_counted = Counter(all_texts)
sorted_text_count = sorted(texts_counted.items(), key=operator.itemgetter(1), reverse=True)
return sorted_text_count
[docs] def get_overlapping_annotations(self, begin=-np.inf, end=np.inf, clip=False, start=None):
"""
Return a sub set of the annotations with annotations that (partially) overlap with the specified time interval.
Args:
begin (float): begin time in seconds of the time interval.
end (float): end time in seconds of the time interval.
clip (bool): if True, clips the first and last annotations to the begin and end times.
If False, keeps original lengths.
start (flaot): overrides `begin` (for backwards compatibility).
Returns:
(nnsa.AnnotationSet): subset of the current AnnotationSet.
"""
if start is not None:
# For backwards compatibility.
begin = start
# Create an empty annotation object with same properies as current.
annotation_set = copy.deepcopy(self)
annotation_set._annotations = []
# Loop over annotations (copy such that clipping is never inplace).
for annot in copy.deepcopy(self):
an_start = annot.onset
an_end = an_start + annot.duration
if begin <= an_start < end or begin <= an_end <= end or an_start <= begin <= an_end:
if clip:
# Clip to begin and end times.
if annot.onset < begin:
dt = begin - annot.onset
annot.onset = begin
annot.duration -= dt
if (annot.onset + annot.duration) > end:
annot.duration = end - annot.onset
annotation_set.append(annot, inplace=True)
return annotation_set
[docs] def get_text_times(self, texts):
"""
Return the start and end times of a specific annotation text.
Args:
texts (str or list): annotation text(s) to get the start and end times for.
Returns:
begin_times (np.ndarray): begin (onset) times.
end_times (np.ndarray): end times.
"""
if isinstance(texts, str):
texts = [texts]
if not isinstance(texts, list):
raise TypeError('Expected `texts` to be a list, but got type {}.'.format(type(texts)))
begin_times = []
end_times = []
for annot in self:
if annot.text in texts:
begin_times.append(annot.onset)
end_times.append(annot.onset + annot.duration)
return np.array(begin_times), np.array(end_times)
[docs] def interpolate_artefacts(self, artefact_texts=None, max_duration=180, inplace=False):
"""
Interpolate artefact segments.
Replaces an artefact annotation by the surrounding class labels if the artefact annotation is surrounded by
equal class labels and if its duration is less than the specified max_duration.
Assumes the annotations are sorted by onset. Ultimately merges equal succesive labels.
Args:
artefact_texts (str or list, optional): annotation texts indicating artefact segments, can be a single
string or a list of strings when multiple texts indicate artefacts. If None, a default set of
artefact-related annotation texts are used.
Defaults to None.
max_duration (float, optional): maximum duration (in seconds) that the artefact annotation can have
to get replaced.
Defaults to 180.
inplace (bool, optional): if True, replaces the annotations in place. If False, a new AnnotationSet
object with the replaced annotations is returned.
Defaults to False.
Returns:
standardized_annotations (nnsa.AnnotationSet): new AnnotationSet object with potentially less artefact
annotations/labels (if inplace is False).
"""
if inplace:
interpolated_annotations = self
else:
# Create a copy of the annotations (to preserve the original ones).
interpolated_annotations = copy.deepcopy(self)
if artefact_texts is None:
# Default list of artefact-related texts.
artefact_texts = [
STANDARD_ANNOTATIONS['artefact'],
SLEEP_LABELS['artefact'],
SLEEP_LABELS['no_label'],
]
# Make sure af_labels is a list (could be a string).
elif not isinstance(artefact_texts, (list, tuple)):
artefact_texts = [artefact_texts]
# Find annotations whose duration is short enough (exclude first or last one).
durations = self.durations()
idx_short = np.where(durations[1: -1] <= max_duration)[0] + 1
# Loop over the short annotations.
annotations = interpolated_annotations.annotations
for idx in idx_short:
# Check if it is an artefact annotation.
an_cur = annotations[idx]
if an_cur.text in artefact_texts:
# Check if previous and next annotation are the same.
prev_label = annotations[idx - 1].text
next_label = annotations[idx + 1].text
if prev_label not in artefact_texts and prev_label == next_label:
# Replace af annotation with previous label.
an_cur.text = prev_label
# Merge successive labels.
interpolated_annotations.merge_successive_texts(inplace=True)
if not inplace:
return interpolated_annotations
[docs] def merge_successive_texts(self, inplace=False):
"""
Merge successive annotations if there texts are the same.
Args:
inplace (bool, optional): if True, merges the annotations in place. If False, a new AnnotationSet object
with the merged annotations is returned.
Defaults to False.
Returns:
annotation_set (nnsa.AnnotationSet): new AnnotationSet object with successive labels merged
(if inplace is False).
"""
# Sort.
if inplace:
self.sort(inplace=True)
annotation_set = self
else:
annotation_set = self.sort(inplace=False)
# Tolerance for checking equality of floats.
tol = 1e-7
# Iterate over annotations and merge if successive annotations are equal.
annotations = annotation_set.annotations
idx = 0
while idx < len(annotations) - 1:
annot_cur = annotations[idx]
annot_next = annotations[idx + 1]
if annot_cur.text == annot_next.text and (abs((annot_cur.onset + annot_cur.duration) - annot_next.onset) < tol):
annot_cur.duration += annot_next.duration
annotations.pop(idx + 1)
else:
idx += 1
if not inplace:
return annotation_set
[docs] def onsets(self):
"""
Return the onsets of all annotations in the set.
Returns:
(np.ndarray): the onsets (in seconds) of all annotations in the set.
"""
return np.array([a.onset for a in self.annotations])
[docs] def print_annotation(self, index, print_header=True):
"""
Print the onset, text and duration of an annotation specified by the index.
Args:
index (int): the index of the anntation in self.annotations list to print.
print_header (bool, optional): if True, a header will be printed, if False, not.
Defaults to True.
"""
# The onset, text and duration should be separated by fixed widths.
string_format_header = '{:<12} {:<14} {:<60}'
string_format_data = '{:<12.2f} {:<14.2f} {:<60}'
if print_header:
print(string_format_header.format('Onset (s)', 'Duration (s)', 'Text'))
annot = self.annotations[index]
line = string_format_data.format(annot.onset, annot.duration, annot.text)
print(line)
[docs] def print_all_annotations(self):
"""
Print all annotations at once using a pandas dataframe.
"""
# Make sure it prints all rows and columns.
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
print(self.to_dataframe())
[docs] def remove(self, patterns, case_sensitive=True, inplace=False):
"""
Filter the annotation by looking for matching patterns in the texts.
Args:
patterns (str or list): pattern(s) to find matches for.
inplace (bool, optional): if True, removes the annotations in place. If False, a new AnnotationSet
object without the removed annotations is returned.
Defaults to False.
Returns:
as_out (edfreadpy.AnnotationSet): new AnnotationSet without annotations that match the specified
pattern (only retruned if inplace is False).
Examples:
>>> annot_set = AnnotationSet()
>>> annot_set.append(Annotation(onset=10.25, duration=100, text='QS'), inplace=True)
>>> annot_set.append(Annotation(110.25, 200, 'AS'), inplace=True)
>>> annot_set.append(Annotation(500, 300, 'QS'), inplace=True)
>>> annot_set.append(Annotation(800, 10, 'AS'), inplace=True)
>>> annot_set.print_all_annotations()
onset duration text
0 10.25 100 QS
1 110.25 200 AS
2 500.00 300 QS
3 800.00 10 AS
>>> as_new = annot_set.remove('QS')
>>> as_new.print_all_annotations()
onset duration text
0 110.25 200 AS
1 800.00 10 AS
"""
return self.filter(patterns=patterns, how='remove', case_sensitive=case_sensitive, inplace=inplace)
[docs] def replace_text_by_index(self, indices, new_text, inplace=False):
"""
Replace the text of annotations with index in `indices` with `new_text`.
Args:
indices (list): list with sorted indices that correspond to the to be replaced annotations in the
annotations list.
new_text (str): new text for the annotations that are to be replaced.
inplace (bool, optional): if True, replaces the annotation texts in place. If False, a new AnnotationSet
object with the replaced annotation texts is returned.
Defaults to False.
Returns:
replaced_annotations (nnsa.AnnotationSet): new AnnotationSet object with the replaced annotation texts
(if inplace is False).
"""
if inplace:
replaced_annotations = self
else:
# Create a copy of the annotations (to preserve the original ones).
replaced_annotations = copy.deepcopy(self)
for idx in indices:
replaced_annotations[idx].text = new_text
if not inplace:
return replaced_annotations
[docs] def replace_in_range(self, replace_with, begin, end, resize=False, inplace=False):
"""
Replace the texts of annotations that overlap with the given range with the specified text.
Note: by default, replaces the entire annotation in case of any overlap, see `resize` parameter.
Args:
replace_with (str): new text for annotations overlapping with the specified range.
begin (float): begin of range.
end (float): end of range.
resize (bool, optional): if True, the annotations that partially overlap are resized so that
they fit. If False, all annotations with any overlap are removed completely.
inplace (bool, optional): if True, replace the annotations in place. If False, a new AnnotationSet
object with the new annotations is returned.
Defaults to False.
Returns:
as_out (nnsa.AnnotationSet): new object where all annotations that overlap with the range are
replaced seconds (if inplace is False).
Examples:
>>> ans = AnnotationSet([Annotation(0, 20, '1'), Annotation(20, 50, '2'), Annotation(100, 110, '3')])
>>> ans.replace_in_range('NaN', begin=0, end=10)
AnnotationSet with label: "annotations" and annotations:
onset duration text
0 0 20 NaN
1 20 50 2
2 100 110 3
>>> ans.replace_in_range('NaN', begin=0, end=10, resize=True)
AnnotationSet with label: "annotations" and annotations:
onset duration text
0 0 0 1
1 0 10 NaN
2 10 10 1
3 20 50 2
4 100 110 3
>>> ans.replace_in_range('NaN', begin=0, end=30)
AnnotationSet with label: "annotations" and annotations:
onset duration text
0 0 20 NaN
1 20 50 NaN
2 100 110 3
>>> ans.replace_in_range('NaN', begin=0, end=30, resize=True)
AnnotationSet with label: "annotations" and annotations:
onset duration text
0 0 0 1
1 0 30 NaN
2 30 50 2
3 100 110 3
>>> ans.replace_in_range('NaN', begin=60, end=70)
AnnotationSet with label: "annotations" and annotations:
onset duration text
0 0 20 1
1 20 50 NaN
2 100 110 3
>>> ans.replace_in_range('NaN', begin=60, end=70, resize=True)
AnnotationSet with label: "annotations" and annotations:
onset duration text
0 0 20 1
1 20 40 2
2 60 10 NaN
3 70 0 2
4 100 110 3
"""
if inplace:
as_out = self
else:
# Create a copy of the current object (to preserve the original one).
as_out = copy.deepcopy(self)
# Begin and end times of annotations.
begin_all = as_out.onsets()
end_all = begin_all + as_out.durations()
# Find overlapping annotations.
case_1 = np.logical_and(begin < begin_all, begin_all < end) # Overlaps start of an annotation.
case_2 = np.logical_and(begin < end_all, end_all < end) # Overlaps end of an annotation.
case_3 = np.logical_and(begin_all <= begin, end <= end_all) # Happens within an annotation.
case_1_or_2 = np.logical_or(case_1, case_2)
idx_overlap = np.where(np.logical_or(case_1_or_2, case_3))[0]
an_to_add = []
for idx in idx_overlap:
if not resize:
# Replace texts of annotations with any overlap.
as_out.annotations[idx].text = replace_with
else:
# Insert the replace text as a new annotations and resize any overlapping annotations so that there
# is no overlap.
c1 = case_1[idx]
c2 = case_2[idx]
c3 = case_3[idx]
an = as_out.annotations[idx]
if c1 and not c2:
# Adjust onset of annotation if overlap at start of annotation, but not at end.
an.onset = end
if c2 and not c1:
# Adjust duration of annotation if overlap at end of annotation, but not at start.
an.duration = begin - an.onset
if c1 and c2:
# Replace entire annotation due to complete overlap with text.
an.text = replace_with
if c3:
# Split annotation in two: one before and one after the new annotation.
# Change the current annotation.
an_duration = an.duration
an.duration = begin - an.onset
an_to_add.append(Annotation(text=an.text, onset=end, duration=an.onset+an_duration-end))
if resize:
# Add annotation.
as_out.append(Annotation(text=replace_with, onset=begin, duration=end-begin), inplace=True)
as_out.extend(an_to_add, inplace=True)
as_out.sort(inplace=True)
if not inplace:
return as_out
[docs] def replace_texts(self, texts, new_text, inplace=False):
"""
Replace the text of all annotations with of one `texts` with `new_text`.
Args:
texts (str or list): text of annotation to replace or a list of text to replace.
new_text (str): new text for the annotations that are to be replaced.
inplace (bool, optional): if True, replaces the annotation texts in place. If False, a new AnnotationSet
object with the replaced annotation texts is returned.
Defaults to False.
Returns:
replaced_annotations (nnsa.AnnotationSet): new AnnotationSet object with the replaced annotation texts
(if inplace is False).
"""
if inplace:
replaced_annotations = self
else:
# Create a copy of the annotations (to preserve the original ones).
replaced_annotations = copy.deepcopy(self)
if isinstance(texts, str):
# Convert to list.
texts = [texts]
for annot in replaced_annotations:
if annot.text in texts:
annot.text = new_text
if not inplace:
return replaced_annotations
[docs] def shade_axis(self, begin=None, end=None, labels_mapping=None, time_scale='seconds', **kwargs):
"""
Shade the areas in the current axis corresponding to the annotations.
Args:
begin (float, optional): begin time (in same units as `time_scale`) to start shading.
If None, the time of the first annotation is used.
Defaults to None.
end (float, optional): end time (in same units as `time_scale`) to end shading.
If None, the end time of the last annotation is used.
Defaults to None.
labels_mapping (dict): a dictionary mapping each to be included annotation text to a color.
If None, all unique annotations will be assigned a unique color.
Defaults to None.
time_scale (str, optional): the time scale to use. Choose from 'seconds', 'minutes', 'hours'.
Defaults to 'seconds'.
**kwargs (optional): optional keyword arguments for shade_axis().
"""
# Sort annotations on time.
annot_set = self.sort()
# Extract onsets, durations and labels arrays.
onsets = annot_set.onsets()
durations = annot_set.durations()
labels = annot_set.texts()
# Convert to requested time scale.
onsets = convert_time_scale(onsets, time_scale=time_scale)
durations = convert_time_scale(durations, time_scale=time_scale)
# Default args.
if begin is None:
begin = onsets[0]
if end is None:
end = onsets[-1] + durations[-1]
else:
end = np.min([end, onsets[-1] + durations[-1]])
# Remove periods before begin and after end.
idx_begin = np.max([np.argmax(onsets > begin) - 1, 0])
durations[idx_begin] -= begin - onsets[idx_begin]
onsets[idx_begin] = begin
idx_end = np.min([np.argmax(onsets + durations >= end), len(onsets) - 1])
durations[idx_end] = end - onsets[idx_end]
onsets = onsets[idx_begin: idx_end+1]
durations = durations[idx_begin: idx_end+1]
labels = labels[idx_begin: idx_end+1]
# Create labels_mapping (map annotation text to a color).
if labels_mapping is None:
unique_class_labels = np.unique(labels).tolist()
if NO_LABEL in unique_class_labels:
unique_class_labels.remove(NO_LABEL)
labels_mapping = dict(
(
(label, 'C{}'.format(i))
for i, label in enumerate(unique_class_labels)
)
)
# Call the function that shades the axis.
shade_axis(onsets, durations, labels, color=labels_mapping,
**kwargs)
[docs] def sleep_stages(self, **kwargs):
"""
Convert the annotations to sleep stages and return the sleep stages as a SleepStagesResult object.
This is a wrapper that prepares the input for SleepStages.sleep_stages() and returns the result.
Args:
**kwargs (optional): optional keyword arguments to overrule default parameters of the SleepStages class.
Returns:
result (nnsa.SleepStagesResult): SleepStagesResult object containing annotations related to
sleep stages.
"""
# Initialize SleepStages object (updates default parameters with user specified keyword arguments).
# Import locally since sleep_stages.py also imports from this file.
from nnsa.feature_extraction.sleep_stages import SleepStages
sleep_stages = SleepStages(**kwargs)
# Extract the sleep stages.
result = sleep_stages.sleep_stages(self)
return result
[docs] def sort(self, inplace=False):
"""
Sort the annotations based on onset.
Args:
inplace (bool, optional): if True, sorts the annotations in place. If False, a new AnnotationSet object with
the sorted annotations is returned.
Defaults to False.
Returns:
sorted_annotations (edfreadpy.AnnotationSet): new AnnotationSet object with the sorted annotations
(if inplace is False).
"""
# Function that extracts onset for sorting.
def f(x): return x.onset
if inplace:
self.annotations.sort(key=f)
else:
# Create a copy of the annotations (to preserve the original ones).
sorted_annotations = copy.deepcopy(self)
# Sort by onset.
sorted_annotations.annotations.sort(key=f)
return sorted_annotations
[docs] def standardize_annotations(self, inplace=False):
"""
Convert the annotations to standard labels and return a new AnnotationSet with the standard labels.
Args:
inplace (bool, optional): if True, standardizes the annotations in place. If False, a new AnnotationSet
object with the standardized annotations is returned.
Defaults to False.
Returns:
standardized_annotations (nnsa.AnnotationSet): new AnnotationSet object with standard annotations/labels
(if inplace is False).
"""
if inplace:
standardized_annotations = self
else:
# Create a copy of the annotations (to preserve the original ones).
standardized_annotations = copy.deepcopy(self)
for annot in standardized_annotations:
# Standardize text if not already standardized.
if annot.text not in STANDARD_ANNOTATIONS.values():
annot.text = standardize_annotation(annot.text)
if not inplace:
return standardized_annotations
[docs] def start_time(self):
"""
Return the onset time of the first annotation (in seconds).
Returns:
(float): onset time of first annotation (in seconds).
"""
return self.sort().annotations[0].onset
[docs] def subtract_offset(self, offset, inplace=False):
"""
Alias for self.subtract_time() for backwards compatibility reasons.
"""
return self.subtract_time(dt=offset, inplace=inplace)
[docs] def subtract_time(self, dt, inplace=False):
"""
Subtract a constant amount of time from the onset of each annotation in the annotation set.
May be used to perfectly align the annotations with the recorded signals, in case the recording of the signals
did not start at the exact (whole) second defined as the starttime the fileheader. In EDF+ files, the EDF
Annotation channel contains the (fractional) onset of each datarecord w.r.t. the starttime defined in the file
header.
Args:
dt (float): the amount of time to subtract from the annotation onsets (in seconds).
inplace (bool, optional): if True, subtracts the offset from the annotations in place. If False, a new
AnnotationSet object with the edited annotations is returned.
Defaults to False.
Returns:
annotation_set (edfreadpy.AnnotationSet): new AnnotationSet object with the edited annotations
(if inplace is False).
"""
if inplace:
annotation_set = self
else:
# Create a copy of the annotation set (to preserve the original one).
annotation_set = copy.deepcopy(self)
# Change the onset of each Annotation object in place.
for annot in annotation_set.annotations:
annot.onset -= dt
if not inplace:
return annotation_set
[docs] def texts(self):
"""
Return the texts of all annotations in the set.
Returns:
(np.ndarray): the texts of all annotations in the set.
"""
return np.array([a.text for a in self.annotations])
[docs] def to_dataframe(self):
"""
Return a DataFrame with the annotations.
Returns:
(pd.DataFrame): DataFrame object with annotations.
"""
onsets = []
durations = []
texts = []
for annot in self:
onsets.append(annot.onset)
durations.append(annot.duration)
texts.append(annot.text)
return pd.DataFrame({'onset': onsets, 'duration': durations, 'text': texts})
[docs] def to_df(self):
"""
Alias for self.to_dataframe().
"""
return self.to_dataframe()