"""
Module for reading EDF(+) files.
"""
import os
import sys
from abc import ABC, abstractmethod
import datetime
import warnings
from functools import partial
import numpy as np
import pyprind
from nnsa.edfreadpy.anonymization.anonymization import extract_patient_file_id, compute_anonymized_date
from nnsa.edfreadpy.io.config import default_edf_file_header, default_edf_signal_header, EDFPLUS_TYPES, MONTH_TO_INT, \
INT_TO_MONTH, EDF_EXTENSIONS
from nnsa.edfreadpy.io.utils import standardize_and_check_label
from nnsa.annotations.annotation import Annotation
from nnsa.annotations.annotation_set import AnnotationSet
import copy
# Names exported by a star-import of this module.
__all__ = [
'BaseReader', 'EdfReader',
]
class BaseReader(ABC):
    """
    Abstract base class for readers of time series data.

    Child classes must implement `close_file`. A reader can be used as a context manager: the data
    file is closed when the `with` block is left, and also when the object is destroyed.
    """

    def __init__(self):
        pass

    def __enter__(self):
        # Entering a `with` block hands back the reader itself.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Leaving a `with` block releases the file. Nothing is returned, so exceptions propagate.
        self.close_file()

    def __del__(self):
        # Release the file when the object is destroyed.
        self.close_file()

    # Required methods: when developing a new child of this class, the following abstract methods
    # must be defined in the child class.

    @abstractmethod
    def close_file(self):
        """
        Close the data file (if opened).
        """
        raise NotImplementedError
[docs]class EdfReader(BaseReader):
"""
High-level interface for reading EDF(+) files.
Args:
filepath (str): path to the EDF(+) file to read.
"""
def __init__(self, filepath):
# Call parent's __init__.
super().__init__()
# Store filepath.
self.filepath = filepath
# The following protected variables will be populated with data only once there are needed and should not be
# accessed directly by the user. Instead, they should be accessed via their corresponding property or method.
self._additional_info = None # Additional file and signal info (dict).
self._all_digital_data = None # Digital data values of all datarecords, all signals (np.ndarray).
self._file_header = None # File header (dict).
self._signal_headers = None # Signals headers (dict).
# The following protected variables should not be accessed or used by the user directly.
self._fid = None # Handle to (opened) file (file object).
# The following private variables should not be accessed, changed or used by the user directly.
self.__bytes_per_sample = 2 # In EDF files, the number of bytes per data sample is 2.
# Warn if the file is not anonymized (according to self.is_anonymized() method).
if not self.is_anonymized:
msg = '\nFile "{}" is possibly not anonymized. Pat id: "{}".'.format(filepath, self.file_header['patient_id'])
warnings.warn(msg)
def __repr__(self):
"""
Return a comprehensive info string about this object.
Returns:
(str): a comprehensive info string about this object.
"""
open_or_closed = 'closed' if self._fid.closed else 'open'
return '{} for file {} ({}).'.format(self.__class__.__name__, self.filepath, open_or_closed)
@property
def additional_info(self):
"""
Return additional file and signal info.
Returns:
(dict): Dictionary with additional file and signal info.
"""
if self._additional_info is None:
self._additional_info = self._collect_additional_info()
return self._additional_info
@property
def encoding(self):
"""
Encoding is ascii by EDF convention.
"""
return 'ascii'
@property
def file_header(self):
"""
Return the file header (as a dictionary).
Returns:
(dict): file header.
"""
if self._file_header is None:
self._file_header = self._read_file_header()
return self._file_header
@property
def is_anonymized(self):
"""
Check if EDF is anonymized.
Returns:
(bool): True if anonymization of EDF file is detected, False if not.
"""
if '+' in self.additional_info['filetype']:
# Ignore the birthdate for now.
pat_id_dict = self.additional_info['patient_id']
pat_id = pat_id_dict['code'] + pat_id_dict['sex'] + pat_id_dict['name']
# Check birthdate. It's definitely anonymous if the default birthdate was used, or if the birthdate
# lies in the future. Note that the date can still be anonymous even if its in the past (the date was
# anonymized by adding a random amount of days to it, so for most file the anonymized data will lie
# in the future, for now...).
is_default_birthdate = pat_id_dict['birthdate'] == datetime.date(1900, 1, 1)
is_future_birthdate = pat_id_dict['birthdate'] > datetime.date.today()
birthdate_is_anonymized = is_default_birthdate or is_future_birthdate
else:
pat_id = self.file_header['patient_id']
birthdate_is_anonymized = True # No birthdate explicitly in the pat id for regular EDF files.
pat_id = pat_id.lower().replace(' ', '')
num_unique_chars = len(set(pat_id))
if num_unique_chars < 2 or ('anonym' in self.file_header['patient_id'].lower()):
pat_id_is_anonymized = True
else:
pat_id_is_anonymized = False
return pat_id_is_anonymized and birthdate_is_anonymized
@property
def is_discontinuous(self):
"""
Check if EDF is discontinuous.
Returns:
(bool): True if discontinuous, False if not.
"""
return self.additional_info['filetype'] == 'EDF+D'
@property
def signal_headers(self):
"""
Return the signals headers (as a dictionary of lists).
E.g. the label of signal 3 is in self.signal_header['label'][3]
Returns:
(dict): signal headers. Each value in the dict is a list, corresponding to the signals.
"""
if self._signal_headers is None:
self._signal_headers = self._read_signal_headers()
return self._signal_headers
@property
def size(self):
"""
Return the filesize in bytes.
"""
return os.path.getsize(self.filepath)
@property
def total_duration(self):
"""
Return the total duration of the recording (in seconds).
"""
return self.additional_info['total_duration']
def append_and_save(self, filepath_out, *args, allow_duplicates=False, overwrite=False,
                    hdr_updates_bytes=None, sig_hdr_updates=None, verbose=1):
    """
    Append signals to the EDF and save the result to a new file.

    Args:
        filepath_out (str): filepath to save to.
        *args (dict): the signals to append, one dict per signal.
            Required fields of each dict:
                "signal", "fs", "label".
            Optional fields:
                "transducer", "physical_dimension", "physical_min", "physical_max",
                "digital_min", "digital_max", "prefilter", "reserved".
            See the EDF specs on their website for meaning of the fields.
        allow_duplicates (bool): if False, raises an error if signals are appended with labels
            that already exist in the EDF file.
        overwrite (bool): if False, raises an error if output file already exist.
            If True, overwrites any existing EDF file with same name.
        hdr_updates_bytes (dict): optional updates for the file header (in bytes).
        sig_hdr_updates (dict): optional updates for the signal headers; merged into a copy of
            the signal headers with dict.update().
        verbose (int): verbosity level.

    Examples:
        filepath = '<filepath>.EDF'
        filepath_out = 'test.EDF'
        signals = (
            dict(signal=np.random.rand(15000), fs=10, label='test'),
            dict(signal=np.random.randint(0, 4, 1500), fs=1, label='test2'),
        )
        with EdfReader(filepath) as r:
            r.append_and_save(filepath_out, *signals)
    """
    # The positional arguments are the signals to append.
    new_signals = args

    # Check file path (create dir if not exists, error when file already exists, check valid extension).
    _check_filepath_edf(filepath_out, overwrite=overwrite)

    # Work on copies of the original headers, so the headers of this reader stay untouched.
    original_hdr = self.file_header.copy()
    hdr_bytes = self._read_file_header(convert_bytes=False).copy()
    sig_hdr = copy.deepcopy(self.signal_headers)

    # Apply the header updates requested by the caller.
    if hdr_updates_bytes is not None:
        hdr_bytes.update(hdr_updates_bytes)
    if sig_hdr_updates is not None:
        sig_hdr.update(sig_hdr_updates)

    # Read all original digital data.
    if verbose:
        print('Reading data...')
    all_data = self._get_all_digital_data()

    if verbose:
        print('Appending new data...')

    # Entries that change when adding signals: the header size grows by 256 bytes per appended
    # signal and the number of signals grows accordingly. Both are based on the original header.
    encoding = self.encoding
    num_new = len(new_signals)
    size_header_text = '{:.0f}'.format(original_hdr['size_header'] + num_new * 256).ljust(8)
    num_signals_text = str(original_hdr['num_signals'] + num_new).ljust(4)
    size_header_bytes = bytes(size_header_text, encoding=encoding)
    num_signals_bytes = bytes(num_signals_text, encoding=encoding)
    hdr_bytes['size_header'] = size_header_bytes
    hdr_bytes['num_signals'] = num_signals_bytes

    # File header in normal format, reflecting all updates made above.
    file_hdr = self._convert_raw_header(hdr_bytes.copy())

    # Add the signals one by one to the signal headers and the data matrix.
    for sig_data in new_signals:
        sig_hdr, all_data = self._append_signal_data(
            sig_data=sig_data, file_hdr=file_hdr, sig_hdr=sig_hdr, all_data=all_data,
            allow_duplicates=allow_duplicates)

    # Save to file.
    self._save(filepath_out=filepath_out, hdr_bytes=hdr_bytes,
               sig_hdr=sig_hdr, all_data=all_data, verbose=verbose)
