"""
Module with Result classes that provide a high-level interface for the results of feature extraction algorithms.
"""
import copy
import csv
import os
import pickle
import warnings
import h5py
import numpy as np
from nnsa.utils.segmentation import get_segment_times
from nnsa.io.hdf5 import write_dict_to_hdf5, read_dict_from_hdf5
from nnsa.parameters.parameters import Parameters
from nnsa.utils.dictionaries import flatten_dict, unflatten_dict
from nnsa.utils.objects import basic_repr, convert_to_nnsa_class_callable
from nnsa.utils.paths import check_file_extension, check_filename_exists, check_directory_exists
from nnsa.utils.other import convert_string
# Public names of this module.
__all__ = [
    'SUPPORTED_RESULT_FILE_TYPES',
    'read_result_from_file',
    'ResultBase',
]

# File extensions for which reading and writing of ResultBase-derived objects is dispatched in this module.
SUPPORTED_RESULT_FILE_TYPES = [
    'csv',  # CSV
    'hdf5', 'h5',  # HDF5
    'pickle', 'pkl',  # Pickle
]
def read_result_from_file(filepath, verbose=1):
    """
    Load a result object from a file written by ResultBase.save_to_file().

    The file type is detected from the file extension and the matching reader function is called.

    Args:
        filepath (str): path to the file containing the result, including the file extension
            (see SUPPORTED_RESULT_FILE_TYPES).
        verbose (int, optional): verbose level. Progress messages are printed if larger than 0.
            Defaults to 1.

    Returns:
        result (nnsa.ResultBase): instance of a ResultBase-derived class containing the result.
    """
    # Validate the extension first (check_file_extension raises an error for unsupported extensions).
    extension = check_file_extension(filepath, SUPPORTED_RESULT_FILE_TYPES)

    be_verbose = verbose > 0
    if be_verbose:
        print('Reading results from {}...'.format(filepath))

    # Select the reader that belongs to the detected file type.
    if extension == 'csv':
        reader = _read_result_from_csv
    elif extension in ('hdf5', 'h5'):
        reader = _read_result_from_hdf5
    elif extension in ('pickle', 'pkl'):
        reader = _read_result_from_pickle
    else:
        # check_file_extension should already have rejected this extension: reaching here is a bug.
        raise AssertionError

    result = reader(filepath)

    if be_verbose:
        print('Done reading! Created {} object.'.format(result.__class__.__name__))

    return result
