"""
Algorithms for extraction of connectivity features.
"""
import copy
import csv
import os
import sys
import warnings
import h5py
import networkx as nx
import numpy as np
import pyprind
import scipy.signal
import matplotlib.pyplot as plt
import pandas as pd
from nnsa.artefacts.artefact_detection import default_eeg_signal_quality_criteria
from nnsa.feature_extraction.common import check_multichannel_data_matrix, preprocess_segment, \
aggregate_segment_features
from nnsa.feature_extraction.frequency_analysis import _determine_welch_kwargs
from nnsa.feature_extraction.result import ResultBase, read_result_from_file
from nnsa.parameters.parameters import ClassWithParameters, Parameters
from nnsa.utils.segmentation import compute_n_segments, segment_generator, get_segment_times
from nnsa.utils.config import HORIZONTAL_RULE
from nnsa.utils.other import enumerate_label, convert_string_auto
__all__ = [
'CoherenceGraph',
'CoherenceGraphResult',
'compute_coherence_graph',
'compute_average_path_length',
'compute_dist_matrix',
'draw_graph'
]
[docs]class CoherenceGraph(ClassWithParameters):
"""
Class for computing a connectivity graph of multi-channel EEG based on coherence.
References:
This code was partly ported from Mario Lavanga and Ofelie de Wel's MATLAB code.
@Article{LAVANGA2018,
author = {M Lavanga and O De Wel and A Caicedo and K Jansen and A Dereymaeker and G Naulaers and S Van Huffel},
title = {A brain-age model for preterm infants based on functional connectivity},
journal = {Physiological Measurement},
year = {2018},
volume = {39},
number = {4},
pages = {044006},
month = {apr},
abstract = {Objective: In this study, the development of EEG functional connectivity during early development has been investigated in order to provide a predictive age model for premature infants. Approach: The functional connectivity has been assessed via the coherency function (its imaginary part (ImCoh) and its mean squared magnitude (MSC)), the phase locking value () and the Hilbert–Schimdt dependence (HSD) in a dataset of 30 patients, partially described and employed in previous studies (Koolen et al 2016 Neuroscience 322 298–307; Lavanga et al 2017 Complexity 2017 1–13). Infants’ post-menstrual age (PMA) ranges from 27 to 42 weeks. The topology of the EEG couplings has been investigated via graph-theory indices. Main results: Results show a sharp decrease in ImCoh indices in θ, (4–8) Hz and α, (8–16) Hz bands and MSC in β, (16–32) Hz band with maturation, while a more modest positive correlation with PMA is found for HSD, and MSC in , θ, α bands. The best performances for the PMA prediction were mean absolute error equal to 1.51 weeks and adjusted coefficient of determination equal to 0.8. Significance: The reported findings suggest a segregation of the cortex connectivity, which favours a diffused tasks architecture on the brain scalp. In summary, the results indicate that the neonates’ brain development can be described via lagged-interaction network features.},
doi = {10.1088/1361-6579/aabac4},
file = {:LAVANGA2018 - A Brain Age Model for Preterm Infants Based on Functional Connectivity.pdf:PDF},
publisher = {{IOP} Publishing},
url = {https://doi.org/10.1088%2F1361-6579%2Faabac4},
}
Main method: coherence_graph()
Examples:
>>> np.random.seed(0)
>>> x = np.random.rand(8, 10000)
>>> cg = CoherenceGraph()
>>> print(type(cg.parameters).__name__)
Parameters
>>> cg_result = cg.coherence_graph(x, fs=250, verbose=0)
>>> print(type(cg_result).__name__)
CoherenceGraphResult
>>> cg_result.mean_coh['theta'][1, 0, 0]
0.07487598638814418
"""
[docs] @staticmethod
def default_parameters():
"""
Return the default parameters.
Returns:
(nnsa.Parameters): a default set of parameters for the object.
"""
# Parameters for segmentation of EEG recording into small time segments/epochs.
segmentation = Parameters(**{
# Segment length in seconds:
'segment_length': 60,
# Overlap in segments in seconds:
'overlap': 0,
})
# Parameters for artefact detection/exclusion, see
# nnsa.artefacts.artefact_detection.default_eeg_signal_quality_criteria()
artefact_criteria = Parameters(**default_eeg_signal_quality_criteria())
artefact_criteria.update(max_fraction_of_artefact_channels=0)
# Specify a filter for filtering the segments, see nnsa.preprocessing.filter.Filter().
# Specify None to not filter the segments.
segment_filter_specification = None
# Frequency bands for which to compute the power.
frequency_bands = Parameters(**{
'delta_1': [0, 2],
'delta_2': [2, 4],
'theta': [4, 8],
'alpha': [8, 16],
'beta': [16, 32],
'all': [0, 32],
})
# Parameters for (cross-)power spectral density.
# See nnsa.feature_extraction.frequency_analysis._determine_welch_kwargs().
csd = Parameters(**{
# Window to use. MATLAB's default is 'hamming' (note that scipy's default is 'hanning'):
'window': 'hamming',
# Length of window for Welch's method in seconds:
'window_length': 1,
# Fraction of overlap between windows:
'overlap_fraction': 0.7,
# Maximum accepted/desired frequency resolution in Hz (will determine the amount of zero-padding for DFT):
'df_max': 0.015625,
# Detrend (see scipy.signal.welch()). In MATLAB there is no detrending, so default to False.
'detrend': False
})
pars = {
'segmentation': segmentation,
'artefact_criteria': artefact_criteria,
'segment_filter_specification': segment_filter_specification,
'frequency_bands': frequency_bands,
'csd': csd,
}
return Parameters(**pars)
[docs] def coherence_graph(self, *args, **kwargs):
"""
Alias for self.process().
"""
return self.process(*args, **kwargs)
[docs] def process(self, data_matrix, fs, channel_labels=None, verbose=1):
"""
Compute the coherence graph of multichannel data as designed by Mario Lavanga and Ofelie De Wel.
Code was ported from MATLAB and constsist of 6 steps:
1) Segmentation
2) Optional filtering
3) Artefact exclusion
4) PSD estimation
5) Computation of coherence in specified frequency bands per segment
6) Averaging the coherence in the specified frequency bands
Args:
data_matrix (np.ndarray): see check_multichannel_data_matrix().
fs (float): sample frequency of the EEG signals.
channel_labels (list of str, optional): see check_multichannel_data_matrix().
verbose (int, optional): verbose level.
Defaults to 1.
Returns:
(nnsa.CoherenceGraphResult): object containing the adjacency matrices per frequency band per segment,
where the channels are the nodes.
"""
# Check input.
data_matrix, channel_labels = check_multichannel_data_matrix(data_matrix, channel_labels)
if verbose > 0:
print(HORIZONTAL_RULE)
print('Running coherence_graph with parameters:')
print(self.parameters)
# Extract some parameters.
seg_pars = self.parameters['segmentation']
filter_specification = self.parameters['segment_filter_specification']
artefact_criteria = self.parameters['artefact_criteria']
frequency_bands = self.parameters['frequency_bands']
n_channels = data_matrix.shape[0]
n_bands = len(frequency_bands)
dtype = data_matrix.dtype
# Determine the Welch kwargs from the csd parameters.
welch_kwargs = _determine_welch_kwargs(self.parameters['csd'], fs)
# Segment the data in the time axis (create a generator).
seg_generator = segment_generator(data_matrix, segment_length=seg_pars['segment_length'],
overlap=seg_pars['overlap'], fs=fs, axis=-1)
n_segments = compute_n_segments(data_matrix, segment_length=seg_pars['segment_length'],
overlap=seg_pars['overlap'], fs=fs, axis=-1)
# Initialize progress bar.
bar = pyprind.ProgBar(n_segments, stream=sys.stdout)
# Loop over segments.
mean_coh_tensor = np.empty((n_bands, n_channels, n_channels, n_segments), dtype=dtype)
im_coh_tensor = np.empty_like(mean_coh_tensor)
for i_seg, seg in enumerate(seg_generator):
# Preprocess segment (demean, optionally filter, find channels to exclude).
seg, exclude_mask = preprocess_segment(seg, fs,
filter_specification=filter_specification,
artefact_criteria=artefact_criteria)
# Estimate PSD for each channel.
if not all(exclude_mask):
frequencies, psd_mat = scipy.signal.welch(seg, fs=fs, axis=-1, **welch_kwargs)
df = np.diff(frequencies[:2])[0]
else:
frequencies, psd_mat = (None, None)
# Loop over channels.
mean_coh_matrix = np.empty((n_bands, n_channels, n_channels), dtype=dtype)
im_coh_matrix = np.empty_like(mean_coh_matrix)
for j_channel, excl, signal in zip(range(n_channels), exclude_mask, seg):
# If channel is to be excluded, skip and use NaN to indicate artefacted channel.
if excl:
mean_coh_matrix[:, j_channel] = np.nan
im_coh_matrix[:, j_channel] = np.nan
else:
# Extract psd's.
pow_xx = psd_mat[j_channel]
pow_yy = psd_mat
# Welch's method for estimation of cross-power spectral density.
pow_xy = scipy.signal.csd(signal, seg, fs=fs, axis=-1, **welch_kwargs)[1]
# Magnitude squared coherence estimate.
coh_xy = np.abs(pow_xy) ** 2 / pow_xx / pow_yy
# Imaginary part of the coherence (lagged interactions).
im_coh = np.imag(pow_xy/np.sqrt(pow_xx * pow_yy))
# Loop over frequency bands and compute mean coh_xy and maximum lagged interaction.
for i_band, band_limits in enumerate(frequency_bands.values()):
f_low, f_high = band_limits
mean_coh_matrix[i_band, j_channel] = self._compute_mean_coh(coh_xy, f_low=f_low,
f_high=f_high, df=df)
im_coh_matrix[i_band, j_channel] = self._compute_max_im_coh(im_coh, f_low=f_low,
f_high=f_high, df=df)
mean_coh_tensor[:, :, :, i_seg] = mean_coh_matrix
im_coh_tensor[:, :, :, i_seg] = im_coh_matrix
# Update progress bar.
if verbose > 0:
bar.update()
# Create dictionary with frequency band labels as keys and (channels, segments) arrays as values.
mean_coh = dict(zip(frequency_bands.keys(), (mc for mc in mean_coh_tensor)))
im_coh = dict(zip(frequency_bands.keys(), (ic for ic in im_coh_tensor)))
# Return as a CoherenceGraphResult object.
return CoherenceGraphResult(mean_coh, im_coh, channel_labels=channel_labels,
algorithm_parameters=self.parameters)
@staticmethod
def _compute_mean_coh(coh_xy, f_low, f_high, df):
"""
Compute the mean absolute squared coherence per channel in the specified frequency band.
Args:
coh_xy (np.ndarray): absolute squared cross-power spectral coherence with dimensions
(channels, frequencies).
f_low (float): lower limit of the frequency band.
f_high (float): upper limit of the frequency band.
df (float): frequency resolution (assumes that the frequencies corresponding to the 2nd axis of coh_xy
start at 0 and increase with increments of df).
Returns:
(np.ndarray): array with dimensions (channels,) containing the mean coherence per channel.
"""
# Find indices in coh_xy that corresponds to f_low and f_high.
idx_low = int(np.ceil(f_low / df))
idx_high = int(np.floor(f_high / df))
# Extract the frequency band values and take the mean.
coh_band = coh_xy[:, idx_low: idx_high]
return np.mean(coh_band, axis=-1)
@staticmethod
def _compute_max_im_coh(im_coh, f_low, f_high, df):
"""
Compute the largest (- or +) imaginary part of the coherence per channel in the specified frequency band.
Args:
im_coh (np.ndarray): imaginary part of the coherence with dimensions (channels, frequencies).
f_low (float): lower limit of the frequency band.
f_high (float): upper limit of the frequency band.
df (float): frequency resolution (assumes that the frequencies corresponding to the 2nd axis of im_cho
start at 0 and increase with increments of df).
Returns:
im_coh_max (np.ndarray): array with dimensions (channels,) containing the imaginary part of the coherence
with the largest magnitude per channel.
"""
# Find indices in coh_xy that corresponds to f_low and f_high.
idx_low = int(np.ceil(f_low / df))
idx_high = int(np.floor(f_high / df))
# Extract the frequency band values.
im_coh_band = im_coh[:, idx_low: idx_high]
# Find the values with the largest magnitude.
idx = np.argmax(np.abs(im_coh_band), axis=-1)
im_coh_max = [c[i] for c, i in zip(im_coh_band, idx)]
return np.array(im_coh_max)
[docs]class CoherenceGraphResult(ResultBase):
"""
High-level interface for processing the results of a coherence analysis as created by nnsa.CoherenceGraph().
Args:
mean_coh (dict): dictionary with frequency band labels as keys and (channels, channels, segments) arrays with
channel-wise absolute squared coherence as values.
im_coh (dict): dictionary with frequency band labels as keys and (channels, channels, segments) arrays with
imaginary part of the channel-wise coherence as values.
algorithm_parameters (nnsa.Parameters): see ResultBase.
channel_labels (list of str, optional): labels of the channels corresponding to the rows of the values in power.
If None, default labels will be created.
Default is None.
data_info (str, optional): see ResultBase.
segment_start_times (np.array, optional): see ResultBase.
segment_end_times (np.array, optional): see ResultBase.
fs (flaot, optional): see ResultBase.
"""
def __init__(self, mean_coh, im_coh, algorithm_parameters, channel_labels=None, data_info=None,
segment_start_times=None, segment_end_times=None, fs=None):
super().__init__(algorithm_parameters=algorithm_parameters, data_info=data_info,
segment_start_times=segment_start_times, segment_end_times=segment_end_times, fs=fs)
# Input check.
data_shape = next((p for p in mean_coh.values())).shape
if channel_labels is None:
channel_labels = enumerate_label(data_shape[0], label='Channel')
elif len(channel_labels) != data_shape[0]:
raise ValueError('Length of channel_labels ({}) does not correspond to the shape of the data {}.'
.format(len(channel_labels), data_shape))
if segment_start_times is not None and len(segment_start_times) != data_shape[-1]:
raise ValueError('Length of segment_start_times ({}) is not equal to the number of segments in the data '
'({})'.format(len(segment_start_times), data_shape[-1]))
# Store variables that are not already stored by the parent class (ResultBase).
self.mean_coh = mean_coh
self.im_coh = im_coh
self.channel_labels = channel_labels
@property
def num_segments(self):
"""
Return the number of segments.
Returns:
(int): number of segments.
"""
return next((p.shape[-1] for p in self.mean_coh.values()))
[docs] def compute_average_coherence(self, graph_name):
"""
Compute average coherence value across channels per segment.
Args:
graph_name (str): see self._get_graphs().
Returns:
averages (dict): average coherence values of graphs per frequency band per segment. The items in the dict
correspond to the frequency bands. The arrays in the dict are 1D with the dimension corresponding to
the segments.
"""
# Get attribute with the requested graphs.
graphs_per_band = self._get_graphs(graph_name)
# Loop over frequency bands.
averages = dict()
for freq_band, graphs in graphs_per_band.items():
# Average across channels. Take the absolute values, so that we do not average out - and + values.
averages[freq_band] = np.nanmean(np.abs(graphs.reshape(-1, graphs.shape[-1])), axis=0)
return averages
[docs] def compute_average_path_lengths(self, graph_name, graph_type):
"""
Compute average path lengths of the graphs.
Args:
graph_name (str): name of the graphs to analyze: 'mean_coh' or 'im_coh'.
graph_type (str): see compute_average_path_length().
Returns:
path_lengths (dict): average path lengths of graphs per frequency band per segment. The items in the dict
correspond to the frequency bands. The arrays in the dict are 1D with the dimension corresponding to
the segments.
"""
# Get attribute with the requested graphs.
graphs_per_band = self._get_graphs(graph_name)
# Loop over frequency bands.
path_lengths = dict()
for freq_band, graphs in graphs_per_band.items():
# Loop over segments and compute path length for each segment.
path_lengths[freq_band] = np.array([compute_average_path_length(graphs[:, :, i], graph_type=graph_type)
for i in range(graphs.shape[-1])])
return path_lengths
[docs] def compute_global_features(self, **kwargs):
"""
Compute global features.
Args:
**kwargs (optional): keyword arguments for pd.DataFrame.
Returns:
df (pd.DataFrame): dataframe with one row, and feature values in columns.
"""
# Compute (local) features.
features = dict()
# The following features are dicts (items correspond to frequency bands) containing 1D arrays (corresponding to
# segments).
features['COH_average_mc'] = self.compute_average_coherence('mean_coh')
features['COH_average_ic'] = self.compute_average_coherence('im_coh')
# Combine all segments to get global features (do not distinguish sleep stages).
global_features = dict()
for feature_name, d in features.items():
for power_band, val in d.items():
# Take median across segments.
global_features['{}_{}'.format(feature_name, power_band)] = np.nanmedian(val)
df = pd.DataFrame(global_features, **kwargs)
return df
[docs] def plot(self, which):
"""
Plot abs(mean_coh), abs(im_coh) and binary im_coh graph (averaged over segments) in the current figure.
Args:
which (str): which result to plot. Should be one of the keys in self.mean_coh and self.im_coh dicts.
"""
channel_labels = self.channel_labels
mean_adj_mat = np.nanmean(self.mean_coh[which], axis=-1)
mean_adj_mat -= np.identity(len(mean_adj_mat)) # The ones on the diagonal influence the color range.
G = nx.Graph(np.array(abs(mean_adj_mat)))
nx.relabel_nodes(G, mapping=dict(enumerate(channel_labels)), copy=False)
plt.subplot(3, 1, 1)
draw_graph(G)
plt.title('mean_coh', fontsize=16)
mean_adj_mat = np.nanmean(self.im_coh[which], axis=-1)
G = nx.Graph(np.array(abs(mean_adj_mat)))
nx.relabel_nodes(G, mapping=dict(enumerate(channel_labels)), copy=False)
plt.subplot(3, 1, 2)
draw_graph(G)
plt.title('im_coh', fontsize=16)
mean_adj_mat = np.nanmean(self.im_coh[which], axis=-1)
adj_bin = np.sign(mean_adj_mat)
adj_bin[adj_bin <= 0] = 0
G = nx.DiGraph(np.array(adj_bin))
nx.relabel_nodes(G, mapping=dict(enumerate(channel_labels)), copy=False)
plt.subplot(3, 1, 3)
nx.draw_circular(G,
node_color='skyblue',
node_size=2000,
with_labels=True,
width=1.0)
plt.title('im_coh_bin', fontsize=16)
plt.suptitle(which+'\n')
def _get_graphs(self, graph_name):
"""
Helper function to select the requested graphs.
Args:
graph_name (str): name of the graphs to analyze: 'mean_coh' or 'im_coh'.
Returns:
graphs (dict): one of the attributes self.mean_coh or self.im_coh.
"""
if graph_name == 'mean_coh':
graphs = self.mean_coh
elif graph_name == 'im_coh':
graphs = self.im_coh
else:
raise ValueError('Invalid graph_name argument "{}". Choose from {}.'
.format(graph_name, ['mean_coh', 'im_coh']))
return graphs
def _merge(self, other, index=None):
"""
See ResultBase.
"""
# Check if the channel labels of self and other are the same.
if self.channel_labels != other.channel_labels:
raise ValueError('Cannot merge results with different channel labels.')
if index is not None:
for graph, other_graph in zip([self.mean_coh, self.im_coh],
[other.mean_coh, other.im_coh]):
for key in graph.keys():
n1, n2, n_segments = graph[key].shape
if index < n_segments:
# Cut piece off.
msg = 'Overwriting data while merging.'
warnings.warn(msg)
graph[key] = graph[key][:, :, :index]
else:
# Add nans.
graph[key] = np.concatenate(
[graph[key], np.full((n1, n2, index - n_segments),
fill_value=np.nan)], axis=-1)
for graph, other_graph in zip([self.mean_coh, self.im_coh],
[other.mean_coh, other.im_coh]):
for key in graph.keys():
graph[key] = np.concatenate((graph[key], other_graph[key]),
axis=-1)
@staticmethod
def _read_from_csv(filepath):
"""
Read result from csv file into a CoherenceGraphResult class.
Args:
filepath (str): see ResultBase._read_from_csv().
Returns:
result (nnsa.CoherenceGraphResult): instance of CoherenceGraphResult containing the CoherenceGraph result.
"""
# Lines 1-4: Standard csv header (use the ResultBase method).
algorithm_parameters, data_info, fs = ResultBase._read_csv_header(filepath)[1:]
# Re-open the file and read the rest of the file, line by line.
with open(filepath, 'r') as f:
reader = csv.reader(f)
# Lines 1-4: Standard csv header (already read, skip).
[next(reader) for i in range(4)]
# Line 5: Non-array data header (skip).
assert(next(reader) == ['channel_labels', 'graph[key].shape'])
# Line 6: Non-array data.
channel_labels, data_shape = [convert_string_auto(i) for i in next(reader)]
# Line 7: Array header.
frequency_bands = next(reader)
# Line 8-... : Array data (flattened). Each graph tensor per frequency band is saved on one row.
mean_coh = dict()
for band in frequency_bands:
array_as_list = [float(i) for i in next(reader)]
# Convert to numpy array and reshape.
mean_coh[band] = np.reshape(array_as_list, data_shape)
im_coh = dict()
for band in frequency_bands:
array_as_list = [float(i) for i in next(reader)]
# Convert to numpy array and reshape.
im_coh[band] = np.reshape(array_as_list, data_shape)
# Create a result object.
result = CoherenceGraphResult(mean_coh, im_coh, channel_labels=channel_labels,
algorithm_parameters=algorithm_parameters, data_info=data_info, fs=fs)
return result
@staticmethod
def _read_from_hdf5(filepath):
"""
Read result from hdf5 file into a CoherenceGraphResult class.
Args:
filepath (str): see ResultBase._read_from_hdf5().
Returns:
result (nnsa.CoherenceGraphResult): instance of CoherenceGraphResult containing the CoherenceGraph result.
"""
# Read standard csv header (use the ResultBase method).
algorithm_parameters, data_info, segment_start_times, segment_end_times, fs, time_offset =\
ResultBase._read_hdf5_header(filepath)[1:]
# Re-open the file and read the rest of the file.
with h5py.File(filepath, 'r') as f:
# Read array data.
mean_coh = dict()
for band in f['mean_coh'].keys():
mean_coh[band] = f['mean_coh'][band][:]
im_coh = dict()
for band in f['im_coh'].keys():
im_coh[band] = f['im_coh'][band][:]
# Read non-array data.
channel_labels = [label.decode() for label in f['mean_coh'].attrs['channel_labels']]
# Create a result object.
result = CoherenceGraphResult(mean_coh, im_coh, channel_labels=channel_labels,
algorithm_parameters=algorithm_parameters, data_info=data_info,
segment_start_times=segment_start_times,
segment_end_times=segment_end_times,
fs=fs)
return result
def _write_to_csv(self, filepath):
"""
Write the contents of the object to a csv file.
Args:
filepath (str): see ResultBase._write_to_csv().
"""
# Lines 1-4: Standard csv header (use the ResultBase method).
self._write_csv_header(filepath)
# Append attributes to the csv file, line by line.
with open(filepath, 'a', newline='') as csvfile:
writer = csv.writer(csvfile)
# Line 5: Non-array data header.
writer.writerow(['channel_labels', 'graph[key].shape'])
# Line 6: Non-array data.
data_shape = next((g for g in self.mean_coh.values())).shape
writer.writerow([self.channel_labels, data_shape])
# Line 7: Array header.
writer.writerow(self.mean_coh.keys())
# Line 8-... : Array data (flattened). Each graph tensor per frequency band is saved on one row.
for g in self.mean_coh.values():
writer.writerow(g.reshape(-1).tolist())
for g in self.im_coh.values():
writer.writerow(g.reshape(-1).tolist())
def _write_to_hdf5(self, filepath):
"""
Write the contents of the object to an hdf5 file.
Args:
filepath (str): see ResultBase._write_to_hdf5().
"""
# Write standard hdf5 header (use the ResultBase method).
self._write_hdf5_header(filepath)
# Append attributes to the hdf5 file.
with h5py.File(filepath, 'a') as f:
# Write array data.
for band, data in self.mean_coh.items():
f.create_dataset('mean_coh/{}'.format(band), data=data)
for band, data in self.im_coh.items():
f.create_dataset('im_coh/{}'.format(band), data=data)
# Write non-array data as attributes to the 'mean_coh' group.
# Convert strings to np.string_ type as recommended for compatibility.
f['mean_coh'].attrs['channel_labels'] = [np.string_(label) for label in self.channel_labels]
[docs]def compute_average_path_length(con_matrix, graph_type):
"""
Compute the average path length of connectivity matrix.
Args:
con_matrix (np.ndarray): connectivity matrix.
graph_type (str): specify type of graph: 'functional', 'effective', 'binary'.
Returns:
path_length (float): average path length.
Notes:
For 'functional' graph_type, the weights in the connectivity matrix are inverted, i.e. w -> 1/w.
For 'binary' graph_type, positive weights are converted to ones and zero and negative weights to zero.
"""
if graph_type == 'functional':
# Convert connectivity weights to (path) lengths, where high connectivity maps to short length.
length_matrix = np.empty(con_matrix.shape)
nonzero_mask = con_matrix != 0
length_matrix[nonzero_mask] = 1 / abs(con_matrix[nonzero_mask])
length_matrix[~nonzero_mask] = np.inf
# Compute distance matrix.
dist_matrix = compute_dist_matrix(length_matrix, weighted=True)
elif graph_type == 'effective':
raise NotImplementedError
elif graph_type == 'binary':
# Create binary/unweighted graph, where positive weight corresponds to an edge, and negative connectivity to
# no edge.
adj_bin = np.sign(con_matrix)
adj_bin[adj_bin <= 0] = 0
# Compute distance matrix.
dist_matrix = compute_dist_matrix(adj_bin, weighted=False)
else:
raise ValueError('Invalid graph_type argument "{}". Choose from {}'
.format(graph_type, ['functional', 'effective', 'binary']))
# Average path length/distance.
dist_matrix = _remove_diagonal(dist_matrix)
path_length = dist_matrix[dist_matrix != np.inf].mean()
return path_length
[docs]def compute_dist_matrix(adj_matrix, weighted=True):
"""
Compute the matrix containing the shortest paths between nodes in the graph with given adjacency matrix.
Args:
adj_matrix (np.ndarray): adjacency matrix of the graph.
weighted (bool): if True, use the Dijkstra algorithm for a weighted graph.
If False, use the breath-first search algorithm for an unweighted graph.
Defaults to True.
Returns:
dist_matrix (np.ndarray): distance matrix of the graph (same shape as adj_matrix).
Notes:
Distances between disconnected nodes are set to Inf.
Distances on the main diagonal are 0.
"""
G = nx.DiGraph(np.array(adj_matrix))
if weighted:
path_lengths = nx.all_pairs_dijkstra_path_length(G)
else:
path_lengths = nx.all_pairs_shortest_path_length(G)
dist_matrix = np.full(np.shape(adj_matrix), np.inf)
for i, p in path_lengths:
for j, v in p.items():
dist_matrix[i, j] = v
return dist_matrix
[docs]def draw_graph(G):
edge_weights = [d['weight'] for u, v, d in G.edges(data=True)]
nx.draw_circular(G,
node_color='skyblue',
node_size=2000,
edge_color=edge_weights,
with_labels=True,
width=10.0,
edge_cmap=plt.cm.Blues)
def _remove_diagonal(x):
return x[~np.eye(x.shape[0],dtype=bool)].reshape(x.shape[0], -1)