@staticmethod
def _append_signal_data(sig_data, file_hdr, sig_hdr, all_data, allow_duplicates=False):
"""
Adds the signal in `sig_data` to `all_data` matrix and the `sig_hdr` dict (for saving EDF files).
Args:
sig_data (dict): data of the signal to append.
The dict should have required fields:
"signal" (signal matrix (physical values)), "fs" (sampling freq in Hz), "label" (str).
Optional fileds are:
"transducer", "physical_dimension" or "unit", "physical_min", "physical_max",
"digital_min", digital_max", "prefilter", "reserved".
See the EDF specs on their website for meaning of the fields.
file_hdr (dict): the file header of the EDF file to save to (will not change by this function).
sig_hdr (dict): the signal header of the EDF file to save to (will change by this function).
If there are no existing signals, use an empty defaultdict for lists: sig_hdr=defaultdict(list).
all_data (np.ndarray): 2D array containing the digital data with shape (num_datarecords, num_samples).
The new signal will be appended to this matrix. If there are no existing data,
use an empty matrix all_data=np.zeros((num_datarecords, 0), dtype=np.int16).
allow_duplicates (bool): if False, raises an error when trying to append a signal with a label that already
exists (in the sig_hdr).
Returns:
sig_hdr (dict): the updated signal header.
all_data (np.ndarray): the updated digital data matrix.
"""
# Extract required fields.
sig_phys = np.asarray(sig_data['signal']).squeeze()
fs = sig_data['fs']
label = sig_data['label']
if 'unit' in sig_data.keys() and 'physical_dimension' not in sig_data.keys():
# physical_dimension and unit are the same, but it needs to be attributed to "physical_dimension".
sig_data['physical_dimension'] = sig_data['unit']
# Some checks.
if sig_phys.ndim != 1:
raise ValueError('Signal should be 1-dimensional. Got a signal with shape {}.'
.format(sig_phys.shape))
if len(label) > 16:
msg = '\nLabel "{}" is longer than 16 characters. ' \
'Cutting off all characters after the 16th to fit in EDF header...'.format(label)
warnings.warn(msg)
if label in sig_hdr['label'] and not allow_duplicates:
raise ValueError('Signal "{}" already exists in EDF file. Enable `overwrite_signals` to suppress '
'this error and overwrite the signal.'.format(label))
# Get dimensions (use defaults if not given).
dig_min = sig_data.get('digital_min', -32768)
dig_max = sig_data.get('digital_max', 32767)
phys_min = sig_data.get('physical_min', int(np.floor(np.nanmin(sig_phys))))
phys_max = sig_data.get('physical_max', int(np.ceil(np.nanmax(sig_phys))))
# Physical min and max cannot be the same, so change one of them if they are.
if phys_min == phys_max:
phys_min -= 1
# Verify that the values fit in 2-bytes.
if dig_min < -32768 or dig_max > 32767:
raise ValueError('digital_min and digital_max should be within [-32768, 32767], but got {}.'
.format([dig_min, dig_max]))
# Warning if signal exceeds the physical limits.
if np.min(sig_phys) < phys_min:
msg = f'Signal contains values lower than physical_minimum ({phys_min}).'
warnings.warn(msg)
elif np.max(sig_phys) > phys_max:
msg = f'Signal contains values larger than physical_maximum ({phys_max}).'
warnings.warn(msg)
# Number of samples in datarecord.
num_samples = sig_data['fs'] * file_hdr['duration_datarecord']
if not float(num_samples).is_integer():
raise ValueError('Signal "{}" with fs={} Hz does not fit in datarecord with duration {} s.'
.format(label, fs, file_hdr['duration_datarecord']))
else:
num_samples = int(num_samples)
# Update signal header.
sig_hdr['label'].append(label)
sig_hdr['transducer'].append(sig_data.get('transducer', ''))
sig_hdr['physical_dimension'].append(sig_data.get('physical_dimension', ''))
sig_hdr['physical_min'].append(phys_min)
sig_hdr['physical_max'].append(phys_max)
sig_hdr['digital_min'].append(dig_min)
sig_hdr['digital_max'].append(dig_max)
sig_hdr['prefilter'].append(sig_data.get('prefilter', ''))
sig_hdr['num_samples'].append(num_samples)
sig_hdr['reserved'].append(sig_data.get('reserved', ''))
# Transform signal to digital domain.
sig_dig = EdfReader._convert_physical_to_digital(
sig_phys, physical_min=float(phys_min), physical_max=float(phys_max),
digital_min=int(dig_min), digital_max=int(dig_max))
# Check size of signal.
num_datarecords = file_hdr['num_datarecords']
tot_samples = num_datarecords * num_samples
if abs(len(sig_dig) - tot_samples) > num_samples: # Difference is greater than one datarecord.
msg = 'Number of samples in signal to add does not correspond to length of other signals in EDF. \n' \
'Expected number of samples: {}. Got {} samples (for signal "{}").' \
.format(tot_samples, len(sig_dig), label)
raise ValueError(msg)
if len(sig_dig) < tot_samples:
# Append zeros.
sig_dig = np.concatenate([sig_dig, np.zeros(tot_samples - len(sig_dig), dtype=np.int16)])
elif len(sig_dig) > tot_samples:
# Cut off end.
sig_dig = sig_dig[:tot_samples]
assert len(sig_dig) == tot_samples
# Reshape to shape (num_datarecords, num_samples).
sig_dig.shape = (num_datarecords, num_samples)
# Add signal to signal matrix.
all_data = np.concatenate([all_data, sig_dig], axis=-1)
return sig_hdr, all_data
def anonymize(self, seed_offset, extract_patient_id_fun=extract_patient_file_id, **kwargs):
    """
    Anonymize the header information in place (does not save to a new EDF file, but adapts the info in memory).

    Changes the startdate, patient_id and recording_id of the file header. For EDF+ files only the
    startdate subfield of the recording_id is replaced; for other files the recording_id becomes 'X'.

    Args:
        seed_offset (int): this seed offset will be added to the seed of the random generator when changing
            the dates. Therefore, you can only trace back the original date if you know what the `seed_offset`
            was when this function was called (such that you cannot trace the original date back from just
            this code).
        extract_patient_id_fun (function): optional function that takes in the (absolute) filepath of the
            EDF and **kwargs, and returns the id of the patient. This ID will then be used to seed the
            random generator when changing the date to a random date, such that the date change is the same
            for files from the same patient. If set to False or None, a random id is used, so the date
            randomization is completely random.
        **kwargs: for extract_patient_id_fun() (if specified).
    """
    # Read/get header in normal format (the cached header is changed in place).
    hdr = self.file_header

    # Id that determines the date shift: extracted from the filepath, or random when no function is given.
    if extract_patient_id_fun:
        patient_file_id = extract_patient_id_fun(self.filepath, **kwargs)
    else:
        patient_file_id = str(np.random.randint(1e6))
    print('Anonymize date with id {}'.format(patient_file_id))

    # Compute anonymized startdate.
    anonymized_startdate = compute_anonymized_date(
        hdr['startdate'], id=patient_file_id, seed_offset=seed_offset)
    hdr['startdate'] = anonymized_startdate

    # Compute anonymized birthdate. 'X' is used unless a real birthdate is available.
    info = self.additional_info
    patient_birthdate = 'X'
    if 'patient_id' in info:
        birthdate = info['patient_id']['birthdate']
        # The default date 1900-01-01 means that the date was not specified; 'X' means the same.
        not_specified = birthdate == datetime.date(year=1900, month=1, day=1) or birthdate == 'X'
        if not not_specified:
            anonymized_birthdate = compute_anonymized_date(
                birthdate, id=patient_file_id, seed_offset=seed_offset)
            patient_birthdate = self._datetime_to_datestring(anonymized_birthdate, date_format='dd-MMM-yyyy')

    # Anonymize patient ID. Use the EDF+ format: patient's code, sex, birthdate, name are separated by a
    # space and 'X' is used for anonymized fields. The last (empty) subfield removes any additional info
    # that was potentially there (assuming this could also contain sensitive info).
    subfields = ['X', 'X', patient_birthdate, 'X', '']
    hdr['patient_id'] = ' '.join(subfields)

    if info['filetype'] in EDFPLUS_TYPES:
        # EDF+ file: the subfields of the recording id are separated by spaces and the startdate is
        # the second subfield. Replace only that one and put the subfields together again.
        recording_id_list = hdr['recording_id'].split(' ')
        recording_id_list[1] = self._datetime_to_datestring(anonymized_startdate, date_format='dd-MMM-yyyy')
        hdr['recording_id'] = ' '.join(recording_id_list)
    else:
        hdr['recording_id'] = 'X'

    # Reset collected additional info, as it may have changed due to anonymization.
    self._additional_info = None
def anonymize_and_save(self, filepath_out, seed_offset, check_is_anonymized=True, skip_anonymized=False, **kwargs):
    """
    Anonymize the header information in place and save to a new EDF file.

    Changes the startdate, patient_id and in case of EDF+ also the startdate in recording_id.
    Only the first 256 bytes (the file header) are rewritten; the signal headers and the data are
    copied unchanged from the original file.

    Args:
        filepath_out (str): file path for the new anonymized EDF file.
        seed_offset (int): see self.anonymize().
        check_is_anonymized (bool): if True, only anonymizes the header and date if not self.is_anonymized.
        skip_anonymized (bool): if True, checks if anonymized and only saves new files for EDFs that were
            not yet anonymized. If False, a new file is always saved to `filepath_out` (also when the file
            already was anonymized, i.e., a copy is made).
        **kwargs: for self.anonymize().
    """
    encoding = self.encoding

    # Skipping anonymized files implies that we have to check for anonymization.
    if skip_anonymized:
        check_is_anonymized = True

    already_anonymized = check_is_anonymized and self.is_anonymized
    if already_anonymized and skip_anonymized:
        print(f'Skipping file with patient_id {self.file_header["patient_id"]} which is already anonymized.')
        return
    if not already_anonymized:
        # Anonymize if not already done (or if requested not to check).
        self.anonymize(seed_offset=seed_offset, **kwargs)

    # Header in normal format (possibly anonymized above) and the original header in bytes.
    hdr = self.file_header
    hdr_bytes = self._read_file_header(convert_bytes=False)

    # Read everything after the file header, i.e. from the 257th byte on. This contains the signal
    # headers and the data, which do not require anonymization.
    fid = self._get_fid()
    fid.seek(256, 0)
    rest_bytes = fid.read()

    # Replace the sensitive original fields with their anonymized values/texts (in bytes).
    # The startdate is written as dd.mm.yy; the id fields are padded to 80 characters.
    startdate_string = self._datetime_to_datestring(hdr['startdate'], date_format='dd.mm.yy')
    hdr_bytes['startdate'] = bytes(startdate_string, encoding=encoding)
    hdr_bytes['patient_id'] = bytes(hdr['patient_id'].ljust(80), encoding=encoding)
    hdr_bytes['recording_id'] = bytes(hdr['recording_id'].ljust(80), encoding=encoding)

    # Put the header fields together in the order in which they are stored in the file.
    field_order = ('version', 'patient_id', 'recording_id', 'startdate', 'starttime',
                   'size_header', 'reserved', 'num_datarecords', 'duration_datarecord', 'num_signals')
    hdr_bytes_line = b''.join([hdr_bytes[field] for field in field_order])
    file_bytes_line = b''.join([hdr_bytes_line, rest_bytes])

    # Check file path (create dir if not exists, error when file already exists, check valid extension).
    _check_filepath_edf(filepath_out)

    # Write to file.
    with open(filepath_out, 'wb') as f:
        f.write(file_bytes_line)
[docs] def close(self):
"""
Close the EDF file (if opened).
"""
self.close_file()
[docs] def close_file(self):
"""
Close the EDF file (if opened).
"""
self.flush_all_digital_data()
if self._fid:
self._fid.close()
[docs] def flush_all_digital_data(self):
"""
Do no longer store the raw digital data values in memory.
"""
self._all_digital_data = None
[docs] def extract_epoch_and_save(self, filepath_out, begin=0, end=None, overwrite=False, verbose=1):
"""
Read the data, extract an epoch (e.g. the first 4 hours), and save the epoch to an EDF file.
Args:
filepath_out (str): filepath to save to.
begin (float): starttime of epoch in seconds, relative to the start of the recording.
end (float, None): endtime of epoch in secpnds, relative to the start of the recording.
If None, takes the end of the recording.
overwrite (bool): If True, overwrites any existing EDF file with same name.
If False, raise error if filepath already exist.
verbose (int): verbosity level.
"""
if overwrite and os.path.isfile(filepath_out):
raise FileExistsError(f'File {filepath_out} already exists. Preventing overwrite. Change the filepath or set overwrtie=True.')
num_samples_in_datarecord = sum(self.signal_headers['num_samples'])
num_datarecords = self.file_header['num_datarecords']
duration_datarecord = self.file_header['duration_datarecord']
# Start index.
start = int(begin / duration_datarecord)
# Default for stop is the end of the recording.
stop = num_datarecords if end is None else min([int(end/duration_datarecord), num_datarecords])
# Mimic array indexing: when start or stop is negative, it specifies the offset w.r.t. the end of the signal.
start = num_datarecords + start if start < 0 else start
stop = num_datarecords + stop if stop < 0 else stop
# Check input.
if (start > num_datarecords) or (start < 0):
raise ValueError('begin={} is not compatible with recording duration ({}).'
.format(begin, num_datarecords*duration_datarecord))
if (stop > num_datarecords) or (stop < 0):
raise ValueError('end={} is not compatible with recording duration ({}).'
.format(begin, num_datarecords*duration_datarecord))
if start >= stop:
raise ValueError('`begin` ({}) must lie before `end` ({}).'
.format(begin, end))
# Extract the digital signal data from the matrix containing all digital data.
all_data = self._get_all_digital_data()[start: stop]
# Header updates related to shortening of file.
hdr_bytes = self._read_file_header(convert_bytes=False)
hdr_bytes.update({
'num_datarecords': bytes('{:.0f}'.format(len(all_data)).ljust(8)[:8], encoding=self.encoding),
})
# Save.
self._save(filepath_out, hdr_bytes=hdr_bytes, sig_hdr=self.signal_headers, all_data=all_data, verbose=verbose, encoding=self.encoding)
def insert_annotations_and_save(self, filepath_out, annotations, reset_annotations=False,
                                overwrite=False, verbose=1):
    """
    Inserts annotations in the EDF and save.

    If the file already has an EDF Annotations signal, the new annotations are added to it (unless
    `reset_annotations` is set to True, see below). If the file has no EDF Annotations signal,
    one is created and appended.

    Args:
        filepath_out (str): filepath to save to.
        annotations (AnnotationSet, pd.DataFrame): AnnotationSet or pandas DataFrame with the following columns:
            'onset': the starttime of the annotation (in seconds with respect to the start of recording).
            'duration': the duration of the annotation (specify -1 if not applicable).
            'text': annotation text.
        reset_annotations (bool): if True, any existing annotations will be removed. If False not (new
            annotations will be appended to the existing ones).
        overwrite (bool): If True, overwrites any existing EDF file with same name.
            If False, raise error if `filepath_out` already exist.
        verbose (int): verbosity level.

    Examples:
        filepath = '<filepath>.EDF'
        filepath_out = 'test.EDF'
        annotations = pd.DataFrame({
            'onset': [43.9],
            'duration': [20],
            'text': ['Hello there']})
        with EdfReader(filepath) as r:
            r.insert_annotations_and_save(filepath_out=filepath_out, annotations=annotations)
    """
    # Check file path (create dir if not exists, error when file already exists, check valid extension).
    _check_filepath_edf(filepath_out, overwrite=overwrite)

    # To dataframe if needed.
    if isinstance(annotations, AnnotationSet):
        annotations = annotations.to_dataframe()

    # Dimensions of the file, from the file header.
    file_hdr = self.file_header
    num_datarecords = file_hdr['num_datarecords']
    duration_datarecord = file_hdr['duration_datarecord']

    # Check if there is an EDF Annotations channel (None if there is not).
    annot_channel = self._get_annot_channel(raise_error=False)
    has_annot_channel = annot_channel is not None

    if has_annot_channel:
        # Read all original digital data.
        all_data = self._get_all_digital_data()

        # Column range of the EDF Annotations signal within a datarecord.
        num_samples_all = self.signal_headers['num_samples']
        num_samples = num_samples_all[annot_channel]
        idx_start = sum(num_samples_all[:annot_channel])
        idx_stop = idx_start + num_samples

        # Original annotations in int16 (shape num_datarecords, num_samples), as one bytes object
        # per datarecord.
        annot_int16 = all_data[:, idx_start: idx_stop]
        annot_bytes = [record.tobytes() for record in annot_int16]

        if reset_annotations:
            # Cut off any text annotations (only keep the first time keeping annotation).
            # Use trailing '\x00' bytes to make all entries the same length.
            record_size = num_samples * self.__bytes_per_sample
            annot_bytes = [record_b[:record_b.find(b'\x00') + 1].ljust(record_size, b'\x00')
                           for record_b in annot_bytes]
    else:
        # No EDF Annotations in file, need to create them.
        if verbose:
            print('Creating annotations...')

        # Create a time-keeping annotation for each datarecord.
        time_offset = 0  # Must be positive and < 1.
        num_samples = 40  # Set large enough to fit any time-keeping annotation.
        record_size = self.__bytes_per_sample * num_samples
        annot_bytes = []
        for record_index in range(num_datarecords):
            onset_text = '+{}'.format(time_offset + record_index*duration_datarecord)
            # Annotations should be encoded by UTF-8. Pad with '\x00' bytes to the full record size.
            record_b = onset_text.encode('utf-8', 'strict') + b'\x14\x14'
            annot_bytes.append(record_b.ljust(record_size, b'\x00'))

    # Insert new annotations.
    annot_bytes = self._insert_annotations(
        annot_bytes, new_annotations=annotations)

    if not has_annot_channel:
        # If originally no annotations in file, append EDF Annotations signal.
        self._append_annotations_and_save(
            filepath_out=filepath_out, annot_bytes=annot_bytes,
            overwrite=overwrite, verbose=verbose)
        return

    # Else if EDF Annotations already in file, replace them with the updated ones.
    # Back to int16 for easier handling using numpy arrays, with shape (num_datarecords, num_samples).
    new_annot_in16 = np.vstack([np.frombuffer(record_b, dtype=np.int16) for record_b in annot_bytes])
    assert len(new_annot_in16) == num_datarecords

    # The size of the annotations may have changed: update a copy of the signal headers.
    sig_hdr = copy.deepcopy(self.signal_headers)
    sig_hdr['num_samples'][annot_channel] = new_annot_in16.shape[1]

    # Replace old with new annotations (size of annotations may be changed, so stack parts together).
    data_before = all_data[:, :idx_start]
    data_after = all_data[:, idx_stop:]
    all_data = np.hstack([data_before, new_annot_in16, data_after])

    # Save to file.
    hdr_bytes = self._read_file_header(convert_bytes=False)
    self._save(filepath_out=filepath_out, hdr_bytes=hdr_bytes,
               sig_hdr=sig_hdr, all_data=all_data, verbose=verbose)
