123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Mon Feb 22 15:18:09 2021
- @author: Oscar Portoles
- """
- import numpy as np
- import numba as nb
- import scipy.stats as st
- import scipy.io as sio
- import time
- from itertools import combinations
- import copy
- class WBLSmodel():
- """ whole-brain large-scale model and analysis methods """
- def __init__(self,stageIx=0,vel=5,zh=None):
- """ @ stageIx: int or list, with stage's indexes to simulate
- @ vel: float, velocity of spike-propagation velocity [m/s]
- @ zh: ndim array, iniital history of models pahses and amplitudes
- """
- self.zhist = zh # history of initial conditions
- self.localC = np.float64(0.6) # default Local coupling, L
- self.globalC = np.float64(0.15) # default Global coupling G
- self.velocity = np.float32(vel) # velocity [meter / second]
- self.interHsc = np.float32(1.0) # scaling factor between hemispheres
- self.tMax = np.float32(1.15) # time of simulation
- self.fs = np.float32(100.0) # samplig frequency for FC
- self.omega = np.float64(6) # Central natural frequency of ROIs [Hz]
- self.dlt = np.float64(1) # Spread of natural frequencies
- self.dt = np.float64(1e-3) # integration step
- self.stageSim = self._setStageSim(stageIx)
- self.iniCond = 2 # Initial conditions - 1: random (fiexd seed) , 2: emprical initial state, 3: testin nModel models identical, 4: empirical init State eover 3rd dimension
- self.log = []
- self.widthPulse = 0.030 # [seconds] duration of the pulse
- self.offMidBump = 0.01 # relative to the middel poit of a bump [sec]
- self.preBumpWin = 0.05 # 5ms, as long as bump [sec]
- self.bumpDur = 0.05 # duration of bump [sec]
- self.pathData = './data/'
- self.nameDTIm = 'mCons9Anato.mat' # structural connectome
- self.nameIni = 'OptiNoisy0p01IniState.npz' # inital condition first stage from pre-visual encoding + noise
- self.efcData = 'FCstagesDTIrois.mat' # MEG within-stage dpFC
- self.mapFile = 'mapFS2DTI.mat' # mapping frm FResurfer layout of ROIS to our visualization
- self.nameEnvs = 'meanEnvelopeByStage.mat' # relative change of envelope amplitude at the onset of stages
- self.nModel = np.int32(25) # number of model with different inital conditions to run simulataneously
- self.oneModel = False
- self.iniSeed = 1 # Seed for random intial conditions
- self._getAnatomicNetw()
- self._getEmpiricalInitialStateNP()
- self._importFCstage()
- self.setLocalParam(omega=self.omega, dlt=self.dlt)
- self.setNumbaThreaths()
-
- def setNumbaThreaths(self, n=1):
- nb.config.NUMBA_NUM_THREADS = n
-
- def get_mylogs(self):
- return self.log
-
- def _setStageSim(self, stSim):
- if isinstance(stSim, list):
- return np.array(stSim)
- else:
- return np.array([stSim])
- def _getAnatomicNetw(self):
- # load anatomical data
- loaddata = self.pathData + self.nameDTIm
- dti = sio.loadmat(loaddata) # C: structural connectivity, D: distance between areas.
- self.D = dti['mcdA'] # Distances beween nodes
- self.anato = dti['mcwA'].astype(np.float64) # Strucural/anatomical network
- self.C = np.shape(self.D)[1] # number of nodes/brain areas
- self.pairs = np.array(list(combinations(range(self.C),2)), dtype=np.int32)
- self.nFCs = self.pairs.shape[0]
- self.pairsC = np.vstack((self.pairs, np.fliplr(self.pairs)))
- # Distance to meters
- self.D = self.D / 1000.0
- def _getEmpiricalInitialStateNP(self):
- # load initial conditions
- loaddata = self.pathData + self.nameIni
- iniSta = np.load(loaddata,allow_pickle=True)
- self.eIniPhi = iniSta['iniState'][:,0,:]
- self.eIniAmp1 = iniSta['iniState'][:,1,:]
- self.eIniAmp2 = iniSta['iniState'][:,2,:]
- self.statesT = iniSta['edgeSt'][self.stageSim, :2]
- bump = iniSta['edgeSt'][self.stageSim,2]
- preBu = np.array([bump - self.bumpDur/2 - self.preBumpWin, bump - self.bumpDur/2])
- atBu = np.array([bump - self.bumpDur/2, bump + self.bumpDur/2])
- if self.stageSim.size == 1 :
- if self.stageSim > 0 :
- offset_stage_before = iniSta['edgeSt'][self.stageSim-1, 1] -self.dt
- self.statesT = self.statesT - offset_stage_before
- bump = bump - offset_stage_before
- preBu = preBu - offset_stage_before
- atBu = atBu - offset_stage_before
-
- self.edges_n = np.int32(self.fs * self.statesT)
- self.edges_dt = np.int32((1/self.dt) * self.statesT)
- self.nStates = self.statesT.shape[0]
- self.tMax = self.statesT.max() + 1/self.fs
- self.bump_n = np.int32(self.fs * bump)
- self.bump_dt = np.int32((1/self.dt) * bump)
- self.preBu_dt = np.int32(preBu / self.dt)
- self.atBu_dt = np.int32(atBu / self.dt)
-
- def _getEmpiReativEnv(self):
- loaddata = self.pathData + self.nameEnvs
- dti = sio.loadmat(loaddata)
- self.eRelEnv = dti['envRel'][:, self.stageSim] # [roois, stages]
-
- def setLocalParam(self, omega=6, dlt=1):
- omga_ = 2*np.pi * np.float64(omega) * self.dt
- dlt_ = np.float64(dlt) * self.dt
- self.locaDyn = np.complex64(-dlt_ + 1j* omga_)
-
- def _importFCstage(self):
- filepath = self.pathData + self.efcData
- self.eFC = np.float32(sio.loadmat(filepath)['fc1d'][2 + self.stageSim,:])
- self.eFCs = np.sign(self.eFC)
- self.neFCs = np.sum(np.abs(self.eFCs), axis=-1)
- self.inv_neFCs = 1.0 / self.neFCs
-
- @nb.vectorize([nb.float32(nb.complex64, nb.complex64)],cache=True,target='cpu',nopython=True)
- def _csd(zi, zj):
- csd = zi * np.conjugate(zj) # sample phase differences sign
- csdi = np.imag(csd)
- return np.sign( csdi )
- def dPLI3d_v(self, z):
- # PLI, cross spectral density
- z_i = z[self.pairs[:,0], self.edges_n[0,0]:self.edges_n[-1,-1], :]
- z_j = z[self.pairs[:,1], self.edges_n[0,0]:self.edges_n[-1,-1], :]
- # print('z_i shape: ', z_i.shape[1])
- # print('state chope: ',(self.edges_n[0,0], self.edges_n[-1,-1]))
- edges_n_0 = self.edges_n - self.edges_n[0,0]
- # sign phase difference
- siCsd = WBLSmodel._csd(z_i, z_j)
- self.pli = np.empty((self.nStates, self.nFCs), dtype=np.float32)
- # PLI over stages
- for i in range(self.nStates):
- # mean PLI over stage samples and model
- self.pli[i,:] = np.mean(siCsd[:, edges_n_0[i,0]:edges_n_0[i,1],:], axis=(1,2))
-
- def similarityAllPLIs(self):
- # fater than a numbazied function
- sigSimFC = np.sign(self.pli)
- return np.sum( self.eFCs * sigSimFC, axis=1) / self.neFCs
-
- def similarityAllPLIsWenergy(self):
- # fater than a numbazied function
- sigSimFC= np.sign(self.pli)
- simil = np.sum( self.eFCs * sigSimFC, axis=1) / self.neFCs
- energy = np.sum(np.abs(self.pulse))
- return simil - energy * self.inv_neFCs / self.maxEner
- def getPreAtBumpEnv(self):
- # change to z-score of the whole trial??
- self.preBu_env = np.zeros((self.C,self.nModel,self.nStates)) # ,int(self.bumpDur*self.dt)
- self.atBu_env = np.zeros((self.C,self.nModel,self.nStates))
- for iXs in range(self.nStates):
- z_pre = self.z[:, int(self.preBu_dt[0,iXs]+self.maxDlay):int(self.preBu_dt[1,iXs]+self.maxDlay), :]
- if self.preBu_dt[0,iXs]+self.maxDlay < 0:
- z_pre = self.z[:, :int(self.preBu_dt[1]+self.maxDlay), :]
- if z_pre.shape[1] < 5:
- z_pre = np.nan
- z_at = self.z[:, int(self.atBu_dt[0,iXs] +self.maxDlay):int(self.atBu_dt[1,iXs] +self.maxDlay), :]
- self.preBu_env[:,:,iXs] = np.mean(WBLSmodel._absZ( z_pre ), axis=1)
- self.atBu_env[:,:,iXs] = np.mean(WBLSmodel._absZ( z_at ), axis=1)
-
-
- def getScorrelRelEnv(self):
- self.sCor = np.zeros(self.nStates)
- for iXs in range(self.nStates):
- self.sCor[iXs], pval = st.spearmanr( self.eRelEnv[:,iXs], self.relEnv[:,iXs])
-
- def similarityPLIsWenergySperm(self):
- similFC = np.sum( self.eFCs * np.sign(self.pli), axis=1) / self.neFCs
- energyCoef = np.sum(np.abs(self.pulse)) / self.maxEner
- # print('fc: ',similFC[0],' energy: ', energyCoef ,' sCor: ', self.sCor)
- return similFC[0] - energyCoef * (1-self.sCor) * self.inv_neFCs
- def relativeEnvBuPre(self):
- self.getPreAtBumpEnv()
- self.relEnv = np.mean( (self.atBu_env - self.preBu_env) / self.preBu_env , axis=1) # mean over models of relative change
-
- def getAnatoCoupling(self,kG,kL):
- """Get anatomical network with couplings"""
- # normalization adjacency matrix
- self.anatoNorm = self.anato / np.mean(self.anato[~np.identity(self.C,dtype=bool)]) # normalize structural network to a mean = 1
- kS = self.anatoNorm * kG / self.C # Globa coupling
- np.fill_diagonal(kS,kL) # Local coupling
- return np.float64(kS)
-
- def getAnatoCouplingIHS(self,kG,kL,ihs):
- """Get anatomical network with couplings and inter-hemisphere scaling"""
- self.anatoScaleNormali(ihs) # IHS and normalization adjacency matrix
- kS = self.anatoSC * kG / self.C # Globa coupling
- np.fill_diagonal(kS,kL) # Local coupling
- return np.float64(kS)
-
- def anatoScaleNormali(self, h):
- self.anatoSC = np.zeros((self.C,self.C))
- hC = int(self.C/2)
- # scale from one hemisphere to another one
- self.anatoSC[hC:,:hC] = self.anato[hC:,:hC] * h
- self.anatoSC[:hC,hC:] = self.anato[:hC,hC:] * h
- # within hemsphere connections
- self.anatoSC[:hC,:hC] = self.anato[:hC,:hC]
- self.anatoSC[hC:,hC:] = self.anato[hC:,hC:]
- # normalization
- self.anatoSC = self.anatoSC / np.mean(self.anatoSC[~np.identity(self.C,dtype=bool)]) # normalize structural network to a mean = 1
- def _doCouplingLSBMsimu(self):
- """ get coupling matris with predefined local global cuplings ans scale by dt and 1/2
- So, it is ready to be used in teh the simulations """
- if self.interHsc == 1.0:
- K_ = self.getAnatoCoupling(self.globalC, self.localC)
- else:
- K_ = self.getAnatoCouplingIHS(self.globalC, self.localC, self.interHsc)
- ksim = np.float32( 0.5 * K_ * self.dt)
- self.Ksim = np.repeat(ksim[:,:,np.newaxis],self.nModel,axis=2)
-
- def getDelays(self,vel):
- """Return maximum delay and delay steps in samples"""
- # dlay = self.D / (1000.0 * vel) # [seconds] correct from mm to m
- self.dlay = self.D / vel # [seconds] correct from mm to m
- self.dlayStep = np.around(self.dlay / self.dt).astype(np.int32) # delay on steps backwards to be done
- self.maxDlay = np.int64(np.max(self.dlayStep)) # number of time steps for the longest dela
- self.iXc = np.tile( np.arange(self.C, dtype=np.int32), (self.C,1)).T # index for ROIs
- @nb.vectorize([nb.float32(nb.complex64)],cache=True,target='cpu',nopython=True)
- def _absZ(z):
- return np.abs(z) # normalize arguments to one
-
- # @profile
- def doKuramotoOrder(self,z):
- z_norm = z / self.z_abs # makes all modulus equal to one
- # global ordedr with local order == 1 at time t
- orderD = np.abs( np.mean( z_norm, axis = 0 ))
- # temporal mean and std, over models
- orderSD = np.mean( np.std( orderD, axis=0 ) ).astype(np.float16)
- order1 = np.mean( np.mean( orderD, axis=0 ) ).astype(np.float16)
- # mean local order over rois, and then over time, over models
- orderL = np.mean( np.mean(np.mean( self.z_abs, axis = 0 ), axis=0) ).astype(np.float16)
- # mean local variation (metastability), over models
- metaL = np.float16( np.mean(np.std( self.z_abs, axis=1), axis=1) )
- orders = np.array([orderL, order1, orderSD])
- return orders, metaL
-
- def impulsePerturb(self):
- """ Local couplings perturbation:
- (L + pulse) * Z_() = Z'_() wehre z_() is couplin variable in model
- becomes -> (1 + pulse/L) * z_() = Z'_() \ reshape to 2-dimensions"""
- # print('impulse ID: ', np.sum(self.pulse**2))
- self.impulse = (1 + self.pulse.reshape(self.C, self.nStates) / self.localC).astype(np.float32)
- def _edgesImpulse(self):
- self.edgePulse = np.zeros((self.nStates, 2))
- for i in range(self.nStates):
- self.edgePulse[i, 0] = self.bump_dt[i] - (self.offMidBump + 0.5*self.widthPulse) * 1/self.dt
- self.edgePulse[i, 1] = self.bump_dt[i] - (self.offMidBump - 0.5*self.widthPulse) * 1/self.dt
-
- @nb.vectorize([nb.complex64(nb.complex64, nb.complex64, nb.complex64)],cache=True,target='cpu',nopython=True)
- def _sumPartsASdt(param, z_n, couplingSum):
- return z_n + param * z_n - couplingSum
-
- @nb.vectorize([nb.complex64(nb.float32, nb.complex64, nb.complex64)],cache=True,target='cpu',nopython=True)
- def _coupling(kS, zn2, zp_del):#, res):
- # zn2 = zn * zn
- z_pro = np.conjugate(zp_del) * zn2
- return kS * (z_pro - zp_del)
-
- @nb.vectorize([nb.complex64(nb.complex64)],cache=True,target='cpu',nopython=True)
- def _zn2(z):
- return z * z
- def _KMAOcommuParZnV(z,maxDlay,dlayStep,fs,dt,kS, iXc,localPara):
- # best timing so far
- for n in range(maxDlay,z.shape[1]-1):
- zn = z[:,n,:] #
- iXdel = n-dlayStep
- zp_del = z[iXc, iXdel,:]
- zn2 = WBLSmodel._zn2( zn )
- # zn2_rep = np.repeat(zn2[:,np.newaxis,:], zn.shape[0], axis=1)
- zn2_rep = np.repeat(zn2[np.newaxis,:,:], zn.shape[0], axis=0)
- couplin = WBLSmodel._coupling(kS, zn2_rep, zp_del)#, couplin)
- kzpn = np.sum(couplin, axis=0)
- # add differntial step
- z[:,n+1,:] = WBLSmodel._sumPartsASdt(localPara, zn , kzpn)
- return z
-
- def _KMAOcommuParZnV_NoDel(z,fs,dt,kS,localPara):
- nMo = z.shape[2]
- # best timing so far
- for n in range(z.shape[1]-1):
- zn = z[:,n,:] #
- zn2 = WBLSmodel._zn2( zn )
- zn2_rep = np.lib.stride_tricks.as_strided(zn2,(68, 68, nMo),(0, zn2.strides[0], zn2.strides[1]))
- zp_rep = np.lib.stride_tricks.as_strided(zn,(68, 68, nMo),( zn.strides[0],0, zn.strides[1]))
- couplin = WBLSmodel._coupling(kS, zn2_rep, zp_rep)#, couplin)
- kzpn = np.sum(couplin, axis=0)
- # add differntial step
- z[:,n+1,:] = WBLSmodel._sumPartsASdt(localPara, zn , kzpn)
- return z
- #
- def _KMAOcommuParZnV_in(z,maxDlay,dlayStep,fs,dt,kS, iXc,localPara, impulse, edges_p):
- # best timing so far
- iXdiag = np.diag_indices(68, 2)
- for n in range(maxDlay,z.shape[1]-1):
- zn = z[:,n,:] #
- iXdel = n-dlayStep
- zp_del = z[iXc, iXdel,:]
- zn2 = WBLSmodel._zn2( zn )
- # zn2_rep = np.repeat(zn2[:,np.newaxis,:], zn.shape[0], axis=1)
- zn2_rep = np.repeat(zn2[np.newaxis,:,:], zn.shape[0], axis=0)
- couplin = WBLSmodel._coupling(kS, zn2_rep, zp_del)#, couplin)
- # index diagonal elements and add pulse in-place
- if (n >= edges_p[0,0]) and (n <= edges_p[0,1]):
- couplin[iXdiag] *= impulse[:,0,np.newaxis]
- kzpn = np.sum(couplin, axis=0)
- # add differntial step
- z[:,n+1,:] = WBLSmodel._sumPartsASdt(localPara, zn , kzpn)
- return z
- def _KMAOcommuParZnV_NoDel_in(z,fs,dt,kS,localPara, impulse, edges_p):
- nMo = z.shape[2]
- iXdiag = np.diag_indices(68, 2)
- # best timing so far
- for n in range(z.shape[1]-1):
- zn = z[:,n,:] #
- zn2 = WBLSmodel._zn2( zn )
- zn2_rep = np.lib.stride_tricks.as_strided(zn2,(68, 68, nMo),(0, zn2.strides[0], zn2.strides[1]))
- zp_rep = np.lib.stride_tricks.as_strided(zn,(68, 68, nMo),( zn.strides[0],0, zn.strides[1]))
- couplin = WBLSmodel._coupling(kS, zn2_rep, zp_rep)#, couplin)
- # index diagonal elements and add pulse in-place
- if (n >= edges_p[0,0]) and (n <= edges_p[0,1]):
- couplin[iXdiag] *= impulse[:,0,np.newaxis]
- kzpn = np.sum(couplin, axis=0)
- # add differntial step
- z[:,n+1,:] = WBLSmodel._sumPartsASdt(localPara, zn , kzpn)
- return z
-
- def _KMAOcommuParZnV_in_sequen(z,maxDlay,dlayStep,fs,dt,kS, iXc,localPara, impulse, edges_p):
- iXdiag = np.diag_indices(68, 2)
- for n in range(maxDlay,z.shape[1]-1):
- zn = z[:,n,:] #
- iXdel = n-dlayStep
- zp_del = z[iXc, iXdel,:]
- zn2 = WBLSmodel._zn2( zn )
- zn2_rep = np.repeat(zn2[np.newaxis,:,:], zn.shape[0], axis=0)
- couplin = WBLSmodel._coupling(kS, zn2_rep, zp_del)#, couplin)
- # index diagonal elements and add pulse in-place
- try:
- if (n >= edges_p[0,0]) and (n <= edges_p[0,1]):
- couplin[iXdiag] *= impulse[:,0,np.newaxis]
- except:
- pass
- try:
- if (n >= edges_p[1,0]) and (n <= edges_p[1,1]):
- couplin[iXdiag] *= impulse[:,1,np.newaxis]
- except:
- pass
- try:
- if (n >= edges_p[2,0]) and (n <= edges_p[2,1]):
- couplin[iXdiag] *= impulse[:,2,np.newaxis]
- except:
- pass
- try:
- if (n >= edges_p[3,0]) and (n <= edges_p[3,1]):
- couplin[iXdiag] *= impulse[:,3,np.newaxis]
- except:
- pass
- kzpn = np.sum(couplin, axis=0)
- # add differntial step
- z[:,n+1,:] = WBLSmodel._sumPartsASdt(localPara, zn , kzpn)
- return z
-
- def downsampleModel(self, z):
- # simple downsampling (there may be aliasing)
- z = z[:,self.maxDlay+1:,:] # remove history samples used in the begining
- return z[:,::np.int32(1./(self.fs*self.dt)),:]
-
- def _setHistDelayAndContainers(self):
- if self.velocity != 0:
- self.getDelays(self.velocity)
- else:
- self.maxDlay = 0
-
- if self.stageSim.size > 1:
- self.zIni = self._doNodeContainers(self.maxDlay)
- elif self.stageSim == 0:
- self.zIni = self._doNodeContainers(self.maxDlay)
- elif self.stageSim > 0 :
- self.zIni = self._doNodeContainerSimHistory()
-
- def _doNodeContainers(self, maxDlay):
- """ containers with shape [number of ROIs * number of models, number of samples]
- models are concatenated along raws (vertically),
- """
- self.nCM = self.C*self.nModel # number of models and communites
- eIniAmp1 = self.eIniAmp1[:,:self.nModel].T.ravel()
- eIniAmp2 = self.eIniAmp2[:,:self.nModel].T.ravel()
- eIniPhi = self.eIniPhi[:,:self.nModel].T.ravel()
- # node's variables
- r = np.zeros((self.nCM, int(self.tMax/self.dt + maxDlay)), dtype=np.float32) # node phase parameter [C, Nsamples to integrate]
- phi = np.zeros((self.nCM, int(self.tMax/self.dt + maxDlay)), dtype=np.float32) # node phase parameter [C, Nsamples to integrate]
- # initial conditions as history for the time delays
- omegaT = self.omega * np.tile(np.linspace(0, (maxDlay+1)*self.dt, maxDlay+1, dtype=np.float32), (self.nCM,1))
- if self.iniCond == 1: # random with fiexd seed
- np.random.seed(self.iniSeed)
- phiRa = np.tile(np.random.uniform(-np.pi, np.pi,self.nCM), (maxDlay+1,1)).T
- phi[:,0:maxDlay+1] = np.float32(np.remainder(omegaT + phiRa, 2*np.pi))
- np.random.seed(self.iniSeed+1)
- r[:,0:maxDlay+1] = np.tile( np.random.uniform(0.05, 0.95,self.nCM), (maxDlay+1,1)).T
- elif self.iniCond == 2: # phase: measured initial conditions
- # phase
- phiIni = np.tile(eIniPhi, (maxDlay+1,1)).T
- phi[:,0:maxDlay+1] = np.float32(np.remainder(omegaT + phiIni, 2*np.pi))
- # amplitude
- slope = (eIniAmp2 - eIniAmp1) / (-0.11/self.dt) # a = (y2 - y1) / (x2 - x1)
- interc = eIniAmp1 - slope*(-0.11/self.dt) # b = y1 - a * x1
- point1 = interc + slope*(maxDlay+1)
- rh = np.zeros((self.nCM, maxDlay+1), dtype=np.float32)
- for i in range(self.nCM):
- rh[i,:] = np.linspace(point1[i], eIniAmp2[i], maxDlay+1)
- rh[rh<0.05] = 0.05
- r[:,0:maxDlay+1] = rh
- elif self.iniCond == 3: # phase: half plane; amplitude: stabl 0.5
- phiOff = np.tile(np.linspace(0,np.pi,self.C), (int(maxDlay+1),self.nModel)).T
- phi[:,0:maxDlay+1] = np.float32(np.remainder(omegaT + phiOff, 2*np.pi))
- r[:,0:maxDlay+1] = 0.5*np.ones((self.nCM,maxDlay+1), dtype=np.float32)
- elif self.iniCond == 4: # phase: random; amplitude: stabl 0.2
- r = 0.2*np.ones((self.C, int(self.tMax/self.dt + maxDlay), self.nModel), dtype=np.float32) # node phase parameter [C, Nsamples to integrate]
- phi = np.float32(np.random.randn(self.C, int(self.tMax/self.dt + maxDlay), self.nModel)) # node phase parameter [C, Nsamples to integrate]
-
- else:
- raise ValueError('not defined self.iniCond for such value')
- if self.oneModel: # for testing
- r = r[:self.C,:]
- phi = phi[:self.C,:]
- else:
- z = np.zeros((self.C,r.shape[1],self.nModel),dtype=np.complex64)
- for i in range(self.nModel):
- z[:,:,i] = r[i*self.C:(i+1)*self.C,:] * np.exp(1j* phi[i*self.C:(i+1)*self.C,:])
- self.N = r.shape[1]
- return z
-
- def _doNodeContainerSimHistory(self):
- """ takes a delays history as input, typicaly the previous stage """
- nh = self.zhist.shape[1] # lenght [samples] of history
- z = np.zeros((self.C, int(nh+self.tMax/self.dt), self.nModel), dtype=np.complex64)
- z[:,:nh,:] = self.zhist
- return z
-
- def modelBehavior(self, z):
- self.z = copy.deepcopy(z)
- z = self.downsampleModel(z)
- self.z_abs = WBLSmodel._absZ(z)
- self.dPLI3d_v(z)
- self.orders, self.metaL = self.doKuramotoOrder(z)
-
- def meanEnvBuStage(self):
- # mean envelope over stages
- self.msEnv = np.zeros((self.C, self.nStates), dtype=np.float32)
- self.mbEnv = np.zeros((self.C, self.nStates), dtype=np.float32)
- for i in range(self.nStates):
- # mean envelpe amplitude over stage samples and model
- self.msEnv[:,i] = np.mean(self.z_abs[:, self.edges_n[i,0]:self.edges_n[i,1],:], axis=(1,2))
- self.mbEnv[:,i] = np.mean(self.z_abs[:, int(self.bump_n[i]-0.025*self.fs):int(self.bump_n[i]+0.025*self.fs),:], axis=(1,2))
-
- def runModel(self):
- self._doCouplingLSBMsimu()
- self.getDelays(self.velocity)
- z = self._doNodeContainers(self.maxDlay)
- z = WBLSmodel._KMAOcommuParZnV(z,self.maxDlay,self.dlayStep,self.fs,self.dt,self.Ksim,self.iXc, self.locaDyn)
- self.modelBehavior(z)
-
- def runModelNoDel(self):
- self._doCouplingLSBMsimu()
- z = WBLSmodel._KMAOcommuParZnV_NoDel(self.zIni,self.fs,self.dt,self.Ksim, self.locaDyn)
- self.modelBehavior(z)
-
- def runModelNoDel_in(self):
- """ pulse created in the model instance """
- z = WBLSmodel._KMAOcommuParZnV_NoDel_in(self.zIni,self.fs,self.dt,self.Ksim, self.locaDyn, self.impulse, self.edgePulse)
- self.modelBehavior(z)
-
- def runModel_Pulsein(self):
- self._doCouplingLSBMsimu()
- self.impulsePerturb() # (z, maxDlay, dlayStep, fs, dt, kS, iXc, localPara, impulse, edges_p)
- edgePulse = self.edgePulse + self.maxDlay
- z = WBLSmodel._KMAOcommuParZnV_in(self.zIni,self.maxDlay,self.dlayStep, self.fs,self.dt,self.Ksim, self.iXc, self.locaDyn, self.impulse, edgePulse)
- self.modelBehavior(z)
- def runModelNoDel_Pulsein(self):
- self._doCouplingLSBMsimu()
- self.impulsePerturb() # (z, maxDlay, dlayStep, fs, dt, kS, iXc, localPara, impulse, edges_p)
- z = WBLSmodel._KMAOcommuParZnV_NoDel_in(self.zIni, self.fs,self.dt,self.Ksim, self.locaDyn, self.impulse, self.edgePulse)
- self.modelBehavior(z)
-
- def runModel_Pulsein_sequence(self):
- self._doCouplingLSBMsimu()
- self.impulsePerturb() # (z, maxDlay, dlayStep, fs, dt, kS, iXc, localPara, impulse, edges_p)
- edgePulse = self.edgePulse + self.maxDlay
- z = WBLSmodel._KMAOcommuParZnV_in_sequen(self.zIni,self.maxDlay,self.dlayStep, self.fs,self.dt,self.Ksim, self.iXc, self.locaDyn, self.impulse, edgePulse)
- self.modelBehavior(z)
-
- if __name__ == "__main__":
-
- stages = [0,1]
- stages = 0
- kao = WBLSmodel(stageIx=stages)
- # kao.iniCond = 1
- # kao.iniSeed = 1
- kao._setHistDelayAndContainers()
- kao._edgesImpulse()
- kao._getEmpiReativEnv()
- kao.doNoise(variance=0)
-
- try:
- kao.pulse = np.ones((68,len(stages)))*10
- except:
- kao.pulse = np.ones(68)*10
- kao.runModel_Pulsein_noise()
- kao.similarityAllPLIs()
- kao.relativeEnvBuPre()
- kao.getScorrelRelEnv()
-
|