class ResultBase(object):
    """
    Base class for a high-level interface to the results of feature extraction algorithms.

    For each class in feature_extraction, a corresponding Result class is made to manipulate/visualize the results.
    Subclasses must implement `num_segments`, `_merge` and the `_read_from_*`/`_write_to_*` methods they support.

    Args:
        algorithm_parameters (nnsa.Parameters, optional): Parameters object of the corresponding
            feature_extraction class. If None, an empty Parameters object is created.
            Defaults to None.
        data_info (str, optional): optional string with information about the data (e.g. source file, preprocessing).
        segment_start_times (np.array, optional): start times (in seconds) for the segments.
            If None, it is assumed that the segments form a continuous series starting at `time_offset`. The segment
            times are then derived from the segmentation parameters (if present in algorithm parameters) or from
            `fs`, see self._get_segment_start_times().
            Defaults to None.
        segment_end_times (np.array, optional): end times (in seconds) for the segments.
            If None, the end times are derived from the segment start times and the segment length parameter in the
            segmentation parameters (if present in algorithm parameters) or from `fs`,
            see self._get_segment_end_times().
            Defaults to None.
        fs (float, optional): sample frequency corresponding to the segments axis, i.e. 1/segment length. In some
            cases, the feature extracted is on the same time scale as the original data. In that case, it makes sense
            to pass a sample frequency instead of segment start and end times. If fs is given while
            segment_start_times is not and no segmentation parameters are found, the time array corresponding to the
            segments axis will be derived from fs, assuming the samples are continuous and starting at time_offset.
            Defaults to None.
        time_offset (float, optional): optional time offset in seconds for the time arrays.
            Useful to define the time array together with `fs`.
            Explicitly passed segment times are stored as given by the constructor. Changing the offset afterwards
            via the `time_offset` property shifts explicitly set segment times by the difference.
            Defaults to 0.

    Raises:
        ValueError: if `segment_start_times` and `segment_end_times` are both given and equal.

    Notes:
        For the time array corresponding to the segments (if applicable), either specify
        - fs and time_offset (if the datapoints are equidistant, continuous and starting at time_offset); or
        - segment_start_times and fs (if the datapoints are not equidistant, but do correspond to the same length,
          fs=1/segment_length); or
        - segment_start_times and segment_end_times (in all other cases. NOTE: do not set
          segment_start_times == segment_end_times, this is rejected with a ValueError).
    """

    def __init__(self, algorithm_parameters=None, data_info=None, segment_start_times=None,
                 segment_end_times=None, fs=None, time_offset=0):
        if algorithm_parameters is None:
            algorithm_parameters = Parameters()

        # Store explicit times as arrays, since they are used in array arithmetic throughout the class.
        if segment_start_times is not None:
            segment_start_times = np.asarray(segment_start_times)
        if segment_end_times is not None:
            segment_end_times = np.asarray(segment_end_times)

        # Input check.
        if segment_start_times is not None and segment_end_times is not None:
            if np.array_equal(segment_start_times, segment_end_times):
                raise ValueError('`segment_start_times` and `segment_end_times` cannot be the same.')

        self._algorithm_parameters = algorithm_parameters
        self._segment_start_times = segment_start_times
        self._segment_end_times = segment_end_times
        self.data_info = data_info
        self._fs = fs
        self._time_offset = time_offset

    def __repr__(self):
        return basic_repr(self)

    def __len__(self):
        # Return the number of segments.
        return self.num_segments

    @property
    def algorithm_parameters(self):
        """
        Return the parameters of the corresponding feature extraction algorithm.

        Returns:
            (nnsa.Parameters): Parameters object of the corresponding feature_extraction class.
        """
        return self._algorithm_parameters

    @property
    def fs(self):
        """
        Return the sample frequency of the segments.

        If no sample frequency was set, it is derived (and cached) from the segmentation parameters as
        1 / (segment_length - overlap), when those are available.

        Returns:
            (float or None): sample frequency (Hz), or None if it is unknown.
        """
        if self._fs is None:
            # Try to set it using segmentation parameters.
            try:
                seg_pars = self.algorithm_parameters['segmentation']
                seg_len = seg_pars['segment_length']
                if seg_len is not None:
                    self._fs = 1 / (seg_len - seg_pars['overlap'])
            except KeyError:
                # No (complete) segmentation parameters: fs remains unknown.
                pass
        return self._fs

    @property
    def num_segments(self):
        """
        Return the number of segments/samples. Must be implemented in the child class.

        Returns:
            (int): number of segments/samples.
        """
        raise NotImplementedError

    @property
    def segment_end_times(self):
        """
        Return the end times of the segments in seconds.

        Returns:
            (np.array or None): end times (in seconds) for the segments. None if there are no segments
                (self.num_segments is None).
        """
        if self.num_segments is None:
            # No segments.
            return None

        if self._segment_end_times is None:
            # Use default.
            return self._get_segment_end_times()
        return self._segment_end_times

    @segment_end_times.setter
    def segment_end_times(self, segment_end_times):
        """
        Set the end times of the segments in seconds.

        Args:
            segment_end_times (np.array): end times (in seconds) for the segments.

        Raises:
            ValueError: if the number of end times does not match the number of segments.
        """
        segment_end_times = np.asarray(segment_end_times)
        if self.num_segments != len(segment_end_times):
            raise ValueError('Length of segment_end_times ({}) does not match the number of segments ({}).'
                             .format(len(segment_end_times), self.num_segments))
        self._segment_end_times = segment_end_times

    @property
    def segment_length(self):
        """
        Return the segment length in seconds (if the segment length is consistent/fixed).

        Returns:
            segment_length (float or None): segment length (in seconds). None if there are no segments
                (self.num_segments is None).

        Raises:
            ValueError: if the segment length is not constant.
        """
        if self.num_segments is None:
            # No segments.
            return None

        all_segment_lengths = np.unique(self.segment_end_times - self.segment_start_times)

        # Due to precision issues, we need to check with a tolerance.
        if np.all(np.abs(all_segment_lengths - all_segment_lengths[0]) < 1e-10):
            segment_length = np.mean(all_segment_lengths)
        else:
            raise ValueError('Segment length is not constant.')
        return segment_length

    @property
    def segment_start_times(self):
        """
        Return the start times of the segments in seconds.

        Returns:
            (np.array or None): start times (in seconds) for the segments. None if there are no segments
                (self.num_segments is None).
        """
        if self.num_segments is None:
            # No segments.
            return None

        if self._segment_start_times is None:
            # Use default.
            return self._get_segment_start_times()
        return self._segment_start_times

    @segment_start_times.setter
    def segment_start_times(self, segment_start_times):
        """
        Set the start times of the segments in seconds.

        The time offset is updated to the first start time (if there is at least one segment).

        Args:
            segment_start_times (np.array): start times (in seconds) for the segments.

        Raises:
            ValueError: if the number of start times does not match the number of segments.
        """
        segment_start_times = np.asarray(segment_start_times)
        if self.num_segments != len(segment_start_times):
            raise ValueError('Length of segment_start_times ({}) does not match the number of segments ({}).'
                             .format(len(segment_start_times), self.num_segments))
        self._segment_start_times = segment_start_times
        if len(segment_start_times) > 0:
            self._time_offset = segment_start_times[0]

    @property
    def segment_times(self):
        """
        Compute the segments time array corresponding to the segment dimension of the feature array.

        Returns:
            segment_times (np.ndarray or None): time (in seconds) at the middle of each segment. None if there are
                no segments (self.num_segments is None).
        """
        if self.num_segments is None:
            # No segments.
            return None

        # Take the time in the middle of the segment.
        return (self.segment_start_times + self.segment_end_times) / 2

    @property
    def time(self):
        """
        Time array containing the start times of the segments.

        Shortcut when result is sampled with self.fs.

        Returns:
            self.segment_start_times
        """
        return self.segment_start_times

    @property
    def time_offset(self):
        """
        Return the time offset in seconds for the time array.

        Returns:
            (float): time offset in seconds.
        """
        return self._time_offset

    @time_offset.setter
    def time_offset(self, time_offset):
        """
        Set the time offset in seconds for the time array.

        Explicitly stored segment start and end times are shifted by the change in offset.

        Args:
            time_offset (float): time offset in seconds.
        """
        if time_offset != self._time_offset:
            shift = time_offset - self._time_offset

            # Update segment start and end times if needed.
            if self._segment_start_times is not None:
                self._segment_start_times = self._segment_start_times + shift
            if self._segment_end_times is not None:
                self._segment_end_times = self._segment_end_times + shift

            self._time_offset = time_offset

    def compute_global_features(self, **kwargs):
        """
        Compute global features. Must be implemented in the child class.

        Args:
            **kwargs (optional): keyword arguments for pd.DataFrame.

        Returns:
            df (pd.DataFrame): dataframe with one row, and feature values in columns.
        """
        raise NotImplementedError

    def is_discontinuous(self):
        """
        Check if the result is discontinuous in time, i.e. the time/segment array includes jumps in time.

        Only results with explicitly set segment start times can be discontinuous. Results whose times are derived
        from segmentation parameters or fs are continuous by construction.

        Returns:
            (bool): True if the result is discontinuous. False if not.
        """
        if self._segment_start_times is None:
            return False

        starts = self.segment_start_times
        ends = self.segment_end_times

        # A segment starts later than the previous segment's end (with a small tolerance).
        has_gap = np.any(ends[:-1] - starts[1:] < -1e-8)

        # We go back in time.
        goes_back = np.any(np.diff(starts) < 0)

        return bool(has_gap or goes_back)

    def merge(self, other, inplace=False):
        """
        Merge other Result objects into one object.

        A warning is given if the algorithm parameters of the to be merged results are not the same.
        The algorithm parameters and data info of the current object are taken.

        If both results are continuous and have the same explicitly set sample frequency, the merged result is
        described by fs and a time offset, and `_merge` is called with the sample index at which `other` starts.
        Otherwise, the explicit segment time arrays are concatenated.

        Args:
            other (list): list with Result objects (all of the same type). A single object is also accepted.
            inplace (bool, optional): if True, merges the data inplace by adding it to the current object.
                If False, a new object is returned, leaving the original ones unchanged.
                Defaults to False.

        Returns:
            (ResultBase-derived): new Result object containing the merged result (if inplace is False).

        Raises:
            ValueError: if an item is not of the same class as self, or if `inplace` is True and an equidistantly
                sampled `other` starts before `self`.
            NotImplementedError: if the child class' `_merge` does not accept the `index` keyword.
        """
        if not isinstance(other, list):
            other = [other]

        result = self if inplace else copy.deepcopy(self)

        for item in other:
            # Check class type.
            if not isinstance(item, result.__class__):
                raise ValueError('Object of type "{}" cannot be merged with object of type "{}".'
                                 .format(type(item), type(result)))

            # Check algorithm parameters.
            if result.algorithm_parameters != item.algorithm_parameters:
                warnings.warn('\nAlgorithm parameters of results to be merged are not equal.')

            both_continuous = not result.is_discontinuous() and not item.is_discontinuous()
            same_sample_rate = result._fs is not None and result._fs == item._fs

            if both_continuous and same_sample_rate:
                # Append two equidistantly sampled signals.
                fs = result._fs
                time_offset = result.segment_start_times[0]
                time_offset_other = item.segment_start_times[0]

                # Determine the index in the signal array that the first sample of other should be placed.
                index = int(round((time_offset_other - time_offset) * fs))

                # Validate before touching any state, so that a failed inplace merge leaves self intact.
                if index < 0 and inplace:
                    raise ValueError('`other` should occur after `self` in time to append inplace.')

                # Set the time offset and remove segment times (we dont need them if we have fs and time offset).
                result._time_offset = time_offset
                result._segment_start_times = None
                result._segment_end_times = None

                if index < 0:
                    # `item` starts first: merge the other way around (returns a new object).
                    result = item.merge(result)
                else:
                    # Merge data accordingly (must be implemented in the child class).
                    try:
                        result._merge(item, index=index)
                    except TypeError as err:
                        raise NotImplementedError(
                            f'Implement the _merge function in the {result.__class__.__name__} class.') from err

            else:
                # Get the new time arrays.
                new_segment_start_times = np.concatenate((result.segment_start_times, item.segment_start_times))
                new_segment_end_times = np.concatenate((result.segment_end_times, item.segment_end_times))

                # Check fs.
                if result.fs != item.fs:
                    warnings.warn('\nfs of results to be merged are not equal.')

                    # Reset fs to None (via the private attribute, since the fs property has no setter).
                    result._fs = None

                # Merge data (must be implemented in the child class).
                result._merge(item)

                # Reset the time arrays.
                result.segment_start_times = new_segment_start_times
                result.segment_end_times = new_segment_end_times

        if not inplace:
            return result

    @staticmethod
    def read_file(filepath, verbose=1):
        """
        Shortcut to read_result_from_file().
        """
        return read_result_from_file(filepath, verbose=verbose)

    def save_to_file(self, filepath, overwrite=False, verbose=1):
        """
        Save ResultBase-derived object to a file.

        Automatically selects the corresponding writer function for the detected file type.
        The created file can later be read with the function read_result_from_file().

        Args:
            filepath (str): path to the file to which to save the ResultBase-derived object.
                If no file extension is included in the filepath, .hdf5 will be added by default
                and the result will be saved to an hdf5 file. To save as a different file, specify
                a compatible file extension (see SUPPORTED_RESULT_FILE_TYPES).
            overwrite (bool): if True, overwrites existing file if filepath exists.
                If False, raises an error if filepath already exists.
                Defaults to False.
            verbose (int, optional): verbose level.
        """
        # If no file extension is given, use hdf5 by default. Do this before the overwrite check, so that the check
        # is performed on the path that is actually written.
        if not os.path.splitext(filepath)[1]:
            filepath = '{}.hdf5'.format(filepath)

        if not overwrite:
            # Check if the filename exists and raise an error if it does.
            check_filename_exists(filepath)

        # Get the file type (raises error if invalid file extension).
        file_extension = check_file_extension(filepath, SUPPORTED_RESULT_FILE_TYPES)

        # Check the directory and create if it does not exist.
        check_directory_exists(filepath=filepath)

        if verbose:
            print('Saving {} object to {}...'.format(self.__class__.__name__, filepath))

        # Call the function corresponding to the file type that does the actual writing.
        if file_extension == 'csv':
            self._write_to_csv(filepath)
        elif file_extension in ('hdf5', 'h5'):
            self._write_to_hdf5(filepath)
        elif file_extension in ('pickle', 'pkl'):
            self._write_to_pickle(filepath)
        else:
            # Error is already raised by check_file_extension if invalid extension. If we reach here, we have a bug.
            raise AssertionError

        if verbose:
            print('Saved!')

    def to_file(self, *args, **kwargs):
        """
        Shortcut to save_to_file().
        """
        return self.save_to_file(*args, **kwargs)

    def _extract_epoch(self, mask):
        """
        Extracts the segments for which `mask` is True (inplace). Does not return anything.

        Must be implemented in the child class.

        Notes:
            Do not merge the time arrays, this happens in self.extract_epoch().
        """
        raise NotImplementedError

    def _get_segment_start_times(self):
        """
        Return the default start times of the segments in seconds (starting at self.time_offset).

        Assumes that the segments are continuous. The segmentation parameters are used if present in the algorithm
        parameters, otherwise the times are derived from self.fs.

        Returns:
            (np.array): start times (in seconds) for the segments.

        Raises:
            NotImplementedError: if neither segmentation parameters nor fs are available.
        """
        try:
            seg_pars = self.algorithm_parameters['segmentation']
            segment_start_times = get_segment_times(num_segments=self.num_segments,
                                                    segment_length=seg_pars['segment_length'],
                                                    overlap=seg_pars['overlap'],
                                                    offset=self.time_offset)
        except KeyError:
            if self.fs is not None:
                # If self.fs is specified, use this to determine the time array.
                segment_start_times = np.arange(self.num_segments) / self.fs + self.time_offset
            else:
                raise NotImplementedError('Cannot determine the segment start times automatically. '
                                          'Set the segment_start_times property manually.')

        return segment_start_times

    def _get_segment_end_times(self):
        """
        Return the default end times of the segments in seconds.

        The end times are the start times plus the segment length. The segment length is taken from the
        segmentation parameters. If it is missing or None, 1/self.fs is used instead.

        Returns:
            (np.array): end times (in seconds) for the segments.

        Raises:
            NotImplementedError: if neither a segment length nor fs is available.
        """
        try:
            segment_length = self.algorithm_parameters['segmentation']['segment_length']
        except KeyError:
            segment_length = None

        if segment_length is None:
            if self.fs is None:
                raise NotImplementedError('Cannot determine the segment end times automatically. '
                                          'Set the segment_end_times property manually.')
            # If self.fs is specified, use this to determine the time array.
            segment_length = 1 / self.fs

        return self.segment_start_times + segment_length

    def _merge(self, other, *args, **kwargs):
        """
        Merge a result object of the same class as `self` to `self` (inplace).

        After calling this function, the object self must contain the concatenated data of `self` and `other`.
        Include also some check to see if the `self` and `other` are compatible (e.g. channel labels, data shape).

        Notes:
            The order of concatenation is important! Concatenate the data in `other` to the end of the data in `self`.
            E.g., np.concatenate((self.data, other.data), axis=-1).
            Do not merge the time arrays, this happens in self.merge().
            self.merge() passes an `index` keyword (sample index at which `other` starts) when merging two
            equidistantly sampled results.

        Args:
            other (ResultBase-derived): ResultBase child of the same class as `self`.
            *args, **kwargs: optional arguments depending on the child class.
        """
        raise NotImplementedError

    @staticmethod
    def _read_csv_header(filepath):
        """
        Read standard header from csv: class name and algorithm parameters (lines 1-4 of the csv file).

        Args:
            filepath (str): path to the file containing the result as created by the corresponding ResultBase-derived
                class.

        Returns:
            class_name (str): string specifying the class name of the ResultBase-derived object that is saved in the
                file.
            algorithm_parameters (nnsa.Parameters): feature extraction Parameters as read from the file.
            data_info (str or None): data_info as read from the file.
            fs (float or None): sample frequency of the segments as read from the file.
        """
        # Open with newline='' as required by the csv module to correctly parse quoted fields containing newlines.
        with open(filepath, 'r', newline='') as f:
            reader = csv.reader(f)

            # Line 1: Class name, data_info and fs.
            class_name, data_info, fs = next(reader)
            data_info = data_info if data_info != '' else None  # None is written as '' to csv.
            fs = float(fs) if fs != '' else None  # None is written as '' to csv.

            # Line 2: Flattened Parameters keys.
            flat_keys = next(reader)

            # Line 3: Parameters value types.
            values_types = next(reader)

            # Line 4: Parameters values.
            values_row = next(reader)

        # Convert the values (which are read as strings) to corresponding datatype.
        values = [convert_string(v, t) for v, t in zip(values_row, values_types)]

        # Create flattened dict.
        flat_dict = dict(zip(flat_keys, values))

        # Convert to ordinary nested dict.
        unflat_dict = unflatten_dict(flat_dict)

        # Convert to a Parameters object.
        algorithm_parameters = Parameters(**unflat_dict)

        return class_name, algorithm_parameters, data_info, fs

    @staticmethod
    def _read_hdf5_header(filepath):
        """
        Read standard header from hdf5.

        Args:
            filepath (str): path to the file containing the result as created by the corresponding ResultBase-derived
                class.

        Returns:
            class_name (str): string specifying the class name of the ResultBase-derived object that is saved in the
                file.
            algorithm_parameters (nnsa.Parameters): feature extraction Parameters as read from the file.
            data_info (str or None): data_info as read from the file.
            segment_start_times (np.array or None): start time of the segments as read from the file.
            segment_end_times (np.array or None): end time of the segments as read from the file.
            fs (float or None): sample frequency of the segments as read from the file.
            time_offset (float): time offset in seconds (0 if not present in the file).
        """
        def _as_str(value):
            # String attributes may be returned as bytes or as str; accept both.
            return value.decode() if isinstance(value, bytes) else str(value)

        with h5py.File(filepath, 'r') as f:
            hdr = f['header']

            # Read class_name.
            class_name = _as_str(hdr.attrs['class_name'])

            # Read data_info.
            data_info = _as_str(hdr.attrs['data_info'])
            data_info = data_info if data_info != 'None' else None  # None is written as 'None' to hdf5.

            # Read algorithm parameters as dict and convert to a Parameters object.
            pars_dict = read_dict_from_hdf5(f, 'header/algorithm_parameters')
            algorithm_parameters = Parameters(**pars_dict)

            # Read segment start times (if exists).
            if 'segment_start_times' in hdr:
                segment_start_times = hdr['segment_start_times'][:]
            else:
                segment_start_times = None

            # Read segment end times (if exists).
            if 'segment_end_times' in hdr:
                segment_end_times = hdr['segment_end_times'][:]
            else:
                segment_end_times = None

            # Read fs (if exists).
            fs = hdr.attrs['fs'] if 'fs' in hdr.attrs else None

            # Read time offset (if exists).
            time_offset = hdr.attrs['time_offset'] if 'time_offset' in hdr.attrs else 0

        return class_name, algorithm_parameters, data_info, segment_start_times, segment_end_times, fs, time_offset

    def _write_csv_header(self, filepath):
        """
        Write standard header to csv: class name and algorithm parameters (lines 1-4 of the csv file).

        The header consists of the first 4 lines of the csv file:
            Line 1: Class name, self.data_info, self.fs.
            Line 2: Flattened Parameters keys.
            Line 3: Parameters value types.
            Line 4: Parameters values.

        Notes:
            The time offset is not written to the csv header.

        Args:
            filepath (str): filepath to write to. Must include the .csv file extension.

        Raises:
            NotImplementedError: if explicit segment start or end times are set (these cannot be saved to csv).
        """
        if self._segment_start_times is not None or self._segment_end_times is not None:
            raise NotImplementedError('_segment_start_times and _segment_end_times cannot be saved to CSV. '
                                      'Use HDF5 or pickle.')

        # Verify that file extension is csv.
        check_file_extension(filepath, valid_extensions='csv')

        # Convert algorithm_parameters to flattened dict to make it easier to write.
        pars_flat = flatten_dict(self.algorithm_parameters)

        # Open the file and write line by line.
        with open(filepath, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)

            # Line 1: Class name, data_info and fs (None is written as '').
            writer.writerow([self.__class__.__name__,
                             self.data_info,
                             self.fs])

            # Line 2: Flattened Parameters keys.
            writer.writerow(pars_flat.keys())

            # Line 3: Parameters value types.
            writer.writerow((type(v).__name__ for v in pars_flat.values()))

            # Line 4: Parameters values.
            writer.writerow(pars_flat.values())

    def _write_hdf5_header(self, filepath):
        """
        Write standard header to a new hdf5 file (an existing file at filepath is truncated).

        The following information is saved to the hdf5 file f:
            f['header'].attrs['class_name']: Class name.
            f['header'].attrs['data_info']: self.data_info.
            f['header/algorithm_parameters']: algorithm parameters (written by write_dict_to_hdf5).
            f['header']['segment_start_times']: explicit segment start times (if set).
            f['header']['segment_end_times']: explicit segment end times (if set).
            f['header'].attrs['fs']: sample frequency (if set).
            f['header'].attrs['time_offset']: time offset (if not None).

        Args:
            filepath (str): filepath to write to. Must include the file extension (.hdf5 or .h5).
        """
        with h5py.File(filepath, 'w') as f:
            # Create header group for saving class_name and algorithm_parameters.
            hdr = f.create_group('header')

            # Write class_name as a byte string (np.bytes_ is the NumPy 2 compatible name of np.string_).
            hdr.attrs['class_name'] = np.bytes_(self.__class__.__name__)

            # Write data_info as a byte string (None is written as 'None').
            hdr.attrs['data_info'] = np.bytes_(self.data_info)

            # Write algorithm_parameters.
            write_dict_to_hdf5(f, self.algorithm_parameters, 'header/algorithm_parameters')

            # Write segment start times (if defined).
            if self._segment_start_times is not None:
                hdr['segment_start_times'] = self._segment_start_times

            # Write segment end times (if defined).
            if self._segment_end_times is not None:
                hdr['segment_end_times'] = self._segment_end_times

            # Write fs (if defined).
            if self._fs is not None:
                hdr.attrs['fs'] = self.fs

            # Write time offset (if defined).
            if self.time_offset is not None:
                hdr.attrs['time_offset'] = self.time_offset

    def _write_to_pickle(self, filepath):
        """
        Write the results to a pickle file. Does not require definition on subclass level as pickle can save any object.

        Be careful of using pickles for long-term storage where the underlying code is not highly stable.
        Pickles can cause problems if you save a pickle, then update your code and read the pickle in. Attributes added
        to your __init__ may not be present in the unpickled object; also, if pickle can't find your class and module
        (e.g., if you renamed the module) you will get errors. Fortunately, there exist workarounds.
        See https://wiki.python.org/moin/UsingPickle for more info on using pickle.

        Args:
            filepath (str): path of the file to write to.
                The file extension (.pkl or .pickle) must be included in filepath.
        """
        with open(filepath, 'wb') as pkl:
            pickle.dump(self, pkl)

    @staticmethod
    def _read_from_csv(filepath):
        """
        Read result from csv file (as created by _write_to_csv) and return the corresponding Result-derived class
        containing the result. Must be implemented in the child class.

        Args:
            filepath (str): path to the file containing the result as created by the save_to_file() method of the
                ResultBase class. The object returned by this function will be equal to the object that created the
                file. The file extension (.csv) must be included in filepath.

        Returns:
            result (nnsa.ResultBase-derived): instance of class derived from ResultBase containing the result.
        """
        raise NotImplementedError

    @staticmethod
    def _read_from_hdf5(filepath):
        """
        Read result from hdf5 file (as created by _write_to_hdf5) and return the corresponding Result-derived class
        containing the result. Must be implemented in the child class.

        Args:
            filepath (str): path to the file containing the result as created by the save_to_file() method of the
                ResultBase class. The object returned by this function will be equal to the object that created the
                file. The file extension (.hdf5 or .h5) must be included in filepath.

        Returns:
            result (nnsa.ResultBase-derived): instance of a ResultBase-derived class containing the result.
        """
        raise NotImplementedError

    def _write_to_csv(self, filepath):
        """
        Write the contents of the object to a csv file. Must be implemented in the child class.

        Args:
            filepath (str): filepath to write to.
                The file extension (.csv) must be included in filepath.
        """
        raise NotImplementedError

    def _write_to_hdf5(self, filepath):
        """
        Write the contents of the object to an hdf5 file. Must be implemented in the child class.

        Args:
            filepath (str): path of the file to write to.
                The file extension (.hdf5 or .h5) must be included in filepath.
        """
        raise NotImplementedError