def read_annotations(self, efficiency='speed', offset=0, annotations_label=None):
    """
    Read annotations in an EDF+ file.

    Note: EDF+ only. Will raise an error if no EDF Annotations channel is present in the file.

    Note: by default (offset=0) the onset times are returned as stored in the annotations. In the file,
    the signals might start a fraction of a second later than reported by the starttime in the file
    header. Pass offset=None to subtract this start offset, such that the start of recording
    corresponds to time 0 s.

    Args:
        efficiency (str, optional): Specify which algorithm to use: 'speed' uses an algorithm optimized for
            speed when reading annotations from a large file (see _read_annotation_bytes_max_speed), 'memory'
            uses an algorithm that requires the least amount of memory (see
            _read_annotation_bytes_min_memory).
        offset (float, optional): This offset value will be subtracted from the onset time of each
            annotation. If None, this offset is inferred from the EDF file, such that the start of recording
            corresponds to time 0 s (the offset is the start offset of recording, read from the timestamp of
            the first datarecord).
            By default, the offset is 0, such that the times are used as in the annotations.
            This means that the start of recording is not exactly at 0 s, but may lie between 0 and 1 s.
            If synchronization between annotations and signals is important, do one of the following:
                1. Read the signal as a TimeSeries object (use the extension from the nnsa package) and
                   read the annotations with offset = 0. The time series object will contain the
                   time_offset in its time array (it does not start at exactly 0) and this time array will
                   correspond to the onset times in the annotation set.
            Without using nnsa package:
                2. For continuous signals:
                   Read the signal as an array with self.read_signal and read annotations with
                   offset = None. The time array of the signal will start at 0 seconds. Using the sampling
                   frequency and starttime = 0, you can create the time array that is compatible with the
                   onset times in the annotation set.
                   NOTE: This approach does not work for discontinuous signals.
                3. For discontinuous signals:
                   Read the signals with self.read_signal with discontinuous_mode set to 'all', and read
                   annotations with offset = 0. Use self._get_discontinuous_timestamps() to get the
                   starttime of each signal in the returned list. Using the sampling frequency and these
                   starttimes, you can create the time arrays that are compatible with the onset times in
                   the annotation set.
        annotations_label (str, optional): Specify a label for the AnnotationSet that will be created.
            By default 'Annotations in <name of the file>' is used.

    Returns:
        annotation_set (AnnotationSet): Collection of annotations, which are stored as Annotation objects.

    Raises:
        ValueError: if `efficiency` is not 'speed' or 'memory'.
        AssertionError: if `offset` is None and the offset read from the file is larger than 1 second.
    """
    # Check for existence of EDF Annotations signal.
    annot_channel = self._get_annot_channel()

    # Read the annotations as bytes and collect the annotation bytes per datarecord in a list (annot_bytes).
    if efficiency == 'speed':
        # Read with maximum speed.
        annot_bytes = self._read_annotation_bytes_max_speed(annot_channel)
    elif efficiency == 'memory':
        # Read with minimum memory requirement.
        annot_bytes = self._read_annotation_bytes_min_memory(annot_channel)
    else:
        raise ValueError('Invalid efficiency value "{}". Chose from "speed", "memory".'.format(efficiency))

    # Merge annotations of all datarecords. Put a b'\x00' (byte value 0) in between data records which
    # indicates the end of a TAL at the end of a data record (we stripped the trailing b'\x00' bytes when
    # reading the bytes).
    merged_annotations = b'\x00'.join(annot_bytes)

    # Default name/label for annotations.
    if annotations_label is None:
        annotations_label = 'Annotations in {}'.format(os.path.basename(self.filepath))

    # Collect the annotations: scan for non-empty annotations and collect the annotation text, onset and
    # duration as Annotation objects in an AnnotationSet object.
    annotation_set = AnnotationSet(annotations=_annotation_generator_edfplus(merged_annotations),
                                   label=annotations_label)

    # If requested, read the offset that the signals have w.r.t. the beginning of the file by reading the
    # timestamp of the first datarecord.
    if offset is None:
        offset = self._read_timestamp(annot_bytes[0])
        if offset > 1:
            raise AssertionError('Offset ({}) is larger than 1 second. The first annotation timestamp does '
                                 'not seem to correspond to the start of recording. Find out why and fix this!'
                                 .format(offset))

    # Subtract the offset from the annotation onset times.
    annotation_set.subtract_offset(offset, inplace=True)

    return annotation_set
def read_signal(self, channel, start=0, stop=None, discontinuous_mode='longest', efficiency='speed',
                verbose=0):
    """
    Read a (part of a) signal from the EDF file and return it in physical units.

    The requested sample range is translated to a range of datarecords
    (floor(start/num_samples) up to ceil(stop/num_samples)) and the data of those datarecords is read.
    NOTE: this method does not trim the result any further, so the returned data is aligned to datarecord
    boundaries and may contain more samples than the [start, stop) range that was asked for.

    Args:
        channel (int or str): the signal to read, given as channel index (int) or channel label (str).
        start (int, optional): sample to start reading from (counting from 0). A negative value counts from the
            end of the signal.
        stop (int, optional): sample to stop reading at (exclusive, counting from 0). Values beyond the end of
            the signal are clipped to the signal length, negative values count from the end of the signal.
            If None, reads until the end of the signal.
        discontinuous_mode (str, optional): see self._handle_discontinuous_signal(). Only used for EDF+D files.
        efficiency (str, optional): the reading algorithm. 'speed' uses self._read_digital_data_max_speed(),
            'memory' uses self._read_digital_data_min_memory().
        verbose (int): verbosity level, passed on to the reader when efficiency is 'memory'.

    Returns:
        signal_data (np.ndarray): array holding the physical values of the specified signal. For EDF+D files
            the output of self._handle_discontinuous_signal() is returned instead.

    Raises:
        ValueError: if `efficiency` is not 'speed' or 'memory' (or if the channel is invalid, see
            self._check_channel()).
    """
    # Resolve the channel to an index and derive the total number of samples of that signal.
    channel = self._check_channel(channel)
    num_samples = self.signal_headers['num_samples'][channel]
    signal_length = self.file_header['num_datarecords']*num_samples

    # Resolve stop: default is the end of the signal, larger values are clipped.
    if stop is None:
        stop = signal_length
    else:
        stop = min([stop, signal_length])

    # Negative start/stop count from the end of the signal, like array indexing.
    if stop < 0:
        stop = signal_length + stop
    if start < 0:
        start = signal_length + start

    # Range of datarecords that covers the requested samples.
    datarecord_start = int(np.floor(start/num_samples))
    datarecord_stop = int(np.ceil(stop/num_samples))

    # Read the digital values with the requested algorithm.
    if efficiency == 'memory':
        signal_digital = self._read_digital_data_min_memory(channel, datarecord_start, datarecord_stop,
                                                            verbose=verbose)
    elif efficiency == 'speed':
        signal_digital = self._read_digital_data_max_speed(channel, datarecord_start, datarecord_stop)
    else:
        raise ValueError('Invalid efficiency value "{}". Chose from "speed", "memory".'.format(efficiency))

    # Scale the digital values to physical values using the calibration in the signal header.
    calibration = [self.signal_headers[key][channel]
                   for key in ('physical_min', 'physical_max', 'digital_min', 'digital_max')]
    signal_physical = self._convert_digital_to_physical(signal_digital, *calibration)

    if self.additional_info['filetype'] != 'EDF+D':
        return signal_physical

    # Discontinuous file: let the dedicated handler decide which continuous part(s) to return.
    return self._handle_discontinuous_signal(
        signal_physical, num_samples=num_samples,
        datarecord_start=datarecord_start, datarecord_stop=datarecord_stop,
        discontinuous_mode=discontinuous_mode, efficiency=efficiency)
