123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271 |
- import numpy as np
- import scipy as sc
- from scipy.stats import norm
- from scipy.linalg import eigh, eig
- from scipy.integrate import quad
- from scipy.special import erf
- import math
- import json
- from pathlib import Path
- import argparse
- import matplotlib.pyplot as plt
- class EigenangleTest():
- """
- Class to create the null hypothesis for the eigenangle test comparison of
- correlation matrices or connectivity matrices (for networks as described by
- Rajan and Abbott 2006), and conduct the comparison for two given matrices.
- """
- def __init__(self, is_connectivity=False, **params):
- for key, value in params.items():
- setattr(self, key, value)
- if 'N' not in params:
- raise AttributeError('Number of neurons "N" must be given!')
- self.is_connectivity = is_connectivity
- if is_connectivity: # compare connectivity matrices
- for param in ['f', 'mu', 'epsilon', 'sigma_ex', 'sigma_in']:
- if not hasattr(self, param) and getattr(self, param) is not None:
- raise AttributeError(f'{param} not given!')
- self.mu_ex = self.mu
- self.mu_in = -self.f * self.mu_ex / (1-self.f)
- self.var_J_ex = self.var_J(self.mu_ex, self.sigma_ex, self.epsilon)
- self.var_J_in = self.var_J(self.mu_in, self.sigma_in, self.epsilon)
- self.beta = self.var_J_in / self.var_J_ex
- self.chi = self.var_J_in * self.N
- self.weight_dist = self.rajan_abbott
- self.critical_radius = np.sqrt(1 - self.f + self.f/self.beta) \
- * np.sqrt(self.chi)
- else: # compare correlation matrices
- if not hasattr(self, 'bin_num') and getattr(self, 'bin_num') is not None:
- raise AttributeError('"bin_num" not given!')
- self.alpha = self.bin_num / self.N
- if self.alpha < 1:
- raise ValueError('There need to be more bins than neurons!')
- self.weight_dist = self.marchenko_pastur
- def compare(self, matrix_a, matrix_b):
- if self.N != len(matrix_a) or self.N != len(matrix_b):
- raise ValueError(f'Matrices are of wrong size (!={self.N})')
- if self.is_connectivity:
- eval_a, evec_a = eig(matrix_a)
- eval_b, evec_b = eig(matrix_b)
- eval_a, eval_b = np.real(eval_a), np.real(eval_b)
- eval_a += self.critical_radius
- eval_b += self.critical_radius
- eval_a[eval_a<0], eval_b[eval_b<0] = 0, 0
- else:
- eval_a, evec_a = eigh(matrix_a)
- eval_b, evec_b = eigh(matrix_b)
- # sort eigenvalues and -vector in ascending order
- sort_idx_a, sort_idx_b = np.argsort(eval_a), np.argsort(eval_b)
- sort_idx_a, sort_idx_b = sort_idx_a[::-1], sort_idx_b[::-1]
- eval_a, eval_b = eval_a[sort_idx_a], eval_b[sort_idx_b]
- evec_a, evec_b = evec_a.T[sort_idx_a], evec_b.T[sort_idx_b]
- for i, (eva, evb) in enumerate(zip(evec_a, evec_b)):
- evec_a[i] = eva * np.sign(eva[np.argmax(np.absolute(eva))])
- evec_b[i] = evb * np.sign(evb[np.argmax(np.absolute(evb))])
- evec_a[i] /= np.linalg.norm(eva)
- evec_b[i] /= np.linalg.norm(evb)
- M = np.dot(evec_a, np.conjugate(evec_b).T)
- M = np.real(M)
- M[np.argwhere(M > 1)] = 1.
- if len(M) == 1:
- angles = np.arccos(M[0])
- else:
- angles = np.arccos(np.diag(M))
- weights = np.sqrt((eval_a**2 + eval_b**2) / 2.)
- smallness = 1 - angles / (np.pi/2.)
- weighted_smallness = smallness * weights
- similarity_score = np.mean(weighted_smallness)
- self.init_null_distribution()
- pvalue = self.pvalue(similarity_score)
- return similarity_score, pvalue
- def init_null_distribution(self):
- # init min/max eigenvalues
- if self.is_connectivity:
- self.eigenvalue_min = 0
- self.eigenvalue_max = 2*self.critical_radius
- else:
- self.eigenvalue_min = (1 - np.sqrt(1. / self.alpha)) ** 2
- self.eigenvalue_max = (1 + np.sqrt(1. / self.alpha)) ** 2
- # init angle distribution norm
- N = 2*self.N if self.is_connectivity else self.N
- if N <= 170:
- self.angle_dist_norm = math.gamma(N/2.) / (np.sqrt(np.pi) \
- * math.gamma((N-1)/2))
- else:
- angle_dist = lambda x: np.sin(x)**(N-2)
- self.angle_dist_norm = 1 / quad(angle_dist, 0, np.pi)[0]
- # calc norm for rajan_abbott eigenvalue dist
- if self.is_connectivity:
- self.eigenvalue_dist_norm = quad(self.eigenvalue_real_dist,
- -self.critical_radius,
- self.critical_radius)[0]
- # init variance of similarity score distribution
- integrand = lambda x: x**2 * self.weighted_smallness_dist(x)
- var = quad(integrand, -np.infty, np.infty)[0]
- self.similarity_score_sigma = np.sqrt(var/self.N)
- return None
- def angle_smallness_dist(self, D):
- if D < -1 or D > 1:
- return 0
- N = 2*self.N if self.is_connectivity else self.N
- return self.angle_dist_norm * np.pi/2 * np.cos(D*np.pi/2)**(N-2)
- def angle_dist(self, phi):
- N = 2*self.N if self.is_connectivity else self.N
- if phi < 0 or phi > np.pi:
- return 0
- func = lambda p: np.sin(p)**(N-2)
- if N < 170:
- norm = sc.special.gamma(N/2.) / (np.sqrt(np.pi) \
- * sc.special.gamma((N-1)/2))
- else:
- norm = 1/sc.integrate.quad(func, 0, np.pi)[0]
- return norm * func(phi)
- def weighted_smallness_dist(self, D):
- integrand = lambda x: self.angle_smallness_dist(D/float(x)) \
- * self.weight_dist(x) * 1. / np.abs(x)
- return quad(integrand, self.eigenvalue_min, self.eigenvalue_max)[0]
- def similarity_score_distribution(self, eta):
- return sc.stats.norm.pdf(eta, 0, self.similarity_score_sigma)
- def pvalue(self, eta):
- sigma = self.similarity_score_sigma
- # equal to integration of similarity_score_distribution from eta to inf
- return .5 * (1 + erf(-eta / (sigma * np.sqrt(2))))
- def marchenko_pastur(self, x):
- x_min, x_max = self.eigenvalue_min, self.eigenvalue_max
- y = self.alpha / (2 * np.pi * x) * np.sqrt((x_max - x) * (x - x_min))
- if np.isnan(y):
- return 0
- else:
- return y
- def rajan_abbott(self, w):
- w_shift = w - self.critical_radius
- return 1/self.eigenvalue_dist_norm * self.eigenvalue_real_dist(w_shift)
- def eigenvalue_real_dist(self, x):
- # transform dist for abs(ev) to dist for Re(ev)
- def transform_func(d):
- if d**2-x**2 <= 0:
- return 0
- return self.eigenvalue_radius_dist(d) * d/np.sqrt(d**2-x**2)
- return quad(transform_func, np.abs(x), self.critical_radius)[0]
- def eigenvalue_radius_dist(self, w):
- w = w / np.sqrt(self.chi)
- r = w**2
- critical_value = 1 - self.f + self.f/self.beta
- if r > critical_value:
- return 0
- else:
- phi_p = self.phi(r, self.beta, self.f, dev=1)
- phi_pp = self.phi(r, self.beta, self.f, dev=2)
- return 1/np.pi * (r*phi_pp + phi_p) / np.sqrt(self.chi)
- def q_func(self, r, a, f, dev=0):
- ma = 1-a
- mf = 2*(1-f)
- A = ma*r + 2*f - 1
- B = np.sqrt((ma*r - 1)**2 + 4*f*ma*r)
- if dev == 0:
- return (A+B)/mf
- elif dev == 1:
- return ma/mf * (1 + A/B)
- elif dev == 2:
- return ma**2 / (mf*B) * (1 - A**2/B**2)
- else:
- return np.nan
- def phi(self, r, a, f, dev=1):
- q = self.q_func(r, a, f, dev=0)
- qp = self.q_func(r, a, f, dev=1)
- qpp = self.q_func(r, a, f, dev=2)
- mq = q + 1
- maq = (a*q + 1) / mq
- A = 1/mq - f/q + a*r/mq - r*(a*q+1)/mq**2
- if dev == 1:
- return qp*A + maq
- elif dev == 2:
- return qpp*A \
- + 2*qp*(a/mq - maq/mq) \
- + qp**2*(-1/mq**2 + f/q**2 - 2*a*r/mq**2 + 2*r*maq/mq**2)
- else:
- return np.nan
- def var_J(self, mu, sigma, epsilon):
- return epsilon*sigma**2 + epsilon*(1-epsilon)*mu**2
- def none_or_X(value, dtype):
- if value is None or not bool(value) or value == 'None':
- return None
- try:
- return dtype(value)
- except ValueError:
- return None
- none_or_int = lambda v: none_or_X(v, int)
- none_or_float = lambda v: none_or_X(v, float)
- bool_arg = lambda s: False if s == 'False' else True
- if __name__ == '__main__':
- CLI = argparse.ArgumentParser()
- CLI.add_argument("--matrix_a", nargs='?', type=Path, required=True)
- CLI.add_argument("--matrix_b", nargs='?', type=Path, required=True)
- CLI.add_argument("--output", nargs='?', type=Path, required=True)
- CLI.add_argument("--N", nargs='?', type=int, required=True)
- CLI.add_argument("--bin_num", nargs='?', type=none_or_int, default=None)
- CLI.add_argument("--f", nargs='?', type=none_or_float, default=None)
- CLI.add_argument("--mu", "--mu_ex", nargs='?', type=none_or_float, default=None)
- CLI.add_argument("--epsilon", nargs='?', type=none_or_float, default=None)
- CLI.add_argument("--sigma_ex", nargs='?', type=none_or_float, default=None)
- CLI.add_argument("--sigma_in", nargs='?', type=none_or_float, default=None)
- CLI.add_argument("--is_connectivity", nargs='?', type=bool_arg, default=False)
- CLI.add_argument("--shuffle_neuron_ids", nargs='?', type=bool_arg, default=False)
- args, unknown = CLI.parse_known_args()
- params = dict([(k,v) for k,v in vars(args).items()
- if k not in ['matrix_a', 'matrix_b', 'output']])
- matrix_a = np.nan_to_num(np.load(args.matrix_a))
- matrix_b = np.nan_to_num(np.load(args.matrix_b))
- if args.shuffle_neuron_ids:
- neuron_ids = np.arange(args.N)
- np.random.shuffle(neuron_ids)
- matrix_b = matrix_b[neuron_ids, :][:, neuron_ids]
- eigenangle_test = EigenangleTest(**params)
- score, pvalue = eigenangle_test.compare(matrix_a, matrix_b)
- # print(score, pvalue)
- result = {'score':score, 'pvalue':pvalue}
- args.output.parent.mkdir(exist_ok=True, parents=True)
- json.dump(result, open(args.output, 'w'))
|