#!/usr/bin/env python
# coding=utf-8
"""
@author: yannansu
@created at: 28.09.22 10:25
Recover model profiles with MLE by lmfit.
Version 2: different pdf_c profiles for I and E conditions.
"""
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from scipy.stats import gamma
# from scipy.optimize import least_squares, minimize, differential_evolution, basinhopping, shgo, brute
from lmfit import Parameters, minimize, Minimizer, fit_report
import json
import time
import datetime
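

# Discretized Gaussian over the grid x, normalized to sum to 1
# (used as the sensory measurement/likelihood distribution over the grid ss).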
def gaussian(x, mu, sigma):
    y = 1. / (np.sqrt(2. * np.pi) * sigma) * np.exp(-np.power((x - mu) / sigma, 2.) / 2)
    return y / np.sum(y)
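

# Prior profile over the stimulus grid ss: a gamma pdf and its mirror image
# (cw/ccw), jointly normalized and collapsed into a single mixture profile.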
def mix_gamma(loc, a, scale):
    w = np.zeros(shape=(2, nss))
    w[0, :] = gamma.pdf(ss, loc=loc, a=a, scale=scale)  # cw
    w[1, :] = gamma.pdf(-ss, loc=loc, a=a, scale=scale)  # ccw
    w /= np.sum(w)
    return np.sum(w, axis=0)
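

# Combine the sensory likelihood with the (possibly truncated) prior for each
# stimulus and return the normalized combined pdf over ss.
# cmb_type 'mix': weighted mixture with weight w_c; 'mul': pointwise product.
# With sampling=True, the combination is applied per simulated measurement
# sample (a fraction truc_p of the samples uses the one-sided truncated prior,
# chosen by the sign of the sample), and the pdfs are averaged across samples.
# Without sampling, truc_p must be 0 (full prior) or 1 (truncated prior).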
def simul_estimate(stim, n_sample, pars):
    sigma, loc, a, scale, truc_p, motor_bias = pars[0], pars[1], pars[2], pars[3], pars[4], pars[5]
    pdf_c = mix_gamma(loc, a, scale)
    pdf_c_cw = np.hstack([np.repeat(0, sum(ss < 0)), pdf_c[ss >= 0]])
    pdf_c_ccw = np.hstack([pdf_c[ss < 0], np.repeat(0, sum(ss >= 0))])
    if sampling:
        non_truc_n = int(n_sample * (1 - truc_p))
        np.random.seed(i_btrp)
        measure_samples = np.hstack([np.random.normal(s_i, sigma, n_sample) for s_i in stim])
        pdf_s = np.vstack([gaussian(x=ss, mu=m, sigma=sigma) for m in measure_samples])
        samples_size = np.array([len(stim), n_sample])
        # reshape to n_stim x n_sample x n_ss
        pdf_s = np.reshape(pdf_s, (samples_size[0], samples_size[1], len(ss)))
        measure_samples = np.reshape(measure_samples, samples_size)
        along_ax = 1
        if cmb_type == 'mix':
            pdf_cmb_marginal = np.vstack([pdf_s[sidx[0], sidx[1], :] * (1 - w_c) + pdf_c * w_c
                                          if sidx[1] < non_truc_n
                                          else (pdf_s[sidx[0], sidx[1], :] * (1 - w_c) + pdf_c_ccw * w_c if s <= 0
                                                else pdf_s[sidx[0], sidx[1], :] * (1 - w_c) + pdf_c_cw * w_c)
                                          for sidx, s in np.ndenumerate(measure_samples)])
        elif cmb_type == 'mul':
            pdf_cmb_marginal = np.vstack([pdf_s[sidx[0], sidx[1], :] * pdf_c
                                          if sidx[1] < non_truc_n
                                          else (pdf_s[sidx[0], sidx[1], :] * pdf_c_ccw if s <= 0
                                                else pdf_s[sidx[0], sidx[1], :] * pdf_c_cw)
                                          for sidx, s in np.ndenumerate(measure_samples)])
        else:
            raise ValueError("combination type not defined!")
        pdf_cmb_marginal = np.reshape(pdf_cmb_marginal, (samples_size[0], samples_size[1], len(ss)))
    else:
        pdf_s = np.vstack([gaussian(x=ss, mu=s, sigma=sigma) for s in stim])
        # pdf_s = np.vstack([gaussian(x=ss, mu=m, sigma=sigma) for m in mm])
        # measure_samples = stim
        along_ax = 0
        if truc_p not in [0, 1]:
            raise ValueError("Truncation portion must be 0 or 1 due to no sampling!")
        if cmb_type == 'mix':
            if truc_p == 0:
                pdf_cmb_marginal = np.vstack([pdf_s[sidx, :] * (1 - w_c) + pdf_c * w_c
                                              for sidx, s in np.ndenumerate(stim)])
            else:
                pdf_cmb_marginal = np.vstack([pdf_s[sidx, :] * (1 - w_c) + pdf_c_cw * w_c if s <= 0
                                              else pdf_s[sidx, :] * (1 - w_c) + pdf_c_ccw * w_c
                                              for sidx, s in np.ndenumerate(stim)])
        elif cmb_type == 'mul':
            if truc_p == 0:
                pdf_cmb_marginal = np.vstack([pdf_s[sidx, :] * pdf_c for sidx, s in np.ndenumerate(stim)])
            else:
                pdf_cmb_marginal = np.vstack([pdf_s[sidx, :] * pdf_c_ccw if s <= 0 else pdf_s[sidx, :] * pdf_c_cw
                                              for sidx, s in np.ndenumerate(stim)])
        else:
            raise ValueError("combination type not defined!")
    pdf_cmb = np.array([(p.T / np.sum(p, axis=along_ax)).T for p in pdf_cmb_marginal])
    # MAP = np.array([np.max(p, axis=along_ax) for p in pdf_cmb])
    # MAP_s = np.array([ss[np.argmax(p, axis=along_ax)] for p in pdf_cmb])
    # median_s = np.array([ss[np.argmin(np.abs(np.cumsum(p, axis=along_ax) - .5), axis=along_ax)] for p in pdf_cmb])
    # mean_s = np.array([np.sum(ss * p, axis=along_ax) for p in pdf_cmb])
    #
    # pred = mean_s + motor_bias
    if sampling:
        # average the per-sample pdfs to obtain one pdf per stimulus
        pdf_cmb = np.mean(pdf_cmb, axis=1)
    return pdf_cmb
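

# Indices of the grid points in `arr` nearest to each value in `vals`.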
def find_nearest(arr, vals):
    ids = [(np.abs(arr - v)).argmin() for v in vals]
    return np.array(ids)
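

# Negative log-likelihood of the observed responses under the model.
# For each noise level and condition, responses are corrected for the motor
# bias, bootstrap-resampled with replacement (seeded by i_btrp), and evaluated
# against the model pdf at the nearest grid point; log10-likelihoods are summed
# over trials, stimuli, noise levels, and conditions.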
def negloglik(pars, df):
    # sigma_l, sigma_h = pars[0], pars[1]
    # loc_i, loc_e = pars[2], pars[3]
    # a_i, a_e = pars[4], pars[5]
    # scale_i, scale_e = pars[6], pars[7]
    # truc_p_i, truc_p_e = pars[8]/10., pars[9]/10.
    # motor_bias_i, motor_bias_e = pars[10], pars[11]
    # if I and E only differ in truncation portion
    vals = pars.valuesdict()
    sigma_l, sigma_h = vals['sigma_l'], vals['sigma_h']
    loc_i, loc_e = vals['loc_i'], vals['loc_e']
    a_i, a_e = vals['a_i'], vals['a_e']
    scale_i, scale_e = vals['scale_i'], vals['scale_e']
    truc_p_i, truc_p_e = vals['truc_p_i'] / 10., vals['truc_p_e'] / 10.
    motor_bias_i, motor_bias_e = vals['motor_bias'], vals['motor_bias']
    noises = np.sort(df['noise'].unique())
    unique_stim = df['relative_stimulus'].unique()
    n_obs = int(df['relative_stimulus'].value_counts()[0] / 4)
    lik_i = []
    lik_e = []
    for idx, sigma in enumerate([sigma_l, sigma_h]):
        df_i = df[(df.condition == 'I') & (df.noise == noises[idx])]
        df_e = df[(df.condition == 'E') & (df.noise == noises[idx])]
        # stim_i = df_i['relative_stimulus'].to_numpy()
        # stim_e = df_e['relative_stimulus'].to_numpy()
        obs_i = np.reshape(df_i['relative_response'].to_numpy(), (len(unique_stim), n_obs)) - motor_bias_i
        obs_e = np.reshape(df_e['relative_response'].to_numpy(), (len(unique_stim), n_obs)) - motor_bias_e
        # bootstrap sampling from measured data
        np.random.seed(i_btrp)
        # print(np.random.normal(1))
        obs_i_btrp = np.apply_along_axis(lambda x: np.random.choice(x, replace=True, size=n_obs), arr=obs_i, axis=1)
        obs_e_btrp = np.apply_along_axis(lambda x: np.random.choice(x, replace=True, size=n_obs), arr=obs_e, axis=1)
        # obs_i_btrp = obs_i
        # obs_e_btrp = obs_e
        pdf_cmb_i = simul_estimate(unique_stim, n_sample, [sigma, loc_i, a_i, scale_i, truc_p_i, motor_bias_i])
        pdf_cmb_e = simul_estimate(unique_stim, n_sample, [sigma, loc_e, a_e, scale_e, truc_p_e, motor_bias_e])
        lik_i.append([np.sum(np.log10(pdf[find_nearest(ss, obs)])) for obs, pdf in zip(obs_i_btrp, pdf_cmb_i)])
        lik_e.append([np.sum(np.log10(pdf[find_nearest(ss, obs)])) for obs, pdf in zip(obs_e_btrp, pdf_cmb_e)])
    print(-(np.sum(lik_i) + np.sum(lik_e)))
    return -(np.sum(lik_i) + np.sum(lik_e))


# def negloglik(pars, stim, obs):
#     n_stim = len(stim)
#     residuals = obs - simul_estimate(stim, pars)
#     ll = -(n_stim * 1 / 2) * (1 + np.log(2 * np.pi)) - (n_stim / 2) * np.log(residuals.dot(residuals) / n_stim)
#     # print(-ll/n_stim)
#     return -ll/n_stim
- """
- ===========================================================
- Run modeling from here
- ===========================================================
- """
# save directory
save_dir = 'data_analysis/recover_model_v4/'
# stimulus grid
ss = np.arange(-60., 60., .5)
nss = len(ss)
# measurement grid
# mm = np.arange(-80, 80, .5)
# nmm = len(mm)
# model settings
n_sample = 100
sampling = False
cmb_type = 'mul'
w_c = None
n_btrp = 100
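# With sampling=False and cmb_type='mul', the likelihood is multiplied with the
# full prior (truc_p is fixed at 0 below); w_c is only used for cmb_type='mix'.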
- # """
- model_cfg = {
- 'date': datetime.datetime.now().strftime('%c'),
- 'n_sample': n_sample,
- 'sampling': sampling,
- 'cmb_type': cmb_type,
- 'w_c': None,
- 'n_btrp': n_btrp
- }
- json.dump(model_cfg, open(save_dir + f'model_cfg.json', 'w'))
# param settings
params = Parameters()
# add with tuples: (NAME VALUE VARY MIN MAX EXPR BRUTE_STEP)
params.add_many(
    ('sigma_l', 3, True, 1, 20, None, .1),
    ('sigma_h', 5, True, 1, 20, None, .1),
    ('loc_i', -.5, True, -10, 10, None, .1),
    ('loc_e', -.5, True, -10, 10, None, .1),
    ('a_i', 3, True, 2, 30, None, .1),
    ('a_e', 3, True, 2, 30, None, .1),
    ('scale_i', 10, True, 1, 50, None, .1),
    ('scale_e', 10, True, 1, 50, None, .1),
    ('truc_p_i', 0, False, 0, 10, None, .1),
    ('truc_p_e', 0, False, 0, 10, None, .1),
    ('motor_bias', -.5, True, -5, 5, None, .1),
)
params.pretty_print()
params.dump(open(save_dir + 'param_cfg.json', 'w'))
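# Note: truc_p_i/truc_p_e are fixed here (vary=False) and are defined on a
# 0-10 scale, which negloglik divides by 10; a single motor_bias parameter is
# shared between the I and E conditions.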

all_dat = pd.read_csv('data/dat.csv')

# for sub in ['sPool']:
for sub in ['S1', 'S2', 'S3', 'S4', 'S5']:
    if sub != 'sPool':
        dat = all_dat.query("sub == @sub").sort_values(by=['noise', 'condition', 'relative_stimulus'])
    else:
        dat = all_dat.sort_values(by=['noise', 'condition', 'relative_stimulus'])

    res_list = []
    stim_finer = np.arange(-20, 20, .5)
    res_pdf = np.zeros(shape=[4, n_btrp, len(stim_finer), nss])
    start = time.time()

    # bootstrapping
    for i_btrp in range(n_btrp):
        # np.random.seed(i_btrp)
        fitter = Minimizer(negloglik, params, fcn_args=[dat], nan_policy='omit')
        res = fitter.minimize(method='nelder')
        print(fit_report(res))
        print(f"time:{time.time() - start}")
        res_df = pd.DataFrame([{'sub': sub,
                                'i_btrp': i_btrp,
                                'sigma_l': res.params['sigma_l'].value,
                                'sigma_h': res.params['sigma_h'].value,
                                'loc_i': res.params['loc_i'].value,
                                'loc_e': res.params['loc_e'].value,
                                'a_i': res.params['a_i'].value,
                                'a_e': res.params['a_e'].value,
                                'scale_i': res.params['scale_i'].value,
                                'scale_e': res.params['scale_e'].value,
                                'truc_p_i': res.params['truc_p_i'].value / 10.,
                                'truc_p_e': res.params['truc_p_e'].value / 10.,
                                'motor_bias': res.params['motor_bias'].value,
                                'negloglik': negloglik(res.params, dat),
                                'chi-square': res.chisqr,
                                'aic': res.aic,
                                'bic': res.bic
                                }])
        # res_df.to_csv(save_dir + f'{sub}_pars.csv', index=False)
        res_list.append(res_df)
        # low+I, low+E, high+I, high+E
        res_pdf[0, i_btrp, :, :] = simul_estimate(stim_finer, n_sample, res_df[
            ['sigma_l', 'loc_i', 'a_i', 'scale_i', 'truc_p_i', 'motor_bias']].values.flatten())
        res_pdf[1, i_btrp, :, :] = simul_estimate(stim_finer, n_sample, res_df[
            ['sigma_l', 'loc_e', 'a_e', 'scale_e', 'truc_p_e', 'motor_bias']].values.flatten())
        res_pdf[2, i_btrp, :, :] = simul_estimate(stim_finer, n_sample, res_df[
            ['sigma_h', 'loc_i', 'a_i', 'scale_i', 'truc_p_i', 'motor_bias']].values.flatten())
        res_pdf[3, i_btrp, :, :] = simul_estimate(stim_finer, n_sample, res_df[
            ['sigma_h', 'loc_e', 'a_e', 'scale_e', 'truc_p_e', 'motor_bias']].values.flatten())

    # Save parameters
    pd.concat(res_list, ignore_index=True).to_csv(save_dir + f'{sub}_pars.csv', index=False)
    # Save simulation results
    np.save(save_dir + f"{sub}_pdf_cmb.npy", res_pdf)
# """