def reset_annotations_and_save(self, filepath_out, overwrite=False, verbose=1):
    """
    Reset the annotations in the EDF and save the result to a new file.

    If the file has an EDF Annotations signal, each datarecord's annotation field is cut off after its first
    NUL byte (i.e. only the leading, time-keeping TAL is retained) and padded with NUL bytes to its original
    length. If the file has no EDF Annotations signal, one is created that only contains a time-keeping TAL per
    datarecord, with offset 0 s.

    The data cached by self._get_all_digital_data() is left untouched: the annotation columns are only replaced
    temporarily while saving and are restored afterwards.

    Args:
        filepath_out (str): filepath to save to.
        overwrite (bool): if False, raises an error if output file already exist.
            If True, overwrites any existing EDF file with same name.
        verbose (int): verbosity level.
    """
    # Check file path (create dir if not exists, error when file already exists, check valid extension).
    _check_filepath_edf(filepath_out, overwrite=overwrite)

    hdr = self.file_header
    num_datarecords = hdr['num_datarecords']
    duration_datarecord = hdr['duration_datarecord']

    # Check if there is an EDF Annotations channel.
    annot_channel = self._get_annot_channel(raise_error=False)

    if annot_channel is None:
        # No EDF Annotations in file, need to create them.
        if verbose:
            print('Creating annotations...')

        time_offset = 0  # Must be positive and < 1.
        num_samples = 40  # Set large enough to fit any time-keeping annotation.
        num_bytes = self.__bytes_per_sample*num_samples

        # One time-keeping TAL per datarecord, padded with NUL bytes to a fixed length.
        annot_bytes = []
        for i in range(num_datarecords):
            time_str = '+{}'.format(time_offset + i*duration_datarecord)
            annot_b = time_str.encode('utf-8', 'strict') + b'\x14\x14'  # Annotations should be encoded by UTF-8.
            annot_bytes.append(annot_b.ljust(num_bytes, b'\x00'))

        # Append the created annotations to the data and save to file.
        self._append_annotations_and_save(
            filepath_out=filepath_out, annot_bytes=annot_bytes, num_samples=num_samples,
            overwrite=overwrite, verbose=verbose)
        return

    # EDF Annotations already in file, need to reset them.
    if verbose:
        print('Resetting annotations...')

    # The raw header bytes are only needed when saving with self._save().
    hdr_bytes = self._read_file_header(convert_bytes=False)
    sig_hdr = self.signal_headers

    # All original digital data. NOTE: this is the reader's cached array, not a copy.
    all_data = self._get_all_digital_data()

    # Column range of the EDF Annotations signal within a datarecord.
    num_samples_all = sig_hdr['num_samples']
    num_samples = num_samples_all[annot_channel]
    idx_start = sum(num_samples_all[:annot_channel])
    idx_stop = idx_start + num_samples
    num_bytes = num_samples * self.__bytes_per_sample

    # Keep a copy of the original annotation columns so the cached data can be restored after saving.
    original_annot_int16 = all_data[:, idx_start: idx_stop].copy()

    # Per datarecord: keep everything up to and including the first NUL byte, pad with NUL bytes to the
    # original length and convert back to int16 for easy handling with numpy.
    new_annot_int16 = []
    for annot in original_annot_int16:
        annot_b = annot.tobytes()
        reset_annot_b = annot_b[:annot_b.find(b'\x00') + 1].ljust(num_bytes, b'\x00')
        new_annot_int16.append(np.frombuffer(reset_annot_b, dtype=np.int16))

    # To shape (num_datarecords, num_samples).
    new_annot_int16 = np.vstack(new_annot_int16)
    assert len(new_annot_int16) == num_datarecords

    try:
        # Temporarily replace the annotations in the cached data (avoids copying the entire file data).
        all_data[:, idx_start: idx_stop] = new_annot_int16
        self._save(filepath_out=filepath_out, hdr_bytes=hdr_bytes, sig_hdr=sig_hdr, all_data=all_data)
    finally:
        # Restore the original annotations so that later reads from this reader are not affected.
        all_data[:, idx_start: idx_stop] = original_annot_int16
def _append_annotations_and_save(self, filepath_out, annot_bytes, num_samples=None,
                                 overwrite=False, verbose=1):
    """
    Append a list of annotations in bytes to the file data and save to filepath.

    The annotations are appended as an extra signal labelled 'EDF Annotations' via self.append_and_save(). The
    file header is updated for the conversion to (continuous) EDF+ and the existing signal labels are passed
    through standardize_and_check_label().

    Args:
        filepath_out (str): filepath to save to (passed on to self.append_and_save()).
        annot_bytes (list): list of annotations in bytes according to the specs
            (the length of the list must equal the number of datarecords and all entries must have the
            same length).
        num_samples (int, optional): number of samples per datarecord of the annotations signal.
            If None, it is derived from the length of the first entry in annot_bytes.
        overwrite (bool): passed on to self.append_and_save().
        verbose (int): passed on to self.append_and_save().
    """
    bytes_per_sample = self.__bytes_per_sample
    if num_samples is None:
        num_samples = int(len(annot_bytes[0])/bytes_per_sample)

    # Sanity checks: equally sized entries, one entry per datarecord.
    assert all(len(b) == num_samples*bytes_per_sample for b in annot_bytes)
    assert len(annot_bytes) == self.file_header['num_datarecords']

    # To int16 for easy concatenation to other data.
    annot_int16 = np.frombuffer(b''.join(annot_bytes), dtype=np.int16)

    # Set signal header information for EDF Annotations.
    # For the sake of EDF compatibility, the fields 'digital minimum' and 'digital maximum' must be filled with
    # -32768 and 32767, respectively. The 'Physical maximum' and 'Physical minimum' are set to same as digital,
    # to keep the signal the same when converting physical to digital in self.append_and_save().
    # NOTE(review): fs is truncated to an int; presumably fs * duration_datarecord should equal num_samples,
    # which only holds if num_samples is a multiple of the datarecord duration - confirm against
    # self.append_and_save().
    sig_data = {
        'signal': annot_int16,
        'fs': int(num_samples / self.file_header['duration_datarecord']),
        'label': 'EDF Annotations',
        'digital_min': -32768,
        'digital_max': 32767,  # Was misspelled as 'digintal_max', so the value was never passed on.
        'physical_min': -32768,
        'physical_max': 32767,
    }

    # Header updates related to conversion of EDF to EDF+
    # (create subfields in patient and recording id and set the type to continuous EDF+).
    # The new prefixes are put in front of the existing content, which is cut off at the field width.
    encoding = self.encoding
    hdr_bytes = self._read_file_header(convert_bytes=False)
    hdr_updates_bytes = {
        'patient_id': (bytes('X X X X ', encoding=encoding) + hdr_bytes['patient_id'])[:80],
        'recording_id': (bytes('Startdate X X X X ', encoding=encoding) + hdr_bytes['recording_id'])[:80],
        'reserved': (bytes('EDF+C', encoding=encoding) + hdr_bytes['reserved'])[:44],
    }

    # Update signal labels (on a copy, the labels in self.signal_headers are not modified).
    labels = self.signal_headers['label'].copy()
    for idx, label in enumerate(labels):
        labels[idx] = standardize_and_check_label(label)[0]
    sig_hdr_updates = {
        'label': labels,
    }

    # Append Annotations signal and save as if it were an ordinary signal.
    self.append_and_save(filepath_out, sig_data, overwrite=overwrite, verbose=verbose,
                         hdr_updates_bytes=hdr_updates_bytes, sig_hdr_updates=sig_hdr_updates)
@staticmethod
def _bytes_to_string(s, encoding='ascii', strip=True):
"""
Convert bytes to string.
Args:
s (bytes): bytes encoding ASCII characters.
encoding (str, optional): the encodings to be used ('ascii' is default for EDF).
strip (bool, optional): if True (default), remove whitespaces.
Returns:
string (str): decoded string.
"""
# Type check.
if not isinstance(s, bytes):
raise ValueError('Unexpected type')
# Strip white spaces.
if strip:
s = s.strip()
try:
string = s.decode(encoding)
except UnicodeDecodeError as error:
# Workaround for common non-ascii characters in EDF files.
s_original = s
s = s.replace(b'\xb0', b'deg') # \xb0 is the degree symbol in ascii extended and unicode.
s = s.replace(b'O\x82', b'O2') # O\x82 most likely corresponds to O^2 (as in SpO2, etc.).
# Retry decoding.
# If the workaround did not work, we can simply replace the character by a ? character (see decode()).
string = s.decode(encoding, errors='replace')
# Always display a warning in case of non-ascii characters.
msg = '\n' + str(error) + '\nWorkaround: non ascii field {} decoded as {}.'.format(s_original, string)
warnings.warn(msg)
return string
def _check_channel(self, channel):
"""
Verify that the channel is in the file and (if needed) convert the channel label to the channel index.
Args:
channel (int or str): the channel specifying a signal in the file. May either be an integer specifying the
order in which the signal appeard in the EDF file (i.e. index), or a string specifying the signal label.
Returns:
(int): channel index.
"""
# Check type of channel.
if isinstance(channel, str):
# Assume it is a label and find the corresponding integer index/channel.
labels = self.signal_headers['label']
if channel in labels:
channel = labels.index(channel)
else:
raise ValueError('Channel label "{}" not found in file. Labels in file: {}.'
.format(channel, self.signal_headers['label']))
elif isinstance(channel, int):
if channel > self.file_header['num_signals'] - 1:
raise ValueError('Channel index ({}) out of range for file with {} signals.'
.format(channel, self.file_header['num_signals']))
else:
raise ValueError('Invalid channel "{}". Specify either the channel number (as int) or the channel label '
'(as str).'.format(channel))
return channel
def _check_file_header(self):
    """
    Check for unusual values in the EDF file header (and maybe try to fix them).

    The number of datarecords in the header is compared with the number that follows from the file size. If
    they differ and the file size corresponds to a whole number of datarecords, the header value is corrected
    (in place, in the dictionary returned by self.file_header). A warning is issued whenever they differ.

    Returns:
        file_header (dict): dictionary of the EDF file header (with possibly fixed values).

    Raises:
        NotImplementedError: if the number of datarecords in the header is -1 (unknown).
    """
    file_header = self.file_header

    if file_header['num_datarecords'] == -1:
        raise NotImplementedError('Number of datarecords unknown from file header. '
                                  'Infer this number from total datarecord length')

    # Check if filesize matches the info in the header. If not, try to fix num_datarecords.
    filesize = self.size
    size_header = file_header['size_header']
    size_datarecord = sum(self.signal_headers['num_samples']) * self.__bytes_per_sample

    if size_datarecord == 0:
        # Datarecords without any samples (e.g. a file without signals): the number of datarecords cannot
        # be derived from the file size, so there is nothing to verify (and we must not divide by zero).
        return file_header

    num_datarecords_expected = (filesize - size_header) / size_datarecord
    if file_header['num_datarecords'] != num_datarecords_expected:
        if num_datarecords_expected.is_integer():
            msg = '\nnum_datarecords in header does not match filesize. Correction made automatically.'
            file_header['num_datarecords'] = int(num_datarecords_expected)
        else:
            msg = '\nnum_datarecords in header does not match filesize. Could not correct automatically.'
        warnings.warn(msg)

    return file_header
@staticmethod
def _check_signal_headers(signal_headers):
"""
Check for unusual values in the EDF signal headers (and maybe try to fix them).
Args:
signal_headers (dict): dictionary of the EDF signal headers.
Returns:
signal_headers (dict): dictionary of the EDF signal headers (with possibly fixed values).
"""
labels = signal_headers['label']
physical_min = signal_headers['physical_min']
physical_max = signal_headers['physical_max']
digital_min = signal_headers['digital_min']
digital_max = signal_headers['digital_max']
for i, fields in enumerate(zip(labels, physical_min, physical_max, digital_min, digital_max)):
# Unpack the zipped fields.
lab, p_min, p_max, d_min, d_max = fields
# Some checks.
if not lab:
warnings.warn('Empty label name for signal {}!'.format(i))
if p_min >= p_max:
warnings.warn('physical_min ({}) >= physical max ({}) for signal {}!'.format(p_min, p_max, i))
if d_min >= d_max:
warnings.warn('digital_min ({}) >= digital max ({}) for signal {}!'.format(d_min, d_max, i))
return signal_headers
def _collect_additional_info(self):
    """
    Derive additional file and signal info from the headers.

    Returns:
        additional_info (dict): dictionary with the keys:
            'fs': list with the sample frequency of each signal (number of samples per datarecord divided by
                the datarecord duration, rounded to 10 decimals; 0 if the datarecord duration is not positive).
            'filetype': the file type as returned by self._read_filetype().
            'total_duration': number of datarecords times the datarecord duration (seconds).
            For EDF+ file types, the subfields returned by self._collect_edfplus_subfields() are added.
    """
    duration_datarecord = self.file_header['duration_datarecord']
    samples_per_record = self.signal_headers['num_samples']

    # Signal frequencies (not defined if the datarecord duration is zero).
    if duration_datarecord > 0:
        frequencies = [round(count/duration_datarecord, 10) for count in samples_per_record]
    else:
        frequencies = [0 for _ in samples_per_record]

    filetype = self._read_filetype()

    additional_info = {
        'fs': frequencies,
        'filetype': filetype,
        'total_duration': self.file_header['num_datarecords'] * duration_datarecord,
    }

    # EDF+ files carry extra subfields in the patient and recording identification.
    if filetype in EDFPLUS_TYPES:
        additional_info.update(self._collect_edfplus_subfields())

    return additional_info