def _read_result_from_csv(filepath):
    """
    Load a result from a csv file (as created by _write_to_csv) into an instance of the class that wrote it.

    Args:
        filepath (str): path to the csv file containing the result as created by a ResultBase-derived class.

    Returns:
        result (nnsa.ResultBase-derived): instance of a ResultBase-derived class containing the result.
    """
    # The first element of the csv header is the name of the class that wrote the file.
    header = ResultBase._read_csv_header(filepath)

    # Look up the class by name and let it read its own data from the csv.
    result_class = convert_to_nnsa_class_callable(header[0])
    return result_class._read_from_csv(filepath)
def _read_result_from_hdf5(filepath):
    """
    Load a result from an hdf5 file (as created by _write_to_hdf5) into an instance of the class that wrote it.

    Args:
        filepath (str): path to the hdf5 file containing the result as created by a ResultBase-derived class.

    Returns:
        result (nnsa.ResultBase-derived): instance of a ResultBase-derived class containing the result.
    """
    # The first element of the hdf5 header is the name of the class that wrote the file.
    header = ResultBase._read_hdf5_header(filepath)

    # Look up the class by name and let it read its own data from the hdf5.
    result_class = convert_to_nnsa_class_callable(header[0])
    return result_class._read_from_hdf5(filepath)
def _read_result_from_pickle(filepath):
    """
    Load a result from a pickle file (as created by _write_to_pickle).

    Warning:
        Unpickling can execute arbitrary code. Only read pickle files from trusted sources.

    Args:
        filepath (str): path to the pickle file containing the result as created by a ResultBase-derived class.

    Returns:
        result: the unpickled object (an instance of a ResultBase-derived class if the file was created by
            _write_to_pickle).
    """
    with open(filepath, 'rb') as pkl:
        return pickle.load(pkl)