#!/user/bin/env python # coding=utf-8 """ @author: yannansu @created at: 28.09.22 10:25 Recover model profiles with MLE by lmfit, Version 2: different pdf_c profiles for I and E conditions. """ import numpy as np import matplotlib.pyplot as plt import pandas as pd import seaborn as sns from scipy.stats import gamma # from scipy.optimize import least_squares, minimize, differential_evolution, basinhopping, shgo, brute from lmfit import Parameters, minimize, Minimizer, fit_report import json import time import datetime def gaussian(x, mu, sigma): y = 1. / (np.sqrt(2. * np.pi) * sigma) * np.exp(-np.power((x - mu) / sigma, 2.) / 2) return y / np.sum(y) def mix_gamma(loc, a, scale): w = np.zeros(shape=(2, nss)) w[0, :] = gamma.pdf(ss, loc=loc, a=a, scale=scale) # cw w[1, :] = gamma.pdf(-ss, loc=loc, a=a, scale=scale) # ccw w /= np.sum(w) return np.sum(w, axis=0) def simul_estimate(stim, n_sample, pars): sigma, loc, a, scale, truc_p, motor_bias = pars[0], pars[1], pars[2], pars[3], pars[4], pars[5] pdf_c = mix_gamma(loc, a, scale) pdf_c_cw = np.hstack([np.repeat(0, sum(ss < 0)), +pdf_c[ss >= 0]]) pdf_c_ccw = np.hstack([pdf_c[ss < 0], np.repeat(0, sum(ss >= 0))]) if sampling: non_truc_n = int(n_sample * (1 - truc_p)) np.random.seed(i_btrp) measure_samples = np.hstack([np.random.normal(s_i, sigma, n_sample) for s_i in stim]) pdf_s = np.vstack([gaussian(x=ss, mu=m, sigma=sigma) for m in measure_samples]) samples_size = np.array([len(stim), n_sample]) # reshape to n_stim x n_sample x n_ss pdf_s = np.reshape(pdf_s, (samples_size[0], samples_size[1], len(ss))) measure_samples = np.reshape(measure_samples, samples_size) along_ax = 1 if cmb_type == 'mix': pdf_cmb_marginal = np.vstack([pdf_s[sidx[0], sidx[1], :] * (1 - w_c) + pdf_c * w_c if sidx[1] < non_truc_n \ else ( pdf_s[sidx, :] * (1 - w_c) + pdf_c_ccw * w_c if s <= 0 else ( pdf_s[sidx[0], sidx[1], :] * (1 - w_c) + pdf_c_cw * w_c)) for sidx, s in np.ndenumerate(measure_samples)]) elif cmb_type == 'mul': pdf_cmb_marginal = np.vstack([pdf_s[sidx[0], sidx[1], :] * pdf_c if sidx[1] < non_truc_n \ else ( (pdf_s[sidx[0], sidx[1], :] * pdf_c_ccw) if s <= 0 else (pdf_s[sidx[0], sidx[1], :] * pdf_c_cw)) for sidx, s in np.ndenumerate(measure_samples)]) else: raise ValueError("combination type not defined!") pdf_cmb_marginal = np.reshape(pdf_cmb_marginal, (samples_size[0], samples_size[1], len(ss))) else: pdf_s = np.vstack([gaussian(x=ss, mu=s, sigma=sigma) for s in stim]) # pdf_s = np.vstack([gaussian(x=ss, mu=m, sigma=sigma) for m in mm]) # measure_samples = stim along_ax = 0 if truc_p not in [0, 1]: raise ValueError("Truncation portion must be 0 or 1 due to no sampling!") if cmb_type == 'mix': if truc_p == 0: pdf_cmb_marginal = np.vstack([pdf_s[sidx[0], sidx[1], :] * (1 - w_c) + pdf_c * w_c for sidx, s in np.ndenumerate(stim)]) else: pdf_cmb_marginal = np.vstack([pdf_s[sidx, :] * (1 - w_c) + pdf_c_cw * w_c if s <= 0 else ( pdf_s[sidx, :] * (1 - w_c) + pdf_c_ccw * w_c) for sidx, s in np.ndenumerate(stim)]) elif cmb_type == 'mul': if truc_p == 0: pdf_cmb_marginal = np.vstack([pdf_s[sidx, :] * pdf_c for sidx, s in np.ndenumerate(stim)]) else: pdf_cmb_marginal = np.vstack([(pdf_s[sidx, :] * pdf_c_ccw) if s <= 0 else (pdf_s[sidx, :] * pdf_c_cw) for sidx, s in np.ndenumerate(stim)]) else: raise ValueError("combination type not defined!") pdf_cmb = np.array([(p.T / np.sum(p, axis=along_ax)).T for p in pdf_cmb_marginal]) # MAP = np.array([np.max(p, axis=along_ax) for p in pdf_cmb]) # MAP_s = np.array([ss[np.argmax(p, axis=along_ax)] for p in pdf_cmb]) # median_s = np.array([ss[np.argmin(np.abs(np.cumsum(p, axis=along_ax) - .5), axis=along_ax)] for p in pdf_cmb]) # mean_s = np.array([np.sum(ss * p, axis=along_ax) for p in pdf_cmb]) # # pred = mean_s + motor_bias if sampling: pdf_cmb = np.mean(pdf_cmb, axis=1) return pdf_cmb def find_nearest(arr, vals): ids = [(np.abs(arr - v)).argmin() for v in vals] return np.array(ids) def negloglik(pars, df): # sigma_l, sigma_h = pars[0], pars[1] # loc_i, loc_e = pars[2], pars[3] # a_i, a_e = pars[4], pars[5] # scale_i, scale_e = pars[6], pars[7] # truc_p_i, truc_p_e = pars[8]/10., pars[9]/10. # motor_bias_i, motor_bias_e = pars[10], pars[11] # if I and E only differ in truncation portion vals = pars.valuesdict() sigma_l, sigma_h = vals['sigma_l'], vals['sigma_h'] loc_i, loc_e = vals['loc_i'], vals['loc_e'] a_i, a_e = vals['a_i'], vals['a_e'] scale_i, scale_e = vals['scale_i'], vals['scale_e'] truc_p_i, truc_p_e = vals['truc_p_i'] / 10., vals['truc_p_e'] / 10. motor_bias_i, motor_bias_e = vals['motor_bias'], vals['motor_bias'] noises = np.sort(df['noise'].unique()) unique_stim = df['relative_stimulus'].unique() n_obs = int(df['relative_stimulus'].value_counts()[0] / 4) lik_i = [] lik_e = [] err = 0 for idx, sigma in enumerate([sigma_l, sigma_h]): df_i = df[(df.condition == 'I') & (df.noise == noises[idx])] df_e = df[(df.condition == 'E') & (df.noise == noises[idx])] # stim_i = df_i['relative_stimulus'].to_numpy() # stim_e = df_e['relative_stimulus'].to_numpy() obs_i = np.reshape(df_i['relative_response'].to_numpy(), (len(unique_stim), n_obs)) - motor_bias_i obs_e = np.reshape(df_e['relative_response'].to_numpy(), (len(unique_stim), n_obs)) - motor_bias_e # bootstrap sampling from measured data np.random.seed(i_btrp) # print(np.random.normal(1)) obs_i_btrp = np.apply_along_axis(lambda x: np.random.choice(x, replace=True, size=n_obs), arr=obs_i, axis=1) obs_e_btrp = np.apply_along_axis(lambda x: np.random.choice(x, replace=True, size=n_obs), arr=obs_e, axis=1) # obs_i_btrp = obs_i # obs_e_btrp = obs_e pdf_cmb_i = simul_estimate(unique_stim, n_sample, [sigma, loc_i, a_i, scale_i, truc_p_i, motor_bias_i]) pdf_cmb_e = simul_estimate(unique_stim, n_sample, [sigma, loc_e, a_e, scale_e, truc_p_e, motor_bias_e]) lik_i.append([np.sum(np.log10(pdf[find_nearest(ss, obs)])) for obs, pdf in zip(obs_i_btrp, pdf_cmb_i)]) lik_e.append([np.sum(np.log10(pdf[find_nearest(ss, obs)])) for obs, pdf in zip(obs_e_btrp, pdf_cmb_e)]) print(-(np.sum(lik_i) + np.sum(lik_e))) return -(np.sum(lik_i) + np.sum(lik_e)) # def negloglik(pars, stim, obs): # n_stim = len(stim) # residuals = obs - simul_estimate(stim, pars) # ll = -(n_stim * 1 / 2) * (1 + np.log(2 * np.pi)) - (n_stim / 2) * np.log(residuals.dot(residuals) / n_stim) # # print(-ll/n_stim) # return -ll/n_stim """ =========================================================== Run modeling from here =========================================================== """ # save directory save_dir = 'data_analysis/recover_model_v4/' # simulus grids ss = np.arange(-60., 60., .5) nss = len(ss) # measurement grids # mm = np.arange(-80, 80, .5) # nmm = len(mm) # model settings n_sample = 100 sampling = False cmb_type = 'mul' w_c = None n_btrp = 100 # """ model_cfg = { 'date': datetime.datetime.now().strftime('%c'), 'n_sample': n_sample, 'sampling': sampling, 'cmb_type': cmb_type, 'w_c': None, 'n_btrp': n_btrp } json.dump(model_cfg, open(save_dir + f'model_cfg.json', 'w')) # param settings params = Parameters() # add with tuples: (NAME VALUE VARY MIN MAX EXPR BRUTE_STEP) params.add_many( ('sigma_l', 3, True, 1, 20, None, .1), ('sigma_h', 5, True, 1, 20, None, .1), ('loc_i', -.5, True, -10, 10, None, .1), ('loc_e', -.5, True, -10, 10, None, .1), ('a_i', 3, True, 2, 30, None, .1), ('a_e', 3, True, 2, 30, None, .1), ('scale_i', 10, True, 1, 50, None, .1), ('scale_e', 10, True, 1, 50, None, .1), ('truc_p_i', 0, False, 0, 10, None, .1), ('truc_p_e', 0, False, 0, 10, None, .1), ('motor_bias', -.5, True, -5, 5, None, .1), ) params.pretty_print() params.dump(open(save_dir + f'param_cfg.json', 'w')) all_dat = pd.read_csv('data/dat.csv') # for sub in ['sPool']: for sub in ['S1', 'S2', 'S3', 'S4', 'S5']: if sub != 'sPool': dat = all_dat.query("sub == @sub").sort_values(by=['noise', 'condition', 'relative_stimulus']) else: dat = all_dat.sort_values(by=['noise', 'condition', 'relative_stimulus']) res_list = [] stim_finer = np.arange(-20, 20, .5) res_pdf = np.zeros(shape=[4, n_btrp, len(stim_finer), nss]) start = time.time() # bootstrapping for i_btrp in range(n_btrp): # np.random.seed(i_btrp) fitter = Minimizer(negloglik, params, fcn_args=[dat], nan_policy='omit') res = fitter.minimize(method='nelder') print(fit_report(res)) print(f"time:{time.time() - start}") res_df = pd.DataFrame([{'sub': sub, 'i_btrp': i_btrp, 'sigma_l': res.params['sigma_l'].value, 'sigma_h': res.params['sigma_h'].value, 'loc_i': res.params['loc_i'].value, 'loc_e': res.params['loc_e'].value, 'a_i': res.params['a_i'].value, 'a_e': res.params['a_e'].value, 'scale_i': res.params['scale_i'].value, 'scale_e': res.params['scale_e'].value, 'truc_p_i': res.params['truc_p_i'].value/10., 'truc_p_e': res.params['truc_p_e'].value/10., 'motor_bias': res.params['motor_bias'].value, 'negloglik': negloglik(res.params, dat), 'chi-square': res.chisqr, 'aic': res.aic, 'bic': res.bic }]) # res_df.to_csv(save_dir + f'{sub}_pars.csv', index=False) res_list.append(res_df) # low+I, low+E, high+I, high+E res_pdf[0, i_btrp, :, :] = simul_estimate(stim_finer, n_sample, res_df[ ['sigma_l', 'loc_i', 'a_i', 'scale_i', 'truc_p_i', 'motor_bias']].values.flatten()) res_pdf[1, i_btrp, :, :] = simul_estimate(stim_finer, n_sample, res_df[ ['sigma_l', 'loc_e', 'a_e', 'scale_e', 'truc_p_e', 'motor_bias']].values.flatten()) res_pdf[2, i_btrp, :, :] = simul_estimate(stim_finer, n_sample, res_df[ ['sigma_h', 'loc_i', 'a_i', 'scale_i', 'truc_p_i', 'motor_bias']].values.flatten()) res_pdf[3, i_btrp, :, :] = simul_estimate(stim_finer, n_sample, res_df[ ['sigma_h', 'loc_e', 'a_e', 'scale_e', 'truc_p_e', 'motor_bias']].values.flatten()) # Save parameters pd.concat(res_list, ignore_index=True).to_csv(save_dir + f'{sub}_pars.csv', index=False) # Save simulation results np.save(save_dir + f"{sub}_pdf_cmb.npy", res_pdf) # """