def _collect_edfplus_subfields(self):
"""
Extracts and collects the subfields in EDF+ header.
Returns:
edfplus_subfields (dict): Dictionary with keys 'patient_id' and 'recording_id' whose values are also
dictionaries containing the subfields of the patient id and recording id, respectively.
"""
# Extract entire fields for patient and recording identification from file header.
patient_id = self.file_header['patient_id']
recording_id = self.file_header['recording_id']
# Subfields are separated by spaces.
patient_id_list = patient_id.split(' ')
recording_id_list = recording_id.split(' ')
# We will collect the patient and recording subfields in dedicated dictionaries.
patient_id_subfields = dict()
recording_id_subfields = dict()
# The first four patient id subfields are required and have fixed interpretations.
patient_id_subfields['code'] = patient_id_list[0]
patient_id_subfields['sex'] = patient_id_list[1]
patient_id_subfields['birthdate'] = self._datestring_to_datetime(patient_id_list[2])
patient_id_subfields['name'] = patient_id_list[3]
# Additional patient information may be specified after the fourth required subfield.
if len(patient_id_list) > 4:
patient_id_subfields['additional'] = ' '.join(patient_id_list[4:])
# The first four recording id subfields are required and have fixed interpretations.
recording_id_subfields['startdate_text'] = recording_id_list[0]
recording_id_subfields['startdate'] = self._datestring_to_datetime(recording_id_list[1],
date_format='dd-MMM-yyyy')
recording_id_subfields['hospital_administration_code'] = recording_id_list[2]
recording_id_subfields['investigator'] = recording_id_list[3]
recording_id_subfields['equipement'] = recording_id_list[4]
# Additional recording information may be specified after the fifth required subfield.
if len(recording_id_list) > 5:
recording_id_subfields['additional'] = ' '.join(recording_id_list[5:])
# Collect the patient and recording subfields in one dictionary.
edfplus_subfields = dict(patient_id=patient_id_subfields,
recording_id=recording_id_subfields)
return edfplus_subfields
@staticmethod
def _convert_digital_to_physical(signal_digital, physical_min, physical_max, digital_min, digital_max):
"""
Convert the digital signal values to physical signal values using corresponding digital and physical minimum and
maximum values as specified in the EDF signal header.
Args:
signal_digital (np.ndarray): array with the digital values of a signal.
physical_min (float): physical minimum.
physical_max (float): physical maximum.
digital_min (int): digital minimum corresponding to physical minimum.
digital_max (int): digital maximum corresponding to physical maximum.
Returns:
signal_physical (np.ndarray): Array with the physical values of the signal.
"""
# Compute scale and offset.
scale, offset = EdfReader._compute_scale_offset(physical_min, physical_max,
digital_min, digital_max)
# Compute physical values.
return signal_digital * scale + offset
@staticmethod
def _convert_physical_to_digital(signal_physical, physical_min, physical_max, digital_min, digital_max):
"""
Convert the physical signal values to digital signal values using corresponding digital and physical minimum and
maximum values.
Args:
signal_physical (np.ndarray): array with the physical values of the signal.
physical_min (float): physical minimum.
physical_max (float): physical maximum.
digital_min (int): digital minimum corresponding to physical minimum.
digital_max (int): digital maximum corresponding to physical maximum.
Returns:
signal_digital (np.ndarray): array with the digital values of a signal.
"""
# Compute scale and offset.
scale, offset = EdfReader._compute_scale_offset(physical_min, physical_max,
digital_min, digital_max)
# Compute digital values.
signal_digital = np.round((signal_physical - offset) / scale)
# Set nans to zero.
signal_digital = np.nan_to_num(signal_digital)
# Clip to make sure they fit in the digital_min and digital_max and convert to int16.
signal_digital = np.clip(signal_digital, digital_min, digital_max, out=signal_digital)
return signal_digital.astype(np.int16)
@staticmethod
def _convert_raw_header(hdr):
"""
Convert the raw EDF file header entries in bytes to appropriate data types.
Args:
hdr (dict): Dictionary containing the raw EDF file header.
Returns:
hdr (dict): Dictionary containing the converted EDF file header.
"""
# Wrap the function that converts the bytes to string.
b2s = partial(EdfReader._bytes_to_string, encoding='ascii', strip=True)
hdr['version'] = b2s(hdr['version']) # str
hdr['patient_id'] = b2s(hdr['patient_id']) # str
hdr['recording_id'] = b2s(hdr['recording_id']) # str
# Store startdate as a datetime.date object.
startdate_str = b2s(hdr['startdate'])
hdr['startdate'] = EdfReader._datestring_to_datetime(startdate_str, date_format='dd.mm.yy') # datetime.date
# Store starttime as a datetime.time object.
starttime_str = b2s(hdr['starttime'])
starttime_hour, starttime_minute, starttime_second = [int(i) for i in starttime_str.split('.')]
hdr['starttime'] = datetime.time(starttime_hour, starttime_minute, starttime_second) # datetime.time
hdr['size_header'] = int(b2s(hdr['size_header'])) # int
hdr['reserved'] = b2s(hdr['reserved']) # str
hdr['num_datarecords'] = int(b2s(hdr['num_datarecords'])) # int
hdr['duration_datarecord'] = float(b2s(hdr['duration_datarecord'])) # float
hdr['num_signals'] = int(b2s(hdr['num_signals'])) # int
return hdr
@staticmethod
def _convert_header_to_bytes(file_hdr):
"""
Convert a file header dictionary (with normal strings and datetime objects) to a dict with bytes as per specs.
Args:
file_hdr (dict): dictionary of a complete file header. The startdate and starttime fields should
be datetime.date and datetime.time objects, respectively.
Returns:
file_hdr_bytes (dict): dictionary with the bytes representation of the entries in the header.
The fields are justified such that they have the correct length in order to save it as an EDF file.
"""
encoding = 'ascii'
file_hdr_bytes = {
'version':
bytes(str(file_hdr['version']).ljust(8)[:8], encoding=encoding),
'patient_id':
bytes(str(file_hdr['patient_id']).ljust(80)[:80], encoding=encoding),
'recording_id':
bytes(str(file_hdr['recording_id']).ljust(80)[:80], encoding=encoding),
'startdate':
bytes(
EdfReader._datetime_to_datestring(date=file_hdr['startdate'], date_format="dd.mm.yy").ljust(8)[:8],
encoding=encoding),
'starttime':
bytes(file_hdr['starttime'].strftime("%H.%M.%S").ljust(8)[:8], encoding=encoding),
'size_header':
bytes('{:.0f}'.format(file_hdr['size_header']).ljust(8)[:8], encoding=encoding),
'reserved':
bytes(str(file_hdr['reserved']).ljust(44)[:44], encoding=encoding),
'num_datarecords':
bytes('{:.0f}'.format(file_hdr['num_datarecords']).ljust(8)[:8], encoding=encoding),
'duration_datarecord':
bytes('{:.0f}'.format(file_hdr['duration_datarecord']).ljust(8)[:8],
encoding=encoding),
'num_signals':
bytes('{:.0f}'.format(file_hdr['num_signals']).ljust(4)[:4], encoding=encoding),
}
# Check that header consists of 256 bytes.
assert np.sum([len(item) for item in file_hdr_bytes.values()]) == 256
return file_hdr_bytes
def _compute_offset_in_file(self, channel, datarecord_start):
"""
Compute the offset (in bytes) in the EDF file to point to a certain signal (channel) at a certain datarecord.
Args:
channel (int): Specify which signal to read, by specifying its channel.
datarecord_start (int): Specify the datarecord to start reading from (counting from 0).
Returns:
(int): Number of bytes to skip in the EDF file to start reading signal in specified channel from specified
datarecord on.
"""
# Header size.
size_header = self.file_header['size_header'] # in bytes
# Compute offset (in bytes!) to start reading at a specific datarecord.
datarecord_offset_bytes = datarecord_start * sum(self.signal_headers['num_samples']) * self.__bytes_per_sample
# Compute the index (in bytes!) in a datarecord where the signal starts.
start_byte_in_datarecord = sum(self.signal_headers['num_samples'][:channel]) * self.__bytes_per_sample
return size_header + datarecord_offset_bytes + start_byte_in_datarecord
@staticmethod
def _compute_scale_offset(physical_min, physical_max, digital_min, digital_max):
"""
The formula to compute the physical value, P, reads: P = Pmin + (Pmax - Pmin) * (D - Dmin) / (Dmax - Dmin).
We will rewrite this to: P = scale*D + offset.
Args:
physical_min (float): physical minimum.
physical_max (float): physical maximum.
digital_min (int): digital minimum corresponding to physical minimum.
digital_max (int): digital maximum corresponding to physical maximum.
Returns:
scale (float): scale to convert digital to physical values.
offset (float): offset to convert digital to physical values.
"""
scale = (physical_max - physical_min)/(digital_max - digital_min)
offset = physical_min - scale*digital_min
return scale, offset
@staticmethod
def _datestring_to_datetime(datestring, date_format='dd-MMM-yyyy'):
"""
Converts a string specifying the date in specified format to a datetime.date object.
Args:
datestring (str): string specifying the date in specified format.
Either "dd-MMM-yyyy" or "dd.mm.yy".
Returns:
(datetime.date): datetime.date object of the date.
"""
if date_format == 'dd-MMM-yyyy':
dateparts = datestring.split('-')
try:
day = int(dateparts[0])
month = int(MONTH_TO_INT[dateparts[1].upper()])
year = int(dateparts[2])
except ValueError:
day = 1
month = 1
year = 1900
elif date_format == 'dd.mm.yy':
day, month, year = [int(i) for i in datestring.split('.')]
century = 2000 if year < 85 else 1900 # 1985 as clipping date according to EDF(+) specs.
year += century
else:
raise ValueError('Invalid date format "{}". Choose "dd-MMM-yyyy" or "dd.mm.yy".'.format(date_format))
return datetime.date(year, month, day)
@staticmethod
def _datetime_to_datestring(date, date_format='dd-MMM-yyyy'):
"""
Converts a datetime.date object to a string specifying the date in dd-MMM-yyyy or dd.mm.yyyy format.
Args:
date (datetime.date): datetime.date object of the date.
Either "dd-MMM-yyyy" or "dd.mm.yy".
Returns:
datestring (str): string specifying the date in specified format
"""
# Extract day and year.
day = date.day
year = date.year
# Check year.
if year < 1000 or year > 9999:
raise ValueError('Invalid year "{}". Year must be in the range of 1000 - 9999.'.format(year))
if date_format == 'dd-MMM-yyyy':
month = INT_TO_MONTH[date.month].upper() # 3 letter abbreviation of the month.
datestring = '{:02d}-{}-{}'.format(day, month, year)
elif date_format == 'dd.mm.yy':
month = date.month
year = year % 100 # Ignore the century.
datestring = '{:02d}.{:02d}.{:02d}'.format(day, month, year)
else:
raise ValueError('Invalid date format "{}". Choose "dd-MMM-yyyy" or "dd.mm.yy".'.format(date_format))
return datestring
def _get_all_digital_data(self):
"""
Return digital data values of all signals and all datarecords in the EDF file.
Returns:
(np.ndarray): (num_datarecords, size_datarecord) array with the int16 values of the digital data.
"""
if self._all_digital_data is None:
self._all_digital_data = self._read_all_digital_data()
return self._all_digital_data
def _get_annot_channel(self, raise_error=True):
"""
Return the channel index of the EDF Annotations signal.
Args:
raise_error (bool): if True, raises a ValueError if there is no EDF Annotations signal.
If False, returns None if there is no EDF Annotations signal.
"""
labels = self.signal_headers['label']
num_annot_signals = sum((lab == 'EDF Annotations' for lab in labels))
if num_annot_signals > 1:
raise NotImplementedError('Multiple EDF Annotation signals found. This is possible, but not implemented. '
'The first EDF Annotations signal only conatins the starttime of each '
'datarecord.')
elif num_annot_signals == 0:
if raise_error:
raise ValueError('No EDF Annotations signal found in the EDF file with labels {}.'.format(labels))
else:
annot_channel = None
else: # num_annot_signals == 1
# Find the index (channel) of the EDF Annotations signal.
annot_channel = labels.index('EDF Annotations')
return annot_channel
def _get_discontinuous_timestamps(self, begin=0, end=None, efficiency='speed'):
    """
    Return the timestamps that mark the beginning/offset of each subsignal in a discontinuous EDF.

    A new subsignal starts at every datarecord whose timestamp differs from the previous timestamp by
    something else than the datarecord duration (within a tolerance of duration/1e4).

    Args:
        begin (float, optional): if not 0, only datarecords with a timestamp >= begin are considered.
        end (float, optional): if not None, only datarecords with a timestamp <= end are considered.
        efficiency (str, optional): see self._read_timestamps_datarecords().

    Returns:
        time_offsets (np.ndarray): array with the timestamps that mark the beginning of each subsignal.
            The length of time_offsets is equal to the number of subsignals that was found.
    """
    # Timestamps of the datarecords, restricted to the requested time window.
    timestamps = self._read_timestamps_datarecords(efficiency=efficiency)
    if begin != 0:
        keep = timestamps >= begin
        timestamps = timestamps[keep]
    if end is not None:
        keep = timestamps <= end
        timestamps = timestamps[keep]

    # A gap is present wherever consecutive timestamps are not one datarecord duration apart.
    duration_datarecord = self.file_header['duration_datarecord']
    deviation = abs(np.diff(timestamps) - duration_datarecord)
    tol = duration_datarecord / 1e4
    idx_first_after_gap = np.where(deviation > tol)[0] + 1

    # The first timestamp starts the first subsignal, each gap starts a next one.
    return np.concatenate((timestamps[0:1], timestamps[idx_first_after_gap]))
