123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 |
- import numpy as np
- import functools
- import mrestimator as mre
- from scipy import stats
- def exclude_inconsistencies(mr_results):
- rejected_inconsistencies = ['H_nonzero', 'H_linear']
- for incon in rejected_inconsistencies:
- mr_results = mr_results[[incon not in mr_results[
- 'inconsistencies'].tolist()[i] for i in range(
- len(mr_results['inconsistencies'].tolist()))]]
- return mr_results
- def consistency_checks(activity, rk, fitres):
- """ Checks the consistency of MR estimates
- We only use H_nonzero and H_linear as exclusion criteria.
- """
- tau_max = 2000 # unusually large tau
- tau_min = 0
- inconsistencies = []
- if fitres.tau > tau_max or fitres.tau < tau_min:
- inconsistencies.append('H_tau')
- if count_nonzero(activity) < 1000:
- inconsistencies.append('H_nonzero')
- if check_linear(rk, fitres.rsquared):
- inconsistencies.append('H_linear')
- if not random_residuals(rk, fitres.fitfunc, *fitres.popt):
- inconsistencies.append('H_residual')
- if not random_residuals_alternative(rk, fitres.fitfunc, *fitres.popt):
- inconsistencies.append('H_residual_alt')
- if Ftest05(rk, fitres.rsquared, fitres.popt):
- inconsistencies.append('H_F05')
- if Ftest10(rk, fitres.rsquared, fitres.popt):
- inconsistencies.append('H_F10')
- return inconsistencies
- def Ftest05(rk, adjusted_rsquared, popt):
- n_obs = len(rk.coefficients)
- n_param = len(popt)
- r_squared = 1.0- (1.0 - adjusted_rsquared) * (n_obs-1-n_param)/(n_obs-1)
- F = (r_squared / (n_param-1)) / ((1-r_squared)/(n_obs-n_param))
- df1 = n_param-1 #n_obs
- df2 = n_obs-n_param
- p_value = 1 - stats.f.cdf(F, df1, df2)
- if p_value > 0.05:
- # complex model is not significanty better in explaining the data
- return True
- else:
- return False
- def Ftest10(rk, adjusted_rsquared, popt):
- n_obs = len(rk.coefficients)
- n_param = len(popt)
- r_squared = 1.0- (1.0 - adjusted_rsquared) * (n_obs-1-n_param)/(n_obs-1)
- F = (r_squared / (n_param-1)) / ((1-r_squared)/(n_obs-n_param))
- df1 = n_param-1 #n_obs
- df2 = n_obs-n_param
- p_value = 1 - stats.f.cdf(F, df1, df2)
- if p_value > 0.10:
- # complex model is not significanty better in explaining the data
- return True
- else:
- return False
- def check_linear(rk, rsquared):
- """ Check if linear function better fits the rk than the fit that was
- done (typically exponential offset). Done by comparing the rsquared of
- both fits, which are weighted with the number of parameters i the
- fitfunction.
- :return: bool
- True: linear fit better (Inconsistency!)
- False: linear fit worse
- """
- try:
- linear_fit = mre.fit(rk, fitfunc=mre.f_linear, description='linear')
- if linear_fit.rsquared is not None and rsquared is not None:
- if linear_fit.rsquared > rsquared:
- return True
- else:
- return False
- else:
- return True
- except RuntimeError:
- return False
- def random_residuals(rk, fitfunc, *args):
- """ Check if residuals (fit minus data) are consistent with noise. If
- this is not the case, i.e. if there are systematic trends in the
- residuals, the fit is inconsistent.
- """
- residuals = rk.coefficients - fitfunc(rk.steps*rk.dt, *args)
- k_win = 20
- for k_1 in range(len(rk.steps)-(k_win+1)):
- sign = []
- for k in range(k_1, k_1+k_win):
- if residuals[k] > 1*np.median(abs(residuals)):
- sign.append(1)
- elif residuals[k] < -1*np.median(abs(residuals)):
- sign.append(-1)
- else:
- sign.append(0)
- if len(set(sign)) <= 1 and sign[0] != 0:
- return False
- return True
- def random_residuals_alternative(rk, fitfunc, *args):
- """ Check if residuals (fit minus data) are consistent with noise. If
- this is not the case, i.e. if there are systematic trends in the
- residuals, the fit is inconsistent.
- """
- residuals = rk.coefficients - fitfunc(rk.steps*rk.dt, *args)
- k_win = 10
- k_range = 50
- sign = []
- for k in range(rk.steps[0], rk.steps[-k_win]):
- res_mean = np.mean(residuals[k:k+k_win])
- if res_mean > 1*np.median(abs(residuals)):
- sign.append(1)
- elif res_mean < -1*np.median(abs(residuals)):
- sign.append(-1)
- else:
- sign.append(0)
- for si in range(len(sign)-k_range):
- if len(set(sign[si:si+k_range])) <= 1 and sign[si] != 0:
- return False
- return True
- def list_preictrecordings(patient):
- listfile = '../Data/preIct_recordings_patients.txt'
- list = np.loadtxt(listfile, dtype=str)
- indices = np.where(list[:, 1] == str(patient))
- recordings = []
- for i in indices:
- recordings.append(list[i, 0])
- return recordings
- def count_nonzero(activity):
- activity_nonzero = 0
- for t in activity:
- if t != 0:
- activity_nonzero += 1
- return activity_nonzero
- def conjunction(*conditions):
- return functools.reduce(np.logical_and, conditions)
- def string_to_func(fitmethod):
- if fitmethod == 'exp':
- fitfunc = mre.f_exponential
- elif fitmethod == 'offset':
- fitfunc = mre.f_exponential_offset
- elif fitmethod == 'complex':
- fitfunc = mre.f_complex
- else:
- print(
- 'Error: fitmethod {} does not exist'.format(fitmethod))
- exit()
- return fitfunc
- def twosided_pvalue(value, diff_distribution):
- diff_distribution = np.array(diff_distribution)
- if value < 0:
- p_value = len(diff_distribution[diff_distribution <=
- value])/len(
- diff_distribution) + len(diff_distribution[diff_distribution >=
- (-1)*value])/len(
- diff_distribution)
- else:
- p_value = len(diff_distribution[diff_distribution >=
- value])/len(
- diff_distribution) + len(diff_distribution[diff_distribution <=
- (-1)*value])/len(
- diff_distribution)
- return p_value
- def bootstrap_distribution(sample, statistic=np.var, nboot=5000):
- sample_stat = []
- for bssample in range(nboot):
- sample_vals = np.random.choice(sample, size=len(sample),
- replace=True)
- sample_stat.append(statistic(sample_vals))
- return sample_stat
- def binning_focus(binning, patient):
- binning_focus = 'X'
- focifile = '../Data/patients.txt'
- f = open(focifile, 'r')
- foci = [line.rstrip('\n').split('\t') for line in f.readlines()]
- focus = 'NA'
- for i, idf in enumerate(foci[1:]):
- if int(idf[0]) == int(patient):
- focus = idf[1]
- if focus == 'NA':
- print('Error: patient {} not in foci list.'.format(patient))
- return 1
- if binning.startswith('left') and focus == 'L' or \
- binning.startswith('right') and \
- focus == 'R':
- binning_focus = 'focal'
- elif binning.startswith('left') and focus == 'R' or \
- binning.startswith('right') and \
- focus == 'L':
- binning_focus = 'contra-lateral'
- elif focus == 'B' and binning.startswith('left'):
- binning_focus = 'B_L'
- elif focus == 'B' and binning.startswith('right'):
- binning_focus = 'B_R'
- elif '?' in focus and binning.startswith('right'):
- binning_focus = 'U_R'
- elif '?' in focus and binning.startswith('left'):
- binning_focus = 'U_L'
- elif (binning.startswith('L') and focus == 'L') or (
- binning.startswith('R') and focus == 'R'):
- binning_focus = 'focal'
- elif (binning.startswith('L') and focus == 'R') or (
- binning.startswith('R') and focus == 'L'):
- binning_focus = 'contra-lateral'
- return binning_focus
|