Scheduled service maintenance on November 22


On Friday, November 22, 2024, between 06:00 CET and 18:00 CET, GIN services will undergo planned maintenance. Extended service interruptions should be expected. We will try to keep downtimes to a minimum, but recommend that users avoid critical tasks, large data uploads, or DOI requests during this time.

We apologize for any inconvenience.

eigenangle_test.py 10 KB


  1. import numpy as np
  2. import scipy as sc
  3. from scipy.stats import norm
  4. from scipy.linalg import eigh, eig
  5. from scipy.integrate import quad
  6. from scipy.special import erf
  7. import math
  8. import json
  9. from pathlib import Path
  10. import argparse
  11. import matplotlib.pyplot as plt
  12. class EigenangleTest():
  13. """
  14. Class to create the null hypothesis for the eigenangle test comparison of
  15. correlation matrices or connectivity matrices (for networks as described by
  16. Rajan and Abbott 2006), and conduct the comparison for two given matrices.
  17. """
  18. def __init__(self, is_connectivity=False, **params):
  19. for key, value in params.items():
  20. setattr(self, key, value)
  21. if 'N' not in params:
  22. raise AttributeError('Number of neurons "N" must be given!')
  23. self.is_connectivity = is_connectivity
  24. if is_connectivity: # compare connectivity matrices
  25. for param in ['f', 'mu', 'epsilon', 'sigma_ex', 'sigma_in']:
  26. if not hasattr(self, param) and getattr(self, param) is not None:
  27. raise AttributeError(f'{param} not given!')
  28. self.mu_ex = self.mu
  29. self.mu_in = -self.f * self.mu_ex / (1-self.f)
  30. self.var_J_ex = self.var_J(self.mu_ex, self.sigma_ex, self.epsilon)
  31. self.var_J_in = self.var_J(self.mu_in, self.sigma_in, self.epsilon)
  32. self.beta = self.var_J_in / self.var_J_ex
  33. self.chi = self.var_J_in * self.N
  34. self.weight_dist = self.rajan_abbott
  35. self.critical_radius = np.sqrt(1 - self.f + self.f/self.beta) \
  36. * np.sqrt(self.chi)
  37. else: # compare correlation matrices
  38. if not hasattr(self, 'bin_num') and getattr(self, 'bin_num') is not None:
  39. raise AttributeError('"bin_num" not given!')
  40. self.alpha = self.bin_num / self.N
  41. if self.alpha < 1:
  42. raise ValueError('There need to be more bins than neurons!')
  43. self.weight_dist = self.marchenko_pastur
  44. def compare(self, matrix_a, matrix_b):
  45. if self.N != len(matrix_a) or self.N != len(matrix_b):
  46. raise ValueError(f'Matrices are of wrong size (!={self.N})')
  47. if self.is_connectivity:
  48. eval_a, evec_a = eig(matrix_a)
  49. eval_b, evec_b = eig(matrix_b)
  50. eval_a, eval_b = np.real(eval_a), np.real(eval_b)
  51. eval_a += self.critical_radius
  52. eval_b += self.critical_radius
  53. eval_a[eval_a<0], eval_b[eval_b<0] = 0, 0
  54. else:
  55. eval_a, evec_a = eigh(matrix_a)
  56. eval_b, evec_b = eigh(matrix_b)
  57. # sort eigenvalues and -vector in ascending order
  58. sort_idx_a, sort_idx_b = np.argsort(eval_a), np.argsort(eval_b)
  59. sort_idx_a, sort_idx_b = sort_idx_a[::-1], sort_idx_b[::-1]
  60. eval_a, eval_b = eval_a[sort_idx_a], eval_b[sort_idx_b]
  61. evec_a, evec_b = evec_a.T[sort_idx_a], evec_b.T[sort_idx_b]
  62. for i, (eva, evb) in enumerate(zip(evec_a, evec_b)):
  63. evec_a[i] = eva * np.sign(eva[np.argmax(np.absolute(eva))])
  64. evec_b[i] = evb * np.sign(evb[np.argmax(np.absolute(evb))])
  65. evec_a[i] /= np.linalg.norm(eva)
  66. evec_b[i] /= np.linalg.norm(evb)
  67. M = np.dot(evec_a, np.conjugate(evec_b).T)
  68. M = np.real(M)
  69. M[np.argwhere(M > 1)] = 1.
  70. if len(M) == 1:
  71. angles = np.arccos(M[0])
  72. else:
  73. angles = np.arccos(np.diag(M))
  74. weights = np.sqrt((eval_a**2 + eval_b**2) / 2.)
  75. smallness = 1 - angles / (np.pi/2.)
  76. weighted_smallness = smallness * weights
  77. similarity_score = np.mean(weighted_smallness)
  78. self.init_null_distribution()
  79. pvalue = self.pvalue(similarity_score)
  80. return similarity_score, pvalue
  81. def init_null_distribution(self):
  82. # init min/max eigenvalues
  83. if self.is_connectivity:
  84. self.eigenvalue_min = 0
  85. self.eigenvalue_max = 2*self.critical_radius
  86. else:
  87. self.eigenvalue_min = (1 - np.sqrt(1. / self.alpha)) ** 2
  88. self.eigenvalue_max = (1 + np.sqrt(1. / self.alpha)) ** 2
  89. # init angle distribution norm
  90. N = 2*self.N if self.is_connectivity else self.N
  91. if N <= 170:
  92. self.angle_dist_norm = math.gamma(N/2.) / (np.sqrt(np.pi) \
  93. * math.gamma((N-1)/2))
  94. else:
  95. angle_dist = lambda x: np.sin(x)**(N-2)
  96. self.angle_dist_norm = 1 / quad(angle_dist, 0, np.pi)[0]
  97. # calc norm for rajan_abbott eigenvalue dist
  98. if self.is_connectivity:
  99. self.eigenvalue_dist_norm = quad(self.eigenvalue_real_dist,
  100. -self.critical_radius,
  101. self.critical_radius)[0]
  102. # init variance of similarity score distribution
  103. integrand = lambda x: x**2 * self.weighted_smallness_dist(x)
  104. var = quad(integrand, -np.infty, np.infty)[0]
  105. self.similarity_score_sigma = np.sqrt(var/self.N)
  106. return None
  107. def angle_smallness_dist(self, D):
  108. if D < -1 or D > 1:
  109. return 0
  110. N = 2*self.N if self.is_connectivity else self.N
  111. return self.angle_dist_norm * np.pi/2 * np.cos(D*np.pi/2)**(N-2)
  112. def angle_dist(self, phi):
  113. N = 2*self.N if self.is_connectivity else self.N
  114. if phi < 0 or phi > np.pi:
  115. return 0
  116. func = lambda p: np.sin(p)**(N-2)
  117. if N < 170:
  118. norm = sc.special.gamma(N/2.) / (np.sqrt(np.pi) \
  119. * sc.special.gamma((N-1)/2))
  120. else:
  121. norm = 1/sc.integrate.quad(func, 0, np.pi)[0]
  122. return norm * func(phi)
  123. def weighted_smallness_dist(self, D):
  124. integrand = lambda x: self.angle_smallness_dist(D/float(x)) \
  125. * self.weight_dist(x) * 1. / np.abs(x)
  126. return quad(integrand, self.eigenvalue_min, self.eigenvalue_max)[0]
  127. def similarity_score_distribution(self, eta):
  128. return sc.stats.norm.pdf(eta, 0, self.similarity_score_sigma)
  129. def pvalue(self, eta):
  130. sigma = self.similarity_score_sigma
  131. # equal to integration of similarity_score_distribution from eta to inf
  132. return .5 * (1 + erf(-eta / (sigma * np.sqrt(2))))
  133. def marchenko_pastur(self, x):
  134. x_min, x_max = self.eigenvalue_min, self.eigenvalue_max
  135. y = self.alpha / (2 * np.pi * x) * np.sqrt((x_max - x) * (x - x_min))
  136. if np.isnan(y):
  137. return 0
  138. else:
  139. return y
  140. def rajan_abbott(self, w):
  141. w_shift = w - self.critical_radius
  142. return 1/self.eigenvalue_dist_norm * self.eigenvalue_real_dist(w_shift)
  143. def eigenvalue_real_dist(self, x):
  144. # transform dist for abs(ev) to dist for Re(ev)
  145. def transform_func(d):
  146. if d**2-x**2 <= 0:
  147. return 0
  148. return self.eigenvalue_radius_dist(d) * d/np.sqrt(d**2-x**2)
  149. return quad(transform_func, np.abs(x), self.critical_radius)[0]
  150. def eigenvalue_radius_dist(self, w):
  151. w = w / np.sqrt(self.chi)
  152. r = w**2
  153. critical_value = 1 - self.f + self.f/self.beta
  154. if r > critical_value:
  155. return 0
  156. else:
  157. phi_p = self.phi(r, self.beta, self.f, dev=1)
  158. phi_pp = self.phi(r, self.beta, self.f, dev=2)
  159. return 1/np.pi * (r*phi_pp + phi_p) / np.sqrt(self.chi)
  160. def q_func(self, r, a, f, dev=0):
  161. ma = 1-a
  162. mf = 2*(1-f)
  163. A = ma*r + 2*f - 1
  164. B = np.sqrt((ma*r - 1)**2 + 4*f*ma*r)
  165. if dev == 0:
  166. return (A+B)/mf
  167. elif dev == 1:
  168. return ma/mf * (1 + A/B)
  169. elif dev == 2:
  170. return ma**2 / (mf*B) * (1 - A**2/B**2)
  171. else:
  172. return np.nan
  173. def phi(self, r, a, f, dev=1):
  174. q = self.q_func(r, a, f, dev=0)
  175. qp = self.q_func(r, a, f, dev=1)
  176. qpp = self.q_func(r, a, f, dev=2)
  177. mq = q + 1
  178. maq = (a*q + 1) / mq
  179. A = 1/mq - f/q + a*r/mq - r*(a*q+1)/mq**2
  180. if dev == 1:
  181. return qp*A + maq
  182. elif dev == 2:
  183. return qpp*A \
  184. + 2*qp*(a/mq - maq/mq) \
  185. + qp**2*(-1/mq**2 + f/q**2 - 2*a*r/mq**2 + 2*r*maq/mq**2)
  186. else:
  187. return np.nan
  188. def var_J(self, mu, sigma, epsilon):
  189. return epsilon*sigma**2 + epsilon*(1-epsilon)*mu**2
  190. def none_or_X(value, dtype):
  191. if value is None or not bool(value) or value == 'None':
  192. return None
  193. try:
  194. return dtype(value)
  195. except ValueError:
  196. return None
  197. none_or_int = lambda v: none_or_X(v, int)
  198. none_or_float = lambda v: none_or_X(v, float)
  199. bool_arg = lambda s: False if s == 'False' else True
  200. if __name__ == '__main__':
  201. CLI = argparse.ArgumentParser()
  202. CLI.add_argument("--matrix_a", nargs='?', type=Path, required=True)
  203. CLI.add_argument("--matrix_b", nargs='?', type=Path, required=True)
  204. CLI.add_argument("--output", nargs='?', type=Path, required=True)
  205. CLI.add_argument("--N", nargs='?', type=int, required=True)
  206. CLI.add_argument("--bin_num", nargs='?', type=none_or_int, default=None)
  207. CLI.add_argument("--f", nargs='?', type=none_or_float, default=None)
  208. CLI.add_argument("--mu", "--mu_ex", nargs='?', type=none_or_float, default=None)
  209. CLI.add_argument("--epsilon", nargs='?', type=none_or_float, default=None)
  210. CLI.add_argument("--sigma_ex", nargs='?', type=none_or_float, default=None)
  211. CLI.add_argument("--sigma_in", nargs='?', type=none_or_float, default=None)
  212. CLI.add_argument("--is_connectivity", nargs='?', type=bool_arg, default=False)
  213. CLI.add_argument("--shuffle_neuron_ids", nargs='?', type=bool_arg, default=False)
  214. args, unknown = CLI.parse_known_args()
  215. params = dict([(k,v) for k,v in vars(args).items()
  216. if k not in ['matrix_a', 'matrix_b', 'output']])
  217. matrix_a = np.nan_to_num(np.load(args.matrix_a))
  218. matrix_b = np.nan_to_num(np.load(args.matrix_b))
  219. if args.shuffle_neuron_ids:
  220. neuron_ids = np.arange(args.N)
  221. np.random.shuffle(neuron_ids)
  222. matrix_b = matrix_b[neuron_ids, :][:, neuron_ids]
  223. eigenangle_test = EigenangleTest(**params)
  224. score, pvalue = eigenangle_test.compare(matrix_a, matrix_b)
  225. # print(score, pvalue)
  226. result = {'score':score, 'pvalue':pvalue}
  227. args.output.parent.mkdir(exist_ok=True, parents=True)
  228. json.dump(result, open(args.output, 'w'))