def _get_idx_longest(self, begin=0, end=None, efficiency='speed'):
    """
    Return the index of the longest continuous subsignal if the file is discontinuous.

    Subsignals are separated at datarecords whose timestamp differs from the previous timestamp by something
    else than the datarecord duration (within a tolerance of duration/1e4). The length of a subsignal is
    measured in number of datarecords.

    Args:
        begin (float, optional): if not 0, only datarecords with a timestamp >= begin are considered.
        end (float, optional): if not None, only datarecords with a timestamp <= end are considered.
        efficiency (str, optional): see self._read_timestamps_datarecords().

    Returns:
        (int): index (counting from 0) of the subsignal that spans the most datarecords.
    """
    # Timestamps of the datarecords, restricted to the requested time window.
    timestamps = self._read_timestamps_datarecords(efficiency=efficiency)
    if begin != 0:
        keep = timestamps >= begin
        timestamps = timestamps[keep]
    if end is not None:
        keep = timestamps <= end
        timestamps = timestamps[keep]

    # A gap is present wherever consecutive timestamps are not one datarecord duration apart.
    duration_datarecord = self.file_header['duration_datarecord']
    deviation = abs(np.diff(timestamps) - duration_datarecord)
    tol = duration_datarecord / 1e4
    idx_first_after_gap = np.where(deviation > tol)[0] + 1

    # Boundaries of the subsignals: start of the data, every gap, and the total length
    # (to include the last continuous signal).
    boundaries = [0]
    boundaries.extend(idx_first_after_gap)
    boundaries.append(len(timestamps))

    num_records_per_subsignal = np.diff(boundaries)
    return np.argmax(num_records_per_subsignal)
def _get_fid(self):
"""
Return the file handle (opens the file if necessary).
Returns:
(file object): file handle.
"""
if self._fid is None or self._fid.closed:
self._fid = open(self.filepath, 'rb')
return self._fid
def _handle_discontinuous_signal(self, discontinuous_signal, num_samples,
datarecord_start, datarecord_stop,
discontinuous_mode='longest', efficiency='speed'):
"""
Split the discontinuous signal in continuous sub-signals and return one, or some, or all of them.
Args:
discontinuous_signal (np.ndarray): array with the discontinuous signal.
num_samples (int): number of samples per datarecord for signal.
discontinuous_mode (str, optional): how to handle discontinuous data (EDF+D).
If 'longest', return the longest continuous segment.
If 'all', return all sements each as a separate EegDataset object in a list.
If 'ignore', the distontinuous signal is returned as if it is continuous.
efficiency (str, optional): see _read_timestamps_datarecords.
Returns:
(np.ndarray or list): if discontinuous_mode == 'longest', an array is returned containing the longest sub-signal.
if discontinuous_mode == 'all', a list with arrays is returned containing all sub-signals.
"""
if discontinuous_mode == 'ignore':
# Return the discontinuous signal as if it is continuous.
msg = '\nIgnoring discontinuity in EDF: treating discontinuous EDF as continuous EDF.'
warnings.warn(msg)
return discontinuous_signal
# Read the timestamps of the datarecords.
timestamps = self._read_timestamps_datarecords(efficiency=efficiency)[datarecord_start: datarecord_stop]
# Find the indices where the signal is discontinuous.
duration_datarecord = self.file_header['duration_datarecord']
num_samples_per_datarecord = num_samples
diff = np.diff(timestamps)
tol = duration_datarecord/1e4
idx_gaps = ((np.where(abs(diff - duration_datarecord) > tol)[0] + 1) * num_samples_per_datarecord).astype(int)
# Add the length of the discontinuous signal, to include the last continuous signal in the for loop below.
idx_gaps = np.append(idx_gaps, len(discontinuous_signal))
# Loop over continuous segments in discontinuous data and collect the continuous segments as a list.
continuous_signals = []
idx_old = 0
for idx in idx_gaps:
continuous_signals.append(discontinuous_signal[idx_old: idx])
idx_old = idx
if discontinuous_mode == 'all':
# Return all the continuous signals as a list.
return continuous_signals
elif discontinuous_mode == 'longest':
# Return the longest continuous signal.
signal_lengths = [len(s) for s in continuous_signals]
longest_signal = continuous_signals[np.argmax(signal_lengths)]
# Report the amount of data in the selected signal compared to the total amount of discontinuous data.
data_frac = len(longest_signal) / len(discontinuous_signal)
msg = '\nSelecting longest continuous signal in discontinuous EDF: selected signal is {} % of the '\
'discontinuous file.'.format(data_frac*100)
warnings.warn(msg)
return longest_signal
else:
raise ValueError('Invalid `discontinuous_mode` argument "{}". Choose from: all, longest, ignore'.format(discontinuous_mode, ))
def _insert_annotations(self, annot_bytes, new_annotations):
"""
Insert new annotations in existing annotations.
Args:
annot_bytes (list): list with existing annotations in bytes. Length should equal the number of datarecords.
new_annotations (pd.DataFrame): pandas DataFrame with the following columns:
'onset': the starttime of the annotation (in seconds with repect to the start of recording).
'duration': the duration of the annotation (specify -1 if not applicable).
'text': annotation text.
"""
# Get timestamps of annotations.
time_edf_annotations = np.array([self._read_timestamp(annot) for annot in annot_bytes])
# Get the number of bytes per annotation.
num_bytes = len(annot_bytes[0])
# Insert annotations.
longest = 0
for _, new_annot in new_annotations.iterrows():
onset = new_annot['onset']
duration = new_annot['duration']
text = new_annot['text']
# Find index of datarecord to insert current annotation.
idx = np.argmin(np.abs(time_edf_annotations - onset))
# Get old annotation.
annot_b = annot_bytes[idx]
# Strip the trailing \x00 bytes, but put one back to indicate end of the last TAL.
annot_b = annot_b.rstrip(b'\x00') + b'\x00'
# Create TAL for the new annotation.
annot_b += '+{}'.format(onset).encode('utf-8', 'strict')
if duration is not None and not np.isnan(duration) and duration > 0:
# Add duration.
annot_b += b'\x15' + '{}'.format(duration).encode('utf-8', 'strict')
# Add annotation text and indicate end of TAL.
annot_b += b'\x14' + text.encode('utf-8', 'strict') + b'\x14\x00'
# Add trailing bytes. Make sure to end up with a multiple of __bytes_per_sample bytes.
len_bytes = max(num_bytes, len(annot_b))
len_bytes = int(np.ceil(len_bytes/self.__bytes_per_sample)*self.__bytes_per_sample)
annot_b = annot_b.ljust(len_bytes, b'\x00')
# Replace old with new annotation.
annot_bytes[idx] = annot_b
# Keep track of longest annotation.
if len(annot_b) > longest:
longest = len(annot_b)
# Check if the longest annotation still fits.
if longest > num_bytes:
# Change the lengths of the annotations to make the longest one fit.
num_bytes = longest
annot_bytes = [annot_b.ljust(num_bytes, b'\x00') for annot_b in annot_bytes]
return annot_bytes
def _read_all_digital_data(self):
    """
    Load the raw, digital data of all datarecords as int16 (no conversion to physical values is done).

    Everything that follows the headers in the file is read in one go and reshaped such that each row
    corresponds to one datarecord.

    Returns:
        (np.ndarray): (num_datarecords, size_datarecord) array with the int16 values of the digital data.
    """
    # Header fields needed to locate and shape the data.
    header = self.file_header
    offset_first_record = header['size_header']
    n_records = header['num_datarecords']

    # Jump over the file and signal headers.
    fid = self._get_fid()
    fid.seek(offset_first_record, os.SEEK_SET)

    # Read the remainder of the file as 16-bit integers.
    samples = np.fromfile(fid, dtype=np.int16)

    # Number of samples (all signals together) in a single datarecord.
    samples_per_record = int(len(samples) / n_records)

    # One row per datarecord (reshaped in place, fails if the sizes do not match).
    samples.shape = (n_records, samples_per_record)
    return samples
def _read_annotation_bytes_max_speed(self, annot_channel):
    """
    Read the raw annotations bytes in the EDF+ file with optimized speed.

    Uses an algorithm that should be fast for large files, however it reads all datarecords of the file
    into memory at once, so it requires that the entire EDF+ file fits in memory. If the EDF+ file does not
    fit into memory, consider using self._read_annotation_bytes_min_memory instead.

    Args:
        annot_channel (int): Specify the channel (index) of the EDF Annotation channel to read the
            annotations from.

    Returns:
        all_annot_bytes (list of bytes): The raw (bytes) EDF Annotations signal for each datarecord, with
            the trailing NUL (0x00) bytes stripped.
    """
    # Extract relevant header information.
    size_header = self.file_header['size_header']  # in bytes
    num_datarecords = self.file_header['num_datarecords']
    num_samples = self.signal_headers['num_samples']
    bytes_per_sample = self.__bytes_per_sample

    # Compute size (number of bytes) of one datarecord.
    size_datarecord = sum(num_samples) * bytes_per_sample

    # Set the pointer in the file at the start of the first datarecord, i.e. skip the file and signal headers.
    fid = self._get_fid()
    fid.seek(size_header, os.SEEK_SET)

    # Read all raw datarecords into memory as single bytes and put each datarecord in its own row.
    byte_decimal_matrix = np.fromfile(fid, dtype=np.dtype(np.int8))
    byte_decimal_matrix.shape = (num_datarecords, size_datarecord)

    # Find the start and stop index of the annotation channel in a datarecord. The matrix contains single
    # bytes, so both the offset (in samples) and the length (in samples) of the channel are converted to
    # bytes. Note that idx_start is already in bytes, so only the channel length is multiplied here.
    idx_start = sum(num_samples[:annot_channel]) * bytes_per_sample
    idx_stop = idx_start + num_samples[annot_channel] * bytes_per_sample

    # Extract the slice of the data matrix belonging to the annotations.
    byte_decimal_matrix_annot = byte_decimal_matrix[:, idx_start:idx_stop]

    # Convert the annotation entry in each datarecord back to bytes (stripping the trailing \x00 bytes).
    all_annot_bytes = [row.tobytes().rstrip(b'\x00') for row in byte_decimal_matrix_annot]

    assert len(all_annot_bytes) == num_datarecords
    return all_annot_bytes
