import numpy as np
from scipy.stats import norm
from scipy.linalg import eigh, eig
from scipy.integrate import quad
from scipy.special import erf, gamma
import math
import json
from pathlib import Path
import argparse


class EigenangleTest:
    """
    Construct the null distribution for the eigenangle test and compare two
    correlation matrices or connectivity matrices (for networks as described
    by Rajan and Abbott, 2006).
    """

    def __init__(self, is_connectivity=False, **params):
        for key, value in params.items():
            setattr(self, key, value)
        if 'N' not in params:
            raise AttributeError('Number of neurons "N" must be given!')
        self.is_connectivity = is_connectivity

        if is_connectivity:
            # compare connectivity matrices
            for param in ['f', 'mu', 'epsilon', 'sigma_ex', 'sigma_in']:
                if getattr(self, param, None) is None:
                    raise AttributeError(f'{param} not given!')
            self.mu_ex = self.mu
            self.mu_in = -self.f * self.mu_ex / (1 - self.f)
            self.var_J_ex = self.var_J(self.mu_ex, self.sigma_ex, self.epsilon)
            self.var_J_in = self.var_J(self.mu_in, self.sigma_in, self.epsilon)
            self.beta = self.var_J_in / self.var_J_ex
            self.chi = self.var_J_in * self.N
            self.weight_dist = self.rajan_abbott
            self.critical_radius = np.sqrt(1 - self.f + self.f/self.beta) \
                                 * np.sqrt(self.chi)
        else:
            # compare correlation matrices
            if getattr(self, 'bin_num', None) is None:
                raise AttributeError('"bin_num" not given!')
            self.alpha = self.bin_num / self.N
            if self.alpha < 1:
                raise ValueError('There need to be more bins than neurons!')
            self.weight_dist = self.marchenko_pastur

    def compare(self, matrix_a, matrix_b):
        if self.N != len(matrix_a) or self.N != len(matrix_b):
            raise ValueError(f'Matrices are of wrong size (!={self.N})')

        if self.is_connectivity:
            eval_a, evec_a = eig(matrix_a)
            eval_b, evec_b = eig(matrix_b)
            # use only the real parts, shifted by the critical radius and
            # clipped at zero
            eval_a, eval_b = np.real(eval_a), np.real(eval_b)
            eval_a += self.critical_radius
            eval_b += self.critical_radius
            eval_a[eval_a < 0], eval_b[eval_b < 0] = 0, 0
        else:
            eval_a, evec_a = eigh(matrix_a)
            eval_b, evec_b = eigh(matrix_b)

        # sort eigenvalues and eigenvectors in descending order
        sort_idx_a, sort_idx_b = np.argsort(eval_a), np.argsort(eval_b)
        sort_idx_a, sort_idx_b = sort_idx_a[::-1], sort_idx_b[::-1]
        eval_a, eval_b = eval_a[sort_idx_a], eval_b[sort_idx_b]
        evec_a, evec_b = evec_a.T[sort_idx_a], evec_b.T[sort_idx_b]

        # fix the sign convention and normalize each eigenvector
        for i, (eva, evb) in enumerate(zip(evec_a, evec_b)):
            evec_a[i] = eva * np.sign(eva[np.argmax(np.absolute(eva))])
            evec_b[i] = evb * np.sign(evb[np.argmax(np.absolute(evb))])
            evec_a[i] /= np.linalg.norm(eva)
            evec_b[i] /= np.linalg.norm(evb)

        # pairwise scalar products between the eigenvectors of both matrices
        M = np.dot(evec_a, np.conjugate(evec_b).T)
        M = np.real(M)
        M[M > 1] = 1.  # guard against rounding errors outside the arccos domain

        if len(M) == 1:
            angles = np.arccos(M[0])
        else:
            angles = np.arccos(np.diag(M))

        weights = np.sqrt((eval_a**2 + eval_b**2) / 2.)
        smallness = 1 - angles / (np.pi/2.)
        weighted_smallness = smallness * weights
        similarity_score = np.mean(weighted_smallness)

        self.init_null_distribution()
        pvalue = self.pvalue(similarity_score)
        return similarity_score, pvalue

    def init_null_distribution(self):
        # init min/max eigenvalues
        if self.is_connectivity:
            self.eigenvalue_min = 0
            self.eigenvalue_max = 2*self.critical_radius
        else:
            self.eigenvalue_min = (1 - np.sqrt(1. / self.alpha)) ** 2
            self.eigenvalue_max = (1 + np.sqrt(1. / self.alpha)) ** 2

        # init angle distribution norm
        N = 2*self.N if self.is_connectivity else self.N
        if N <= 170:
            self.angle_dist_norm = math.gamma(N/2.) \
                                 / (np.sqrt(np.pi) * math.gamma((N-1)/2))
        else:
            # math.gamma overflows for large N; normalize numerically instead
            angle_dist = lambda x: np.sin(x)**(N-2)
            self.angle_dist_norm = 1 / quad(angle_dist, 0, np.pi)[0]

        # calc norm for the rajan_abbott eigenvalue dist
        if self.is_connectivity:
            self.eigenvalue_dist_norm = quad(self.eigenvalue_real_dist,
                                             -self.critical_radius,
                                             self.critical_radius)[0]

        # init variance of the similarity score distribution
        integrand = lambda x: x**2 * self.weighted_smallness_dist(x)
        var = quad(integrand, -np.inf, np.inf)[0]
        self.similarity_score_sigma = np.sqrt(var/self.N)
        return None

    def angle_smallness_dist(self, D):
        if D < -1 or D > 1:
            return 0
        N = 2*self.N if self.is_connectivity else self.N
        return self.angle_dist_norm * np.pi/2 * np.cos(D*np.pi/2)**(N-2)

    def angle_dist(self, phi):
        N = 2*self.N if self.is_connectivity else self.N
        if phi < 0 or phi > np.pi:
            return 0
        func = lambda p: np.sin(p)**(N-2)
        if N < 170:
            norm_const = gamma(N/2.) / (np.sqrt(np.pi) * gamma((N-1)/2))
        else:
            norm_const = 1 / quad(func, 0, np.pi)[0]
        return norm_const * func(phi)

    def weighted_smallness_dist(self, D):
        integrand = lambda x: self.angle_smallness_dist(D/float(x)) \
                              * self.weight_dist(x) * 1. / np.abs(x)
        return quad(integrand, self.eigenvalue_min, self.eigenvalue_max)[0]

    def similarity_score_distribution(self, eta):
        return norm.pdf(eta, 0, self.similarity_score_sigma)

    def pvalue(self, eta):
        sigma = self.similarity_score_sigma
        # equal to integrating similarity_score_distribution from eta to infinity
        return .5 * (1 + erf(-eta / (sigma * np.sqrt(2))))

    def marchenko_pastur(self, x):
        x_min, x_max = self.eigenvalue_min, self.eigenvalue_max
        y = self.alpha / (2 * np.pi * x) * np.sqrt((x_max - x) * (x - x_min))
        if np.isnan(y):
            return 0
        else:
            return y

    def rajan_abbott(self, w):
        w_shift = w - self.critical_radius
        return 1/self.eigenvalue_dist_norm * self.eigenvalue_real_dist(w_shift)

    def eigenvalue_real_dist(self, x):
        # transform dist for abs(ev) to dist for Re(ev)
        def transform_func(d):
            if d**2 - x**2 <= 0:
                return 0
            return self.eigenvalue_radius_dist(d) * d/np.sqrt(d**2 - x**2)
        return quad(transform_func, np.abs(x), self.critical_radius)[0]

    def eigenvalue_radius_dist(self, w):
        w = w / np.sqrt(self.chi)
        r = w**2
        critical_value = 1 - self.f + self.f/self.beta
        if r > critical_value:
            return 0
        else:
            phi_p = self.phi(r, self.beta, self.f, dev=1)
            phi_pp = self.phi(r, self.beta, self.f, dev=2)
            return 1/np.pi * (r*phi_pp + phi_p) / np.sqrt(self.chi)

    def q_func(self, r, a, f, dev=0):
        ma = 1 - a
        mf = 2*(1 - f)
        A = ma*r + 2*f - 1
        B = np.sqrt((ma*r - 1)**2 + 4*f*ma*r)
        if dev == 0:
            return (A + B)/mf
        elif dev == 1:
            return ma/mf * (1 + A/B)
        elif dev == 2:
            return ma**2 / (mf*B) * (1 - A**2/B**2)
        else:
            return np.nan

    def phi(self, r, a, f, dev=1):
        q = self.q_func(r, a, f, dev=0)
        qp = self.q_func(r, a, f, dev=1)
        qpp = self.q_func(r, a, f, dev=2)
        mq = q + 1
        maq = (a*q + 1) / mq
        A = 1/mq - f/q + a*r/mq - r*(a*q + 1)/mq**2
        if dev == 1:
            return qp*A + maq
        elif dev == 2:
            return qpp*A \
                 + 2*qp*(a/mq - maq/mq) \
                 + qp**2*(-1/mq**2 + f/q**2 - 2*a*r/mq**2 + 2*r*maq/mq**2)
        else:
            return np.nan

    def var_J(self, mu, sigma, epsilon):
        return epsilon*sigma**2 + epsilon*(1 - epsilon)*mu**2


def none_or_X(value, dtype):
    if value is None or not bool(value) or value == 'None':
        return None
    try:
        return dtype(value)
    except ValueError:
        return None


none_or_int = lambda v: none_or_X(v, int)
none_or_float = lambda v: none_or_X(v, float)
bool_arg = lambda s: False if s == 'False' else True


if __name__ == '__main__':
    CLI = argparse.ArgumentParser()
    CLI.add_argument("--matrix_a", nargs='?', type=Path, required=True)
    CLI.add_argument("--matrix_b", nargs='?', type=Path, required=True)
    CLI.add_argument("--output", nargs='?', type=Path, required=True)
    CLI.add_argument("--N", nargs='?', type=int, required=True)
    CLI.add_argument("--bin_num", nargs='?', type=none_or_int, default=None)
    CLI.add_argument("--f", nargs='?', type=none_or_float, default=None)
    CLI.add_argument("--mu", "--mu_ex", nargs='?', type=none_or_float,
                     default=None)
    CLI.add_argument("--epsilon", nargs='?', type=none_or_float, default=None)
    CLI.add_argument("--sigma_ex", nargs='?', type=none_or_float, default=None)
    CLI.add_argument("--sigma_in", nargs='?', type=none_or_float, default=None)
    CLI.add_argument("--is_connectivity", nargs='?', type=bool_arg,
                     default=False)
    CLI.add_argument("--shuffle_neuron_ids", nargs='?', type=bool_arg,
                     default=False)
    args, unknown = CLI.parse_known_args()

    params = {k: v for k, v in vars(args).items()
              if k not in ['matrix_a', 'matrix_b', 'output']}

    matrix_a = np.nan_to_num(np.load(args.matrix_a))
    matrix_b = np.nan_to_num(np.load(args.matrix_b))

    if args.shuffle_neuron_ids:
        neuron_ids = np.arange(args.N)
        np.random.shuffle(neuron_ids)
        matrix_b = matrix_b[neuron_ids, :][:, neuron_ids]

    eigenangle_test = EigenangleTest(**params)
    score, pvalue = eigenangle_test.compare(matrix_a, matrix_b)

    result = {'score': score, 'pvalue': pvalue}
    args.output.parent.mkdir(exist_ok=True, parents=True)
    with open(args.output, 'w') as f:
        json.dump(result, f)
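
# Usage sketch (illustrative only): the script name, file names, and parameter
# values below are assumptions, not part of the code above. Matrices are loaded
# with np.load, so they should be square (N, N) arrays saved via np.save.
#
# Correlation matrices (requires --bin_num >= --N):
#   python eigenangle_test.py --matrix_a corr_a.npy --matrix_b corr_b.npy \
#       --output results/eigenangle.json --N 100 --bin_num 2000
#
# Connectivity matrices (Rajan-Abbott network parameters are required):
#   python eigenangle_test.py --matrix_a conn_a.npy --matrix_b conn_b.npy \
#       --output results/eigenangle.json --N 100 --is_connectivity True \
#       --f 0.8 --mu 0.1 --epsilon 0.1 --sigma_ex 0.05 --sigma_in 0.2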