# Source code for nnsa.io.readers

"""
Module for reading data files (builds on edfreadpy).

Supports EDF, EDF+, EDF+D, EDF+C.

"""
import sys
import warnings

import pyprind

import nnsa.edfreadpy as edfreadpy
from nnsa.edfreadpy.io.config import EEG_LABELS
from nnsa.containers.time_series import TimeSeries
from nnsa.edfreadpy.io.utils import standardize_and_check_eeg_label
from nnsa.containers.datasets import EegDataset, BaseDataset, OxygenDataset
import numpy as np

# Public API of this module: only the reader class is exported by a star-import.
__all__ = [
    'EdfReader',
]


class EdfReader(edfreadpy.EdfReader):
    """
    Child of edfreadpy.EdfReader, which includes interfaces to load data into nnsa containers.

    Args:
        filepath (str): path to the EDF(+) file to read (see edfreadpy.EdfReader).
    """

    def read_bp_dataset(self, begin=0, end=None, patterns=None, **kwargs):
        """
        Read all blood-pressure signals into a Dataset object.

        Args:
            begin (float, optional): begin time (in seconds) of the signal to read.
                By default reading starts at the beginning of the signal (i.e. at 0 seconds).
            end (float, optional): end time (in seconds) of the signal to read.
                By default reading ends at the end of the signal.
            patterns (str or list, optional): pattern(s) to look for in the EDF signal labels
                (case sensitive substring match). A single string is treated as a list with one pattern.
                If None, a signal is selected when its label contains 'BP' or its unit contains 'mmhg'
                (unit comparison is not case sensitive).
            **kwargs (optional): optional additional keyword arguments for self.read_time_series.

        Returns:
            (nnsa.BaseDataset): BaseDataset to which the result of self.read_time_series() is appended
                for every selected signal. The dataset is empty if no signal is selected.
        """
        labels = self.signal_headers['label']
        ds = BaseDataset()

        if patterns is None:
            # Default selection: based on label or on physical unit.
            units = self.signal_headers['physical_dimension']
            for lab, un in zip(labels, units):
                if 'BP' in lab or 'mmhg' in un.lower():
                    ts = self.read_time_series(begin=begin, end=end, channel=lab,
                                               check_label=False, check_unit=False, **kwargs)
                    ds.append(ts)
        else:
            # Allow a single pattern to be passed as a plain string.
            if isinstance(patterns, str):
                patterns = [patterns]
            for lab in labels:
                if any(pat in lab for pat in patterns):
                    ts = self.read_time_series(begin=begin, end=end, channel=lab,
                                               check_label=False, check_unit=False, **kwargs)
                    ds.append(ts)

        return ds

    def read_eeg_dataset(self, channels=None, begin=0, end=None, exclude_channels=None,
                         discontinuous_mode='longest', verbose=0, **kwargs):
        """
        Read EEG data from the EDF file as TimeSeries object per EEG channel and return them collectively in an
        EegDataset object.

        Args:
            channels (list, optional): list of signal labels, specifying which channels to read.
                If None, all EEG channels are read.
                Default is None.
            begin (float, optional): begin time (in seconds) of the signal to read.
                By default reading starts at the beginning of the signal (i.e. at 0 seconds).
            end (float, optional): end time (in seconds) of the signal to read.
                By default reading ends at the end of the signal.
            exclude_channels (list, optional): list of signal labels, specifying which EEG channels not to read.
                If None, no EEG channels are excluded.
                Default is None.
            discontinuous_mode (str, optional): how to handle discontinuous data (EDF+D).
                If 'longest', return the longest continuous segment.
                If 'all', return a list of all continuous segments.
                If 'fill' or 'full', merge all segments filling up the gaps with nan.
                If 'ignore', the discontinuous signal is returned as if it is continuous.
                See also self._handle_discontinuous_signal().
                Defaults to 'longest'.
            verbose (int): verbosity level. If 1, a progress bar is shown.
            **kwargs (optional): optional keyword arguments for self.read_time_series().

        Returns:
            (nnsa.EegDataset or list): nnsa EegDataset containing the EEG signals from the EDF file.
                If self.read_time_series() returns lists (EDF+D file and discontinuous_mode == 'all'),
                a list with one EegDataset per segment is returned.
                If there is no EEG channel to read, an empty EegDataset is returned.

        Raises:
            ValueError: if a channel in `channels` is not a valid EEG label or is not present in the EDF file.
        """
        # Default options.
        if channels is None:
            # Consider all channels.
            channels = self.signal_headers['label']
            channels_specified = False
        else:
            # Use specified channels.
            channels_specified = True

        if exclude_channels is None:
            # By default, do not exclude any channels.
            exclude_channels = []
        else:
            # Check that the specified channels are valid and warn the user if not.
            for label in exclude_channels:
                if not standardize_and_check_eeg_label(label)[1]:
                    warnings.warn('\nChannel to be excluded "{}" is not a valid EEG channel.'
                                  .format(label))

        if begin is None:
            begin = 0

        # Collect all EEG channels to read.
        channels_to_read = []
        for label in channels:
            if label in exclude_channels:
                # Skip this channel.
                continue

            # Check if label is a valid EEG label.
            valid_eeg_label = standardize_and_check_eeg_label(label)[1]

            # In case of a label specified by the user, raise an error if the label is not valid or not available.
            if channels_specified:
                if not valid_eeg_label:
                    raise ValueError('Invalid EEG channel "{}". Valid EEG channels are: {}.'
                                     .format(label, EEG_LABELS))
                if label not in self.signal_headers['label']:
                    raise ValueError('Channel "{}" not in EDF file. Channels in EDF file: {}.'
                                     .format(label, self.signal_headers['label']))

            # Read channel if it is an EEG channel.
            if valid_eeg_label:
                channels_to_read.append(label)

        # Nothing to read: return an empty dataset instead of failing on eeg_signals[0] below.
        if not channels_to_read:
            return EegDataset()

        # Only create the progress bar when requested, so that verbose=0 does not write to stdout.
        bar = pyprind.ProgBar(len(channels_to_read), stream=sys.stdout) if verbose else None

        # Read EEG signals from EDF.
        eeg_signals = []
        for label in channels_to_read:
            eeg_signals.append(self.read_time_series(label, begin=begin, end=end,
                                                     discontinuous_mode=discontinuous_mode,
                                                     check_label=True, check_unit=True,
                                                     **kwargs))
            if bar is not None:
                bar.update()

        # Create EegDataset with EEG data.
        if isinstance(eeg_signals[0], list):
            # In case of discontinuous data in the EDF+ file, multiple continuous subsignals may be read from the file.
            # Return each element in the signal list as a separate EegDataset.
            ds = []
            for i in range(len(eeg_signals[0])):
                ds_i = EegDataset()
                for eeg_sig in eeg_signals:
                    ds_i.append(eeg_sig[i])
                ds.append(ds_i)
        else:
            ds = EegDataset(eeg_signals)

        return ds

    def read_rso2_dataset(self, **kwargs):
        """
        Read all NIRS-related signals into a Dataset object.

        A signal is selected when its label contains 'rSo' (not case sensitive) and its unit,
        lowercased and stripped of surrounding whitespace, is 'nos' or '%'.

        Args:
            kwargs: optional keyword arguments for self.read_time_series().

        Returns:
            (nnsa.OxygenDataset): Dataset object with all NIRS signals.
        """
        labels = self.signal_headers['label']
        units = self.signal_headers['physical_dimension']
        ds = OxygenDataset()

        # List with patterns.
        patterns = ['rSo']  # Not case sensitive.
        accepted_units = ['nos', '%']
        for lab, un in zip(labels, units):
            label_matches = any(p.lower() in lab.lower() for p in patterns)
            if label_matches and un.lower().strip() in accepted_units:
                ts = self.read_time_series(channel=lab, check_label=False, check_unit=False, **kwargs)
                ds.append(ts)

        return ds

    def read_spo2_dataset(self, **kwargs):
        """
        Read all arterial oxygen-related signals into a Dataset object.

        A signal is selected when its label contains 'SaO2 Sa', 'SaO2 Sp' or 'SpO2' (not case sensitive)
        and its unit contains '%'.

        Args:
            kwargs: optional keyword arguments for self.read_time_series().

        Returns:
            (nnsa.OxygenDataset): Dataset object with all SaO2 signals.
        """
        labels = self.signal_headers['label']
        units = self.signal_headers['physical_dimension']
        ds = OxygenDataset()

        # List with patterns.
        patterns = ['SaO2 Sa', 'SaO2 Sp', 'SpO2']  # Not case sensitive.
        for lab, un in zip(labels, units):
            label_matches = any(p.lower() in lab.lower() for p in patterns)
            if label_matches and '%' in un:
                ts = self.read_time_series(channel=lab, check_label=False, check_unit=False, **kwargs)
                ds.append(ts)

        return ds

    def read_temperature(self, **kwargs):
        """
        Read temperature signal into a TimeSeries object.

        A signal is considered a temperature signal when its label contains 'temp' or 'thuid'
        (not case sensitive). The number of matches is validated before anything is read, so no
        signal data is loaded when the lookup is ambiguous.

        Args:
            kwargs: optional keyword arguments for self.read_time_series().

        Returns:
            (nnsa.TimeSeries): TimeSeries with temperature signal.

        Raises:
            ValueError: if none or more than 1 temperature signal is found.
        """
        labels = self.signal_headers['label']

        # List with patterns.
        patterns = ['temp', 'thuid']  # Not case sensitive.
        idx = [i for i, lab in enumerate(labels)
               if any(p.lower() in lab.lower() for p in patterns)]

        if len(idx) != 1:
            raise ValueError('Found {} temperature channels: {}.'.format(len(idx), np.array(labels)[idx]))

        # Exactly one match: read only that channel.
        return self.read_time_series(channel=labels[idx[0]], check_label=False, check_unit=False, **kwargs)

    def read_time_series(self, channel, begin=0, end=None, discontinuous_mode='longest',
                         efficiency='speed', **kwargs):
        """
        Return time series object for given channel.

        Args:
            channel (int or str): the channel specifying the signal to read. May either be an integer specifying the
                order in which the signal appeared in the EDF file (i.e. index), or a string specifying the signal
                label.
            begin (float, optional): begin time (in seconds) of the signal to read.
                By default reading starts at the beginning of the signal (i.e. at 0 seconds).
            end (float, optional): end time (in seconds) of the signal to read.
                By default reading ends at the end of the signal.
            discontinuous_mode (str, optional): how to handle discontinuous data (EDF+D).
                If 'longest', return the longest continuous segment.
                If 'all', return a list of all continuous segments.
                If 'fill' or 'full', merge all segments filling up the gaps with nan.
                If 'ignore', the discontinuous signal is returned as if it is continuous.
                See also self._handle_discontinuous_signal().
                Defaults to 'longest'.
            efficiency (str, optional): the algorithm to use for reading:
                'speed' uses an algorithm optimized for speed when loading a large portion of the signal
                (see _read_digital_data_max_speed),
                'memory' uses an algorithm that requires the least amount of memory
                (see _read_digital_data_min_memory).
                Note that the 'memory' option may be faster than the 'speed' option when reading only a small
                part of the signal. However, when reading multiple times from the same file (e.g. read multiple
                signals), 'speed' is probably fastest, even when reading only small parts, since this algorithm
                stores the raw data of the entire file in memory the first time it's called.
            **kwargs (optional): optional keyword arguments for the TimeSeries object.

        Returns:
            (nnsa.TimeSeries or list): TimeSeries object that holds the (specified part of the) data of the
                specified channel. In case of EDF+D file and discontinuous_mode == 'all', a list with TimeSeries
                objects is returned.

        Raises:
            ValueError: if no data record timestamp is at or after the requested begin (or end) time.
        """
        # full is same as fill.
        if discontinuous_mode == 'full':
            discontinuous_mode = 'fill'

        # Verify specified channel is in file and convert channel label to channel index if needed.
        channel = self._check_channel(channel)

        # Extract information for the creation of a TimeSeries object.
        label = self.signal_headers['label'][channel]
        unit = self.signal_headers['physical_dimension'][channel]
        fs = self.additional_info['fs'][channel]
        samples_per_record = self.signal_headers['num_samples'][channel]

        timestamps_datarecord = self._get_timestamps_datarecords(efficiency=efficiency)

        # Convert the begin and end time to sample indices.
        if begin is None:
            begin = 0
        if end is None or end > timestamps_datarecord[-1]:
            end = timestamps_datarecord[-1]
        tmask_begin = timestamps_datarecord >= begin
        tmask_end = timestamps_datarecord >= end

        # Explicit check instead of an assert: asserts are removed under `python -O`, after which
        # np.argmax on an all-False mask would silently return 0.
        if not (np.any(tmask_begin) and np.any(tmask_end)):
            raise ValueError('Requested interval (begin={}, end={}) lies outside the data record timestamps '
                             'of the file.'.format(begin, end))

        # Index of the first data record at or after the begin/end time.
        datarecord_start = np.argmax(tmask_begin)
        start = datarecord_start * samples_per_record
        stop = np.argmax(tmask_end) * samples_per_record

        # A stop of 0 is replaced by None, i.e. no stop index is passed to read_signal
        # (intended as: read until the end of the file).
        if stop == 0:
            stop = None

        # Read the signal. For 'fill', read all segments here and merge them further below.
        read_mode = 'all' if discontinuous_mode == 'fill' else discontinuous_mode
        signal = self.read_signal(channel, start=start, stop=stop,
                                  discontinuous_mode=read_mode,
                                  efficiency=efficiency)

        # Create the info dictionary for TimeSeries.
        info = {'source': self.filepath}

        if isinstance(signal, list):
            # In case of discontinuous data in the EDF+ file, multiple continuous subsignals may be read from the file.
            time_offsets = self._get_discontinuous_timestamps(begin=begin, end=end)
            ts_all = [TimeSeries(signal=s, fs=fs, label=label, unit=unit, info=info, time_offset=tos, **kwargs)
                      for s, tos in zip(signal, time_offsets)]

            if discontinuous_mode == 'fill':
                # Merge into one TimeSeries.
                ts = ts_all[0]
                for ts_i in ts_all[1:]:
                    ts.merge(ts_i, inplace=True)
            else:
                # discontinuous_mode == 'all'
                # Return list of TimeSeries.
                ts = ts_all
        else:
            if self.is_discontinuous:
                # A single segment was returned for a discontinuous file: use the offset of the longest segment.
                time_offsets = self._get_discontinuous_timestamps(begin=begin, end=end)
                idx_longest = self._get_idx_longest(begin=begin, end=end, efficiency=efficiency)
                time_offset = time_offsets[idx_longest]
            else:
                time_offset = timestamps_datarecord[datarecord_start]

            ts = TimeSeries(signal=signal, fs=fs, label=label, unit=unit, info=info,
                            time_offset=time_offset, **kwargs)

        return ts