def _read_annotation_bytes_min_memory(self, annot_channel):
    """
    Read the raw annotations bytes in the EDF+ file while keeping the memory footprint small.

    Only the bytes of the annotation channel are read; the other signals in each datarecord are skipped
    with a relative seek. This may be slow for large files because of the repeated jumps through the file.
    For an implementation optimized for speed, see self._read_annotation_bytes_max_speed.

    Args:
        annot_channel (int): Specify the channel (index) of the EDF Annotation channel to read the
            annotations from.

    Returns:
        all_annot_bytes (list of bytes): The raw (bytes) EDF Annotations signal for each datarecord, with
            the trailing NUL (0x00) bytes stripped.
    """
    # Header information.
    n_records = self.file_header['num_datarecords']
    samples_per_signal = self.signal_headers['num_samples']

    # Size (in bytes) of the annotation channel within one datarecord.
    annot_size = samples_per_signal[annot_channel] * self.__bytes_per_sample

    # Position (in bytes) of the annotation channel in the very first datarecord.
    first_byte = self._compute_offset_in_file(channel=annot_channel, datarecord_start=0)

    # Bytes taken by all other signals in one datarecord, i.e. the gap between consecutive annotation chunks.
    gap = sum(samples_per_signal) * self.__bytes_per_sample - annot_size

    fid = self._get_fid()
    fid.seek(first_byte, os.SEEK_SET)

    # Alternate between reading one annotation chunk and jumping over the other signals.
    chunks = []
    for _ in range(n_records):
        chunk = fid.read(annot_size)
        chunks.append(chunk.rstrip(b'\x00'))
        fid.seek(gap, os.SEEK_CUR)
    return chunks
def _read_digital_data_max_speed(self, channel, datarecord_start=0, datarecord_stop=None):
    """
    Read a (part of a) signal by slicing it out of the matrix holding the digital data of the entire file.

    Fastest option when large portions of the EDF file are read, but it needs the complete raw file to fit
    in RAM. For reading only a small part of a signal, self._read_digital_data_min_memory needs less memory
    and might be faster depending on the amount of data to read.

    Args:
        channel (int): Specify which signal to read, by specifying its channel.
        datarecord_start (int, optional): Specify the datarecord to start reading from (counting from 0).
        datarecord_stop (int, optional): Specify the datarecord to stop reading (the specified datarecord
            will not be read, when counting from 0). Defaults to the number of datarecords in the file.

    Returns:
        signal_data (np.ndarray): Array holding the digital values of the specified signal.

    Raises:
        ValueError: if datarecord_start is not less than datarecord_stop.
    """
    # Read up to and including the last datarecord unless told otherwise.
    if datarecord_stop is None:
        datarecord_stop = self.file_header['num_datarecords']

    if datarecord_start >= datarecord_stop:
        raise ValueError('datarecord_start ({}) must be less than datarecord_stop ({}).'
                         .format(datarecord_start, datarecord_stop))

    # Samples per datarecord for each signal.
    samples_per_signal = self.signal_headers['num_samples']

    # Rows: the requested datarecords. Columns: the samples of the requested signal within a datarecord.
    first_sample = sum(samples_per_signal[:channel])
    rows = slice(datarecord_start, datarecord_stop)
    columns = slice(first_sample, first_sample + samples_per_signal[channel])

    # Cut the block out of the full data matrix and return it as a flat copy.
    block_of_signal = self._get_all_digital_data()[rows, columns]
    return block_of_signal.flatten()
def _read_digital_data_min_memory(self, channel, datarecord_start=0, datarecord_stop=None, verbose=0):
    """
    Read a (part of a) signal from the EDF file by reading it directly from the file, minimizing the amount
    of memory needed.

    Only the requested part of the file is read into memory, which makes this implementation suited for
    cases where small parts of a signal are to be read. For reading large portions of a file, see
    self._read_digital_data_max_speed.

    Args:
        channel (int): Specify which signal to read, by specifying its channel.
        datarecord_start (int, optional): Specify the datarecord to start reading from (counting from 0).
        datarecord_stop (int, optional): Specify the datarecord to stop reading (the specified datarecord
            will not be read, when counting from 0). Defaults to the number of datarecords in the file.
        verbose (int): if 1, shows progress bar. No progress bar is created (nor printed) otherwise.

    Returns:
        signal_data (np.ndarray): Array holding the digital values of the specified signal.

    Raises:
        ValueError: if datarecord_start is not smaller than datarecord_stop.
    """
    # Default stop datarecord is the last datarecord.
    datarecord_stop = self.file_header['num_datarecords'] if datarecord_stop is None else datarecord_stop

    # Check input.
    if datarecord_start >= datarecord_stop:
        raise ValueError('datarecord_start ({}) must be smaller than datarecord_stop ({}).'
                         .format(datarecord_start, datarecord_stop))

    # Number of samples (all signals).
    num_samples = self.signal_headers['num_samples']

    # Number of samples of the specified signal in one datarecord.
    num_samples_channel = num_samples[channel]

    # Compute the number of datarecords to read.
    num_datarecords = datarecord_stop - datarecord_start

    # Allocate an array to be filled with the digital data values, and initiate an index pointer for it.
    signal_digital = np.zeros(num_datarecords * num_samples_channel, dtype=np.int16)
    idx = 0

    # Compute the offset in the file (in bytes) where we should start reading.
    offset = self._compute_offset_in_file(channel, datarecord_start)

    # Set pointer to the start of the signal in the first datarecord to read.
    fid = self._get_fid()
    fid.seek(offset, os.SEEK_SET)

    # Compute the number of bytes to skip when skipping over the other signals in the file (in bytes).
    skip_bytes = (sum(num_samples) - num_samples_channel) * self.__bytes_per_sample

    # Only build the progress bar when it was asked for, so that nothing is written to stdout otherwise.
    bar = pyprind.ProgBar(num_datarecords, stream=sys.stdout) if verbose else None

    # Loop until we have read the requested number of datarecords.
    for _ in range(num_datarecords):
        # Read the signal values in current datarecord and store them in the corresponding location in the
        # signal_digital array.
        signal_digital[idx:idx + num_samples_channel] = np.fromfile(fid, dtype=np.int16,
                                                                    count=num_samples_channel)

        # Update the index pointer for the signal_digital array.
        idx += num_samples_channel

        # Skip other signals.
        fid.seek(skip_bytes, os.SEEK_CUR)

        if bar is not None:
            bar.update()

    return signal_digital
def _read_file_header(self, convert_bytes=True):
"""
Read the EDF file header (the first 256 bytes of the file) and return it as a dictionary.
For EDF specs, see https://www.edfplus.info/specs/edf.html
The fields are read sequentially from the start of the file, with the following widths (in bytes):
version (8), patient_id (80), recording_id (80), startdate (8), starttime (8), size_header (8),
reserved (44), num_datarecords (8), duration_datarecord (8), num_signals (4).
Args:
convert_bytes (bool, optional): If True (default), convert the raw bytes of each header field to suitable
objects (e.g. str, int, float).
Returns:
(dict): dictionary containing the information in the EDF file header.
"""
# Initialize header dictionary.
hdr = default_edf_file_header()
# Go to the beginning of the EDF file.
fid = self._get_fid()
fid.seek(0, os.SEEK_SET)
# Read raw bytes from EDF file, piece by piece, as defined in the EDF specs. The order of the reads
# matters: each read continues where the previous one stopped.
hdr['version'] = fid.read(8)
hdr['patient_id'] = fid.read(80)
hdr['recording_id'] = fid.read(80)
hdr['startdate'] = fid.read(8)
hdr['starttime'] = fid.read(8)
hdr['size_header'] = fid.read(8)
hdr['reserved'] = fid.read(44)
hdr['num_datarecords'] = fid.read(8)
hdr['duration_datarecord'] = fid.read(8)
hdr['num_signals'] = fid.read(4)
# Convert raw bytes to appropriate data types if requested.
if convert_bytes:
hdr = self._convert_raw_header(hdr)
# Check for unusual values in header and maybe fix them.
# The header is stored on the instance first because self._check_file_header() takes no arguments
# (presumably it reads self._file_header and returns the checked header -- TODO confirm).
# NOTE(review): side effect -- self._file_header is overwritten here.
self._file_header = hdr
hdr = self._check_file_header()
return hdr
def _read_filetype(self):
    """
    Determine the type of the file from the reserved field of the file header.

    File types:
        'EDF': original EDF file type.
        EDFPLUS_TYPES (see in .config).

    A file that declares itself as EDF+ but contains no 'EDF Annotations' signal is reported as plain 'EDF'
    (a warning is issued in that case).

    Returns:
        filetype (str): File type.

    Raises:
        NotImplementedError: if the reserved field starts with 'edf' (case insensitive) but is neither a
            known EDF+ type nor contains 'EDF'.
    """
    reserved = self.file_header['reserved']

    # No EDF+ type specified in the reserved field: assume the original EDF file type.
    if reserved[:3].lower() != 'edf':
        return 'EDF'

    # File is EDF+. Find out if EDF+ is continuous (EDF+C) or discontinuous (EDF+D).
    type_code = reserved[:5]
    if len(reserved) >= 5 and type_code in EDFPLUS_TYPES:
        filetype = type_code
    elif 'EDF' in reserved:
        filetype = reserved.strip()
    else:
        raise NotImplementedError('Unexpected reserved field: "{}"'.format(reserved))

    # An EDF+ file is expected to contain an EDF Annotations signal; fall back to regular EDF if it does not.
    if 'edf+' in filetype.lower() and 'EDF Annotations' not in self.signal_headers['label']:
        msg = '\nNo EDF Annotations found in "{}" file. Treating as regular EDF.'.format(filetype)
        warnings.warn(msg)
        filetype = 'EDF'

    return filetype
def _read_signal_headers(self):
    """
    Read the headers of all signals from the EDF file.

    The signal headers follow the 256-byte file header. For every header field, the values of all signals
    are stored consecutively (first all labels, then all transducers, etc.), each in a fixed-width field.

    Returns:
        hdr (dict): per header field, a list with one value per signal, after being passed through
            self._check_signal_headers.
    """
    # Number of signals in file.
    ns = self.file_header['num_signals']

    # Start from the default header dictionary.
    hdr = default_edf_signal_header()

    # Decoder turning a raw field into a stripped ascii string.
    b2s = partial(self._bytes_to_string, encoding='ascii', strip=True)

    # Layout of the signal headers in file order: (key, width in bytes, converter or None to keep the string).
    layout = (
        ('label', 16, None),
        ('transducer', 80, None),
        ('physical_dimension', 8, None),
        ('physical_min', 8, float),
        ('physical_max', 8, float),
        ('digital_min', 8, int),
        ('digital_max', 8, int),
        ('prefilter', 80, None),
        ('num_samples', 8, int),
        ('reserved', 32, None),
    )

    # The signal headers start right after the file header (257th byte).
    fid = self._get_fid()
    fid.seek(256, os.SEEK_SET)

    # Read the fields one after the other; within a field, read the value of each signal in turn.
    for key, width, convert in layout:
        texts = (b2s(fid.read(width)) for _ in range(ns))
        if convert is None:
            hdr[key] = list(texts)
        else:
            hdr[key] = [convert(text) for text in texts]

    # Verify that we read the correct amount of bytes.
    assert fid.tell() == 256 * (ns + 1)

    # Check for unusual values in header and maybe fix them.
    return self._check_signal_headers(hdr)
@staticmethod
def _read_timestamp(annot):
    r"""
    Read the first onset in the annotation.

    The first onset in an annotation of a datarecord represents the timestamp of the corresponding
    datarecord if annot is the raw annotation signal from the first! EDF Annotations channel.

    Args:
        annot (bytes): all raw bytes of the annotation signal in a datarecord (trailing \x00 bytes may be
            stripped).

    Returns:
        (float): first onset time in the annotation.

    Raises:
        ValueError: if annot does not contain the \x14 byte that terminates the onset, or if the onset
            cannot be converted to a float.
    """
    # The first characters until unprintable char 20 is the onset relative to the starttime of recording in
    # file header.
    onset, separator, _ = annot.partition(b'\x14')

    # Without the separator there is no complete onset field. Fail loudly instead of parsing a truncated
    # number (bytes.find would return -1, silently dropping the last byte of the annotation).
    if not separator:
        raise ValueError('No time-stamp found in annotation: the onset is not terminated by a \\x14 byte.')

    return float(onset.decode('utf-8', 'strict'))
def _get_timestamps_datarecords(self, efficiency='speed'):
    """
    Return the timestamp at the start of each datarecord.

    For EDF+ files, the timestamps are taken from the annotations. For any other filetype, the first
    datarecord is assumed to start at 0s and the timestamps are computed from the number of datarecords
    and the duration of a datarecord.

    Args:
        efficiency (str, optional): passed on to self._read_timestamps_datarecords (EDF+ only).

    Returns:
        t (np.ndarray): array with timestamps (in seconds).
    """
    # EDF+: the timestamps are stored in the annotations.
    if self.additional_info['filetype'] in EDFPLUS_TYPES:
        return self._read_timestamps_datarecords(efficiency=efficiency)

    # Regular EDF: datarecords are contiguous and equally long.
    record_indices = np.arange(self.file_header['num_datarecords'])
    return record_indices * self.file_header['duration_datarecord']
def _read_timestamps_datarecords(self, efficiency='speed'):
    """
    Read the timestamps for the datarecords in an EDF+ file.

    Note: EDF+ only.

    Args:
        efficiency (str, optional): Specify which algorithm to use: 'speed' uses an algorithm optimized for
            speed when reading annotations from a large file (see _read_annotation_bytes_max_speed),
            'memory' uses an algorithm that requires the least amount of memory (see
            _read_annotation_bytes_min_memory).

    Returns:
        starttime_datarecords (np.ndarray): Array containing the (fractional) starttimes of each datarecord
            w.r.t. the starttime (which is in whole seconds) in the EDF+ file header.

    Raises:
        ValueError: if the file is not an EDF+ file, or if efficiency is not 'speed' or 'memory'.
    """
    # Annotations only exist in EDF+ files.
    filetype = self.additional_info['filetype']
    if filetype not in EDFPLUS_TYPES:
        raise ValueError('Filetype {} does not support Annotations. Filetype must be EDF+ for Annotations.'
                         .format(filetype))

    # The timestamps are stored in the first EDF Annotations signal.
    annot_channel = self.signal_headers['label'].index('EDF Annotations')

    # Pick the reader that matches the requested trade-off between speed and memory.
    if efficiency == 'speed':
        read_annotation_bytes = self._read_annotation_bytes_max_speed
    elif efficiency == 'memory':
        read_annotation_bytes = self._read_annotation_bytes_min_memory
    else:
        raise ValueError('Invalid efficiency value "{}". Chose from "speed", "memory".'.format(efficiency))

    # Raw annotation bytes, one entry per datarecord.
    annot_bytes = read_annotation_bytes(annot_channel)

    # The first onset in each entry is the starttime of that datarecord w.r.t. the starttime in the file
    # header.
    onsets = []
    for annot in annot_bytes:
        onsets.append(self._read_timestamp(annot))

        # The annotation text that belongs to this onset must be empty, i.e. the onset separator must be
        # directly followed by another byte 20, to verify that the onset represents the starttime.
        idx_separator = annot.find(b'\x14')
        assert annot[idx_separator + 1] == 20, \
            'Did not expect character {} after the time-stamp!.'.format(annot[idx_separator + 1])

    return np.array(onsets, dtype=float)
@staticmethod
def _save(filepath_out, hdr_bytes, sig_hdr, all_data, verbose=1, encoding='ascii'):
    """
    Save header and data to a new EDF file.

    The file header (given as raw bytes), the signal headers (converted here to fixed-width text fields) and
    the data (converted here to 16-bit integers) are written to the file, in that order.

    Args:
        filepath_out (str): path of the file to write. The file is opened in 'wb' mode.
        hdr_bytes (dict): raw bytes of each field of the file header. Together the fields must make up
            exactly 256 bytes.
        sig_hdr (dict): signal headers, containing for each header field a list with one value per signal.
        all_data (np.ndarray): digital data of all datarecords. Cast to int16 and flattened before writing.
        verbose (int, optional): if truthy (default), print messages before and after writing.
        encoding (str, optional): encoding of the signal header fields. Defaults to 'ascii'.

    Raises:
        AssertionError: if the file header does not consist of 256 bytes, or if the header size stated in
            hdr_bytes['size_header'] does not match the actual size of the file header plus signal headers.
    """
    def text_field(value, width):
        # Left-justified text, padded with spaces and truncated to exactly `width` characters.
        return bytes(str(value).ljust(width)[:width], encoding=encoding)

    def integer_field(value, width=8):
        # Number written without decimals (for the digital range and the number of samples).
        return bytes('{:.0f}'.format(value).ljust(width)[:width], encoding=encoding)

    def physical_field(value, width=8):
        # Physical minimum/maximum. These may be fractional, and rounding them to integers would change
        # the calibration of the signal, so keep as many decimals as fit in the field.
        number = float(value)
        if number.is_integer():
            text = '{:.0f}'.format(value)
        else:
            text = repr(number)
            if len(text) > width or 'e' in text:
                # Too long (or exponent notation): use fixed-point with the highest precision that fits.
                text = '{:.0f}'.format(value)
                for precision in range(width - 2, 0, -1):
                    candidate = '{:.{p}f}'.format(number, p=precision)
                    if len(candidate) <= width:
                        text = candidate
                        break
        return bytes(text.ljust(width)[:width], encoding=encoding)

    # Create hdr in bytes.
    hdr_bytes_line = b''.join([
        hdr_bytes['version'],
        hdr_bytes['patient_id'],
        hdr_bytes['recording_id'],
        hdr_bytes['startdate'],
        hdr_bytes['starttime'],
        hdr_bytes['size_header'],
        hdr_bytes['reserved'],
        hdr_bytes['num_datarecords'],
        hdr_bytes['duration_datarecord'],
        hdr_bytes['num_signals']])

    # Check length.
    if len(hdr_bytes_line) != 256:
        raise AssertionError(
            f'File hdr should consists of 256 bytes, but got a header of {len(hdr_bytes_line)} bytes.')

    # Create signal header in bytes. For each field, the values of all signals are stored consecutively.
    sig_hdr_bytes_line = b''.join(
        [text_field(element, 16) for element in sig_hdr['label']] +
        [text_field(element, 80) for element in sig_hdr['transducer']] +
        [text_field(element, 8) for element in sig_hdr['physical_dimension']] +
        [physical_field(element) for element in sig_hdr['physical_min']] +
        [physical_field(element) for element in sig_hdr['physical_max']] +
        [integer_field(element) for element in sig_hdr['digital_min']] +
        [integer_field(element) for element in sig_hdr['digital_max']] +
        [text_field(element, 80) for element in sig_hdr['prefilter']] +
        [integer_field(element) for element in sig_hdr['num_samples']] +
        [text_field(element, 32) for element in sig_hdr['reserved']]
    )

    # Check that the total header size matches the header and signal header.
    size_header = int(EdfReader._bytes_to_string(hdr_bytes['size_header'], encoding='ascii', strip=True))
    actual_size_header = len(hdr_bytes_line) + len(sig_hdr_bytes_line)
    if actual_size_header != size_header:
        raise AssertionError('Size of header as in header ({}) does not match actual size ({}).'
                             .format(size_header, actual_size_header))

    # Make sure the data is 16-bits, reshape data to one line, and convert to bytes.
    all_data_bytes_line = all_data.astype(np.int16).reshape(-1).tobytes()

    # Write the parts one after the other (avoids building one more copy of all the data in memory).
    if verbose:
        print('Writing...')
    with open(filepath_out, 'wb') as f:
        f.write(hdr_bytes_line)
        f.write(sig_hdr_bytes_line)
        f.write(all_data_bytes_line)
    if verbose:
        print('Saved to {}!'.format(filepath_out))
def _annotation_generator_edfplus(annotation):
    """
    Generator that scans (part of) the annotation 'signal' and yields an Annotation object for every
    annotation it encounters.

    See the EDF+ specs for a description of the format of annotations:
    https://www.edfplus.info/specs/edfplus.html#tal

    A time-stamped annotation list (TAL) consists of an onset, an optional duration (preceded by byte 21),
    and zero or more annotation texts (each preceded and followed by byte 20), and is terminated by byte 0.
    Additional 0 bytes (the padding that fills up the unused part of a datarecord) are skipped.

    Args:
        annotation (bytes): raw bytes of (a part of an) the annotation signal. The given signal should
            start at the beginning of a datarecord.

    Yields:
        (edfreadpy.Annotation): an Annotation object is yielded each time an annotation (with non-empty
            text) is encountered while scanning the bytes.

    Raises:
        NotImplementedError: if the special bytes appear in an unexpected order.
        ValueError: if an onset or duration cannot be converted to a float.
    """
    # Initialize variables to be set with default values.
    previous_upbyte = None  # Previous unprintable byte.
    idx_previous_upbyte = -1
    onset = None
    default_duration = np.nan
    duration = default_duration

    # Loop over bytes in annotation.
    for idx_current_byte, current_byte in enumerate(annotation):
        # Only the special bytes (the unprintable ASCII characters 0, 20, 21) are of interest; they indicate
        # the end of a subfield (either onset, duration or annotation text).
        if current_byte not in (0, 20, 21):
            continue

        # A 0 byte that directly follows the start of the signal or another 0 byte is padding: there is no
        # subfield to parse (parsing the empty subfield as an onset would fail).
        is_padding = (current_byte == 0
                      and previous_upbyte in (None, 0)
                      and idx_current_byte == idx_previous_upbyte + 1)

        if not is_padding:
            # Decode the subfield in between the special bytes using utf-8 as specified in the EDF+ specs.
            part = annotation[idx_previous_upbyte + 1: idx_current_byte].decode('utf-8', 'strict')

            if previous_upbyte is None and current_byte in (20, 21):
                # First part of the annotation -> onset (followed by either a text or a duration).
                onset = float(part)

            elif previous_upbyte == 20 and current_byte == 20:
                # Annotation text is between two non printable 20 bytes.
                text = part

                # Check if text is not empty and yield Annotation (otherwise ignore it).
                if text:
                    assert onset is not None
                    yield Annotation(onset, duration, text)

            elif previous_upbyte == 21 and current_byte == 20:
                # Duration.
                duration = float(part)

            elif previous_upbyte == 0:
                # End of TAL -> new onset
                onset = float(part)

            elif previous_upbyte == 20 and current_byte == 0:
                # End of an annotation -> reset onset and duration.
                onset = None
                duration = default_duration

            else:
                raise NotImplementedError('Unexpected sequence of unprintable character "{}" and "{}".'
                                          .format(previous_upbyte, current_byte))

        # Update for next iteration.
        previous_upbyte = current_byte
        idx_previous_upbyte = idx_current_byte
def _check_filepath_edf(filepath, overwrite=False):
    """
    Check whether filepath can be used for writing an EDF file, creating its directory if needed.

    Args:
        filepath (str): path of the file to write.
        overwrite (bool, optional): if False (default), an error is raised if filepath already exists.

    Raises:
        FileExistsError: if filepath already exists and overwrite is False.
        ValueError: if filepath has no extension, or an extension that is not in EDF_EXTENSIONS.
    """
    # Check if directory in filepath exists. Create the directory if not. A bare filename has no directory
    # part (it refers to the current working directory), in which case there is nothing to create.
    directory = os.path.split(filepath)[0]
    if directory and not os.path.exists(directory):
        # exist_ok guards against the directory being created by someone else in the meantime.
        os.makedirs(directory, exist_ok=True)

    # Check if filepath already exists and raise an error if it does.
    if os.path.exists(filepath) and not overwrite:
        raise FileExistsError('File "{}" already exists. Overwriting is not permitted.'
                              .format(filepath))

    # Check validity of extension of a filename for writing data file.
    valid_extensions = EDF_EXTENSIONS
    if isinstance(valid_extensions, str):
        # Convert to list.
        valid_extensions = [valid_extensions]

    # Get file extension.
    file_extension = os.path.splitext(filepath)[1]
    if not file_extension:
        raise ValueError('No extension in filepath "{}"'.format(filepath))

    # Remove leading dot and check if it is in the list of valid extensions.
    if file_extension[1:] not in valid_extensions:
        raise ValueError('Invalid extension in filepath "{}". Use one of: .{}'.
                         format(filepath, ' .'.join(valid_extensions)))