- '''
- Microstate sequence analysis, run after the per-trial sequences have been obtained through the fit-back process (microstate_to_sequence.py).
- Designed for analyzing the Flight Simulator data collected at CTA, Montreal in November 2021.
- Adapted from Jia's algorithm.
- Prepared by Mengting Zhao, Sep 16 2024.
- '''
- import numpy as np
- import copy
- from scipy.stats import chi2, chi2_contingency, entropy, t, sem, norm
- from scipy import signal
- from utilis import load_data, read_subject_info, write_info, read_xlsx, to_string, create_info
- from multiprocessing import Pool
- import codecs, json
- import itertools
- import matplotlib.pyplot as plt
- import matplotlib.ticker as ticker
- from itertools import combinations
- from matplotlib.ticker import ScalarFormatter
- from sklearn.preprocessing import normalize
- import string
- import matplotlib.gridspec as gridspec
- from scipy import stats
- from statsmodels.stats.multitest import multipletests
- from math_utilis import nCr
- import mne
- from scipy.io import loadmat, savemat
- import os
- from collections import OrderedDict
- from os.path import join as pjoin
- import math
- import statsmodels.stats.multitest as smt
- def ceil_decimal(num, n):
- q = len(num)
- pvalues = []
- for i in range(q):
- p = math.ceil(num[i]*10**n)/(10**n)
- pvalues.append(p)
- return pvalues
- def pval_corrected(pvals, method='bonferroni'):
- return smt.multipletests(pvals, method=method)[1]
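- # Usage sketch with made-up p-values: ceil_decimal() rounds each p-value up
- # to n decimal places, and pval_corrected() applies a Bonferroni correction
- # via statsmodels' multipletests(), whose second return value holds the
- # corrected p-values.
- def _example_pval_correction():
-     raw_p = [0.012, 0.034, 0.25]
-     rounded = ceil_decimal(raw_p, 3)     # each value rounded up to 3 decimals
-     corrected = pval_corrected(rounded)  # Bonferroni-adjusted p-values
-     return rounded, corrected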
- class MicrostateParameter:
- def __init__(self, sequence, n_microstate):
- self.sequence = sequence
- self.n_microstate = n_microstate
- self.n_sequence = len(sequence)
- def calculate_duration(self, window=None):
- def duration(sequence):
- res = []
- label = sequence[0]
- count = 1
- res_temp = {}
- for i in range(self.n_microstate):
- res_temp[i] = []
- for j in range(1, len(sequence)):
- if label == sequence[j]:
- count += 1
- else:
- label = sequence[j]
- res_temp[sequence[j - 1]].append(count)
- count = 1
- # note: the final run of the sequence is not recorded by the loop above
- for i in range(self.n_microstate):
- res.append(np.sum(res_temp[i]) / len(res_temp[i]) if len(res_temp[i]) > 0 else 1) # mean run length per state; defaults to 1 sample for states that never occur
- return res
- if window:
- n = int(self.n_sequence / window)
- temp = []
- for i in range(n):
- res = duration(self.sequence[i*window:(i+1)*window])
- temp.append(res)
- return np.asarray(temp).mean(axis=0).tolist()
- else:
- return duration(self.sequence)
- def calculate_frequency(self, window):
- res = []
- res_temp = {}
- for i in range(self.n_microstate):
- res_temp[i] = []
- n_block = int(self.n_sequence / window)
- for i in range(n_block):
- label = self.sequence[i*window]
- temp = {}
- for j in range(i*window + 1, (i+1)*window):
- if label != self.sequence[j]:
- if label in temp:
- temp[label] += 1
- else:
- temp[label] = 1
- label = self.sequence[j]
- for key, value in temp.items():
- res_temp[key].append(value)
- for i in range(self.n_microstate):
- res.append(np.mean(res_temp[i]) if len(res_temp[i]) > 0 else 0) # 0 for states that never occur, avoiding a nan from np.mean([])
- return res
- def calculate_coverage(self):
- res = []
- for i in range(self.n_microstate):
- res.append(np.argwhere(np.asarray(self.sequence) == i).shape[0] / self.n_sequence)
- return res
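- # Usage sketch on a toy 4-state sequence (all values hypothetical). Assuming
- # a 250 Hz sampling rate, mean durations in samples convert to ms via
- # 1000/250, and coverage is expressed in percent, mirroring the conversions
- # done in __main__ below.
- def _example_temporal_parameters():
-     toy_sequence = [0, 0, 1, 1, 1, 2, 3, 3, 0, 2, 2, 1] * 50
-     mp = MicrostateParameter(toy_sequence, 4)
-     duration_ms = [d * (1000 / 250) for d in mp.calculate_duration()]
-     frequency = mp.calculate_frequency(250)  # occurrences per 1 s window
-     coverage_pct = [c * 100 for c in mp.calculate_coverage()]
-     return duration_ms, frequency, coverage_pct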
- class MicrostateLongRangeDependence:
- def __init__(self, sequence, n_microstate):
- self.sequence = sequence
- self.n_microstate = n_microstate
- self.n_sequence = len(sequence)
- def partition_state(self, k):
- comb = combinations([i for i in range(self.n_microstate)], k)
- length = nCr(self.n_microstate, k)
- res = []
- for index, item in enumerate(comb):
- if k % 2 == 0:
- if index == length // 2:
- break
- else:
- res.append(item)
- else:
- res.append(item)
- return res
- def embed_random_walk(self, k):
- partitions = self.partition_state(k)
- np_sequence = np.asarray(self.sequence)
- res = {}
- for item in partitions:
- temp_x = np.ones(self.n_sequence) * -1
- for state in item:
- temp_x[np.where(np_sequence == state)[0]] = 1
- res[to_string(item)] = temp_x
- return res
- @staticmethod
- def detrend(embed_sequence, window_size):
- ## split the cumulative sum into non-overlapping windows of window_size samples
- n_windows = embed_sequence.shape[0] // window_size
- temp = embed_sequence[:n_windows * window_size].reshape(n_windows, window_size)
- window_size_index = np.arange(window_size)
- res = np.zeros(n_windows)
- for index, y in enumerate(temp):
- coeff = np.polyfit(window_size_index, y, 1) # finding the trend through fitting
- y_hat = np.polyval(coeff, window_size_index)
- res[index] = np.sqrt(np.mean((y-y_hat)**2)) # RMS of the residuals after removing the linear trend
- return res
- @staticmethod
- def dfa(embed_sequence, segment_range, segment_density):
- ## this function is called in the dfa( ) function defined outside the Class
- ## commented by Mengting
- y = np.cumsum(embed_sequence-np.mean(embed_sequence))
- scales = (2**np.arange(segment_range[0], segment_range[1], segment_density)).astype(int) # np.int was removed in NumPy 1.24
- f = np.zeros(len(scales))
- for index, window_size in enumerate(scales):
- f[index] = np.sqrt(np.mean(MicrostateLongRangeDependence.detrend(y, window_size)**2))
- coeff = np.polyfit(np.log2(scales), np.log2(f), 1) # the slope of log2(f) vs log2(scales) is the scaling (Hurst) exponent
- return {'slope': coeff[0], 'fluctuation': f.tolist(), 'scales':scales.tolist()}
- @staticmethod
- def shanon_entropy(x, nx, ns):
- ## in our case, base 2 is applied
- p = np.zeros(ns)
- for t in range(nx):
- p[x[t]] += 1.0
- p /= nx
- return -np.sum(p[p>0]*np.log2(p[p>0]))
- @staticmethod
- def shanon_joint_entropy(x, y, nx, ny, ns):
- n = min(nx, ny)
- p = np.zeros((ns, ns))
- for t in range(n):
- p[x[t], y[t]] += 1.0
- p /= n
- return -np.sum(p[p>0]*np.log2(p[p>0]))
- @staticmethod
- def shanon_joint_entropy_k(x, nx, ns, k):
- # joint Shannon entropy of k consecutive samples; ns is the number of states (n_k)
- p = np.zeros(tuple(k*[ns]))
- for t in range(nx-k):
- p[tuple(x[t:t+k])] += 1.0
- p /= (nx-k)
- h = -np.sum(p[p>0]*np.log2(p[p>0]))
- return h
- def mutual_information(self, lag):
- ### this part corresponds to the last line of Eq. (5) in Wenjun's paper
- lag = min(self.n_sequence, lag)
- res = np.zeros(lag)
- for time_lag in range(lag):
- nmax = self.n_sequence - time_lag # number of segments used for computation (moving forward with a length of 'lag')
- ## h represents H(X_t)
- h = self.shanon_entropy(self.sequence[:nmax], nmax, self.n_microstate)
- ## h_lag represents H(X_{t+tau})
- h_lag = self.shanon_entropy(self.sequence[time_lag:time_lag+nmax], nmax, self.n_microstate)
- ## h_h_lag represents the joint entropy of the two segments above
- h_h_lag = self.shanon_joint_entropy(self.sequence[:nmax], self.sequence[time_lag:time_lag+nmax], nmax,
- nmax, self.n_microstate)
- res[time_lag] = h + h_lag - h_h_lag
- return res
- def partial_mutual_information(self, lag):
- p = np.zeros(lag)
- a = self.mutual_information(2)
- p[0], p[1] = a[0], a[1]
- for k in range(2, lag):
- h1 = MicrostateLongRangeDependence.shanon_joint_entropy_k(self.sequence, self.n_sequence, self.n_microstate, k)
- h2 = MicrostateLongRangeDependence.shanon_joint_entropy_k(self.sequence, self.n_sequence, self.n_microstate, k-1)
- h3 = MicrostateLongRangeDependence.shanon_joint_entropy_k(self.sequence, self.n_sequence, self.n_microstate, k+1)
- p[k] = 2*h1 - h2 - h3 # partial AIF: 2*H(k) - H(k-1) - H(k+1); the loop variable k must be used here, not the constant lag
- return p
- def excess_entropy_rate(self, kmax):
- h = np.zeros(kmax)
- for k in range(kmax):
- h[k] = MicrostateLongRangeDependence.shanon_joint_entropy_k(self.sequence, self.n_sequence, self.n_microstate, k+1)
- ks = np.arange(1, kmax+1)
- entropy_rate, excess_entropy = np.polyfit(ks, h, 1) # Least squares polynomial fit
- return entropy_rate, excess_entropy, ks
- @staticmethod
- def plot_mutual_information(data, title, subject, index, f):
- plt.semilogy(np.arange(0, data.shape[0]*f, f), data)
- plt.xlabel("time lag (1ms)")
- plt.ylabel("AIF (bits)")
- plt.title(subject+"_"+title+"_"+str(index))
- plt.show()
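- # Usage sketch (toy data, hypothetical parameters): AIF via
- # mutual_information(), entropy rate via excess_entropy_rate(), and DFA on
- # one random-walk embedding. segment_range gives log2 window-size exponents,
- # as in the DFA section of __main__ below.
- def _example_long_range_dependence():
-     np.random.seed(0)
-     toy_sequence = np.random.randint(0, 4, 4096).tolist()
-     lrd = MicrostateLongRangeDependence(toy_sequence, 4)
-     aif_curve = lrd.mutual_information(100)  # AIF up to lag 100
-     entropy_rate, excess_entropy, ks = lrd.excess_entropy_rate(6)
-     embeddings = lrd.embed_random_walk(2)    # one +/-1 embedding per 2-state subset
-     first_key = next(iter(embeddings))
-     dfa_res = MicrostateLongRangeDependence.dfa(embeddings[first_key], [2, 10], 0.5)
-     return aif_curve, entropy_rate, dfa_res['slope']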
- class MicrostateMarkov:
- def __init__(self, sequence, n_microstate):
- self.sequence = sequence
- self.n_microstate = n_microstate
- self.n_sequence = len(sequence)
- def transition_matrix_hat(self, k):
- ## When executed in the Markov-probability section of __main__ below, k runs from 1 to 5
- ## the output condition_distribution matrix stores the conditional probabilities of the Markov process,
- ## where each row indexes the current k-history and each column the state at time t+k
- joint_distribution = np.zeros(self.n_microstate**k)
- condition_distribution = np.zeros((self.n_microstate**k, self.n_microstate)) # a 7*7 matrix for k=1
- sh = tuple(k*[self.n_microstate])
- d = {}
- for i, index in enumerate(np.ndindex(sh)): # map each k-tuple of states to a flat index 0 .. n_microstate**k - 1
- d[index] = i
- for t in range(self.n_sequence-k): ## counting the combinations of sequence[t] & sequence[t+k] in 'condition_distribution' matrix
- idx = tuple(self.sequence[t:t+k]) # for k=1, the element at sequence[t]=3 for example
- i = d[idx]
- j = self.sequence[t+k] # the element at sequence[t+k]=4 for example
- joint_distribution[i] += 1. ## accumulate counts per k-history (vector of n_microstate**k elements)
- condition_distribution[i,j] += 1.
- joint_distribution /= joint_distribution.sum()
- p_row = condition_distribution.sum(axis=1, keepdims=True)
- p_row[p_row == 0] = 1.
- condition_distribution /= p_row
- return joint_distribution, condition_distribution
- @staticmethod
- def surrogate_markov_chain(joint_distribution, condition_distribution, k, n_microstate, n_surrogate):
- sh = tuple(k * [n_microstate])
- d = {}
- dinv = np.zeros((n_microstate**k, k))
- for i, idx in enumerate(np.ndindex(sh)):
- d[idx] = i
- dinv[i] = idx
- joint_distribution_sum = np.cumsum(joint_distribution)
- x = np.zeros(n_surrogate)
- x[:k] = dinv[np.min(np.argwhere(np.random.rand() < joint_distribution_sum))]
- condition_distribution_sum = np.cumsum(condition_distribution, axis=1)
- for t in range(n_surrogate-k): # fill every remaining sample (the original bound left the last one unset)
- idx = tuple(x[t:t+k])
- i = d[idx]
- ## inverse-transform sampling: draw the next state from row i of the
- ## conditional distribution via its cumulative sum
- j = np.min(np.argwhere(np.random.rand() < condition_distribution_sum[i]))
- x[t+k] = j
- return x.astype('int')
- def test_markov(self, order):
- if order == 0:
- return self.test_markov0()
- n = len(self.sequence)
- df = np.power(self.n_microstate, 2 + order) - 2 * np.power(self.n_microstate, 1 + order) + np.power(
- self.n_microstate, order)
- temp = np.zeros(self.n_microstate)
- frequency = []
- # f_ijk..(n+2), f_ij..(n+1)*, f_*jk..(n+2), f_*jk...(n+1)*
- frequency.append(np.tile(temp, [self.n_microstate for i in range(order + 1)] + [1]))
- frequency.append(np.tile(temp, [self.n_microstate for i in range(order)] + [1]))
- frequency.append(np.tile(temp, [self.n_microstate for i in range(order)] + [1]))
- frequency.append(np.tile(temp, [self.n_microstate for i in range(order - 1)] + [1]))
- for t in range(n - order - 1):
- frequency_index = []
- for j in range(order + 2):
- frequency_index.append(self.sequence[t + j])
- frequency[0][tuple(frequency_index)] += 1
- frequency[1][tuple(frequency_index[:-1])] += 1
- frequency[2][tuple(frequency_index[1::])] += 1
- frequency[3][tuple(frequency_index[1:-1])] += 1
- T = 0.0
- for frequency_index in np.ndindex(frequency[0].shape):
- f = frequency[0][tuple(frequency_index)] * frequency[1][tuple(frequency_index[:-1])] * \
- frequency[2][tuple(frequency_index[1::])] * frequency[3][tuple(frequency_index[1:-1])]
- if f > 0:
- T += frequency[0][tuple(frequency_index)] * \
- np.log((frequency[0][tuple(frequency_index)] * frequency[3][tuple(frequency_index[1:-1])])
- / (frequency[1][tuple(frequency_index[:-1])] * frequency[2][tuple(frequency_index[1::])]))
- T *= 2.0
- p = chi2.sf(T, df)
- # print(T, df, p)
- return p
- def test_markov0(self):
- n = len(self.sequence)
- f_ij = np.zeros((self.n_microstate, self.n_microstate))
- f_i = np.zeros(self.n_microstate)
- f_j = np.zeros(self.n_microstate)
- for t in range(n - 1):
- i = self.sequence[t]
- j = self.sequence[t + 1]
- f_ij[i, j] += 1.0
- f_i[i] += 1.0
- f_j[j] += 1.0
- T = 0.0
- for i, j in np.ndindex(f_ij.shape):
- f = f_ij[i, j] * f_i[i] * f_j[j]
- if f > 0:
- T += (f_ij[i, j] * np.log((n * f_ij[i, j]) / (f_i[i] * f_j[j])))
- T *= 2.0
- df = (self.n_microstate - 1) * (self.n_microstate - 1)
- p = chi2.sf(T, df)
- return p
- def test_conditional_homogeneity(self, block_size, order, s=None):
- if s is None:
- n = len(self.sequence)
- s = int(np.floor(float(n) / float(block_size)))
- df = (s - 1.) * (np.power(self.n_microstate, order + 1) - np.power(self.n_microstate, order))
- frequency = []
- # f_ijk..(n+2), f_ij..(n+1)*, f_*jk..(n+2), f_*jk...(n+1)*
- frequency.append(np.zeros(([s] + [self.n_microstate for i in range(order + 1)])))
- frequency.append(np.zeros(([s] + [self.n_microstate for i in range(order)])))
- frequency.append(np.zeros(([self.n_microstate for i in range(order + 1)])))
- frequency.append(np.zeros(([self.n_microstate for i in range(order)])))
- l_total = 0
- for i in range(s):
- if isinstance(block_size, list):
- l = block_size[i]
- else:
- l = block_size
- for j in range(l - order):
- frequency_index = []
- frequency_index.append(i)
- for k in range(order + 1):
- frequency_index.append(self.sequence[l_total + j + k])
- frequency[0][tuple(frequency_index)] += 1.
- frequency[1][tuple(frequency_index[:-1])] += 1.
- frequency[2][tuple(frequency_index[1::])] += 1.
- frequency[3][tuple(frequency_index[1:-1])] += 1.
- l_total += l
- T = 0.0
- for frequency_index in np.ndindex(frequency[0].shape):
- f = frequency[0][tuple(frequency_index)] * frequency[1][tuple(frequency_index[:-1])] * \
- frequency[2][tuple(frequency_index[1::])] * frequency[3][tuple(frequency_index[1:-1])]
- if f > 0:
- T += frequency[0][tuple(frequency_index)] * \
- np.log((frequency[0][tuple(frequency_index)] * frequency[3][tuple(frequency_index[1:-1])])
- / (frequency[1][tuple(frequency_index[:-1])] * frequency[2][tuple(frequency_index[1::])]))
- T *= 2.0
- p = chi2.sf(T, df, loc=0, scale=1.)
- return p
- def conditionalHomogeneityTest(self, l):
- n = len(self.sequence)
- ns = self.n_microstate
- X = self.sequence
- r = int(np.floor(float(n) / float(l))) # number of blocks
- nl = r * l
- f_ijk = np.zeros((r, ns, ns))
- f_ij = np.zeros((r, ns))
- f_jk = np.zeros((ns, ns))
- f_i = np.zeros(r)
- f_j = np.zeros(ns)
- # calculate f_ijk (time / block dep. transition matrix)
- for i in range(r): # block index
- for ii in range(l - 1): # pos. inside the current block
- j = X[i * l + ii]
- k = X[i * l + ii + 1]
- f_ijk[i, j, k] += 1.0
- f_ij[i, j] += 1.0
- f_jk[j, k] += 1.0
- f_i[i] += 1.0
- f_j[j] += 1.0
- # conditional homogeneity (stationarity of the Markov transition matrix across blocks)
- T = 0.0
- for i, j, k in np.ndindex(f_ijk.shape):
- # conditional homogeneity
- f = f_ijk[i, j, k] * f_j[j] * f_ij[i, j] * f_jk[j, k]
- if (f > 0):
- T += (f_ijk[i, j, k] * np.log((f_ijk[i, j, k] * f_j[j]) / (f_ij[i, j] * f_jk[j, k])))
- T *= 2.0
- df = (r - 1) * (ns - 1) * ns
- # p = chi2test(T, df, alpha)
- p = chi2.sf(T, df, loc=0, scale=1)
- return p
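- # Usage sketch (toy data): estimate the order-1 transition matrices, draw a
- # Markov surrogate of matching length, and run the Markovianity /
- # homogeneity tests used elsewhere in this script.
- def _example_markov():
-     np.random.seed(1)
-     toy_sequence = np.random.randint(0, 4, 2000).tolist()
-     markov = MicrostateMarkov(toy_sequence, 4)
-     joint_p, condition_p = markov.transition_matrix_hat(1)
-     surrogate = MicrostateMarkov.surrogate_markov_chain(
-         joint_p, condition_p, 1, 4, len(toy_sequence))
-     p0 = markov.test_markov0()   # zero-order Markov test
-     p1 = markov.test_markov(1)   # first-order Markov test
-     p_hom = markov.conditionalHomogeneityTest(500)  # 4 blocks of 500 samples
-     return surrogate, p0, p1, p_hom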
- def paired_t_test_condition_parameter(path, savepath, sheets, n_microstates, conditions):
- n_conditions = len(conditions)
- alphabet_string = list(string.ascii_uppercase)
- for sheet in sheets:
- res = []
- res_correct = []
- data = np.asarray(read_xlsx(path, sheet))
- for i in range(n_microstates):
- data_temp = data[:,i*n_conditions:(i+1)*n_conditions]
- correct_p = []
- count = 0
- for comb in itertools.combinations([j for j in range(n_conditions)], 2):
- p = stats.ttest_rel(data_temp[:, comb[0]], data_temp[:, comb[1]])
- res.append([alphabet_string[i], conditions[comb[0]], conditions[comb[1]], p[0], p[1]])
- res_correct.append([alphabet_string[i], conditions[comb[0]], conditions[comb[1]]])
- correct_p.append(p[1])
- count += 1
- temp = multipletests(correct_p, method='bonferroni')[1].tolist()
- for j in range(len(res)-1, len(res)-count-1, -1):
- res_correct[j].append(temp[count-1])
- count -= 1
- write_info(savepath, sheet, res)
- write_info(savepath, sheet + '_' + 'bonferroni', res_correct)
- def paired_t_test_run_parameter(path, savepath, sheets, n_microstates, n_conditions, n_runs):
- for sheet in sheets:
- res = []
- res_correct = []
- data = np.asarray(read_xlsx(path, sheet))[:, n_microstates::]
- for i in range(0, n_microstates*n_conditions*n_runs, n_runs):
- data_temp = data[:, i:i+n_runs] # i already advances in steps of n_runs
- correct_p = []
- for comb in itertools.combinations([j for j in range(n_runs)], 2):
- p = stats.ttest_rel(data_temp[:, comb[0]], data_temp[:, comb[1]])
- res.append(p)
- correct_p.append(p[1]) # keep the p-value only, for the (optional) correction below
- # res_correct.extend(multipletests(correct_p, method='bonferroni')[1])
- write_info(savepath, sheet, res)
- # write_info(savepath, sheet+'_'+'bonferroni', res_correct)
- def plot_run_parameter(path, sheets, row, n_microstates, n_runs, conditions):
- alphabet_string = list(string.ascii_uppercase)
- column = int(math.ceil(n_microstates / row))
- n_conditions = len(conditions)
- title = ['Microstate' + alphabet_string[i] for i in range(n_microstates)]
- block_size = n_microstates * n_runs
- for sheet in sheets:
- data = np.asarray(read_xlsx(path, sheet))[:, n_microstates::] # slice columns, matching the layout used elsewhere
- for n_condition in range(n_conditions):
- fig, ax = plt.subplots(row, column)
- count = 0
- for i in range(row):
- for j in range(column):
- ax[i][j].errorbar(np.arange(n_runs), np.mean(data[:, count:count+n_runs], axis=0),
- yerr=stats.sem(data[:, count:count+n_runs], axis=0))
- count += n_runs
- def label_diff(i, j, yoffset, text, y, ax):
- y = max(y[i], y[j])+ yoffset
- props = {'arrowstyle': '-', 'linewidth': 2}
- ax.annotate(text, xy=(i, y+0.003), zorder=1)
- ax.annotate('', xy=(i, y), xytext=(j, y), arrowprops=props)
- def plot_block(mean, yerr, conditions, ax, title, ylabel, p_value=None, p_bar=None):
- n_conditions = len(conditions)
- if p_value is None:
- for index, value in enumerate(conditions):
- ax.errorbar(index, mean[index], yerr[index], fmt='.', marker='.', color='black', capsize=5, capthick=2)
- else:
- for index, value in enumerate(conditions):
- if index == 0:
- ax.errorbar(index, mean[index], yerr[index], fmt='.', marker='.', color='black', capsize=5, capthick=2)
- else:
- p = float(p_value[index-1])
- if p > 0.05:
- ax.errorbar(index, mean[index], yerr=yerr[index], fmt='.', marker='.', color='black', capsize=5, capthick=2)
- if 0.01 <= p <= 0.05:
- ax.errorbar(index, mean[index], yerr=yerr[index], fmt='.', marker='*', mfc='red', mec='red', color='black', capsize=5, capthick=2)
- if 0.005 <= p < 0.01:
- ax.errorbar(index, mean[index], yerr=yerr[index], fmt='.', marker='*', mfc='green', mec='green', color='black', capsize=5, capthick=2)
- if p < 0.005:
- ax.errorbar(index, mean[index], yerr=yerr[index], fmt='.', marker='*', mfc='blue', mec='blue', color='black', capsize=5, capthick=2)
- for i in range(len(conditions)-1, len(p_value)):
- p = float(p_value[i])
- if p < 0.05:
- p_str = "p=" + format(p, '.3f')
- label_diff(p_bar[i][0], p_bar[i][1], p_bar[i][2], p_str, mean+yerr, ax)
- ax.set_title(title)
- ax.set_xticks(list(range(n_conditions)))
- ax.set_xticklabels(conditions)
- ax.set_ylabel(ylabel)
- ax.set_ylim(top=max(mean+yerr) + 0.01) # the `ymax` keyword was removed in Matplotlib 3.0
- ax.yaxis.set_major_formatter(ticker.FormatStrFormatter('%.2f'))
- def plot_condition_parameter(path, savepath, sheets, row, n_microstates, conditions, ylabel, path_t_test=None, p_value_bar=None):
- alphabet_string = list(string.ascii_uppercase)
- column = int(math.ceil(n_microstates/row))
- n_conditions = len(conditions)
- n_cr = int(nCr(n_conditions,2))
- row_num = column * n_conditions
- title = ['Microstate'+ alphabet_string[i] for i in range(n_microstates)]
- for sheet in sheets:
- data = np.asarray(read_xlsx(path, sheet))
- p_value = np.asarray(read_xlsx(path_t_test, sheet))[:,-1] if path_t_test else None
- fig, ax = plt.subplots(row, column, figsize=(14,8))
- for i in range(row):
- for j in range(column):
- if column*i+j == n_microstates:
- ax[i][j].axis('off')
- break
- block = data[:, row_num*i+j*n_conditions:row_num*i+(j+1)*n_conditions]
- pv = p_value[n_cr*(column*i+j):n_cr*(column*i+j+1)] if p_value is not None else None
- plot_block(block.mean(axis=0), stats.sem(block, axis=0), conditions, ax[i][j], title[column*i+j], ylabel[sheet], pv, p_value_bar)
- plt.subplots_adjust(hspace=0.4, wspace=0.5)
- plt.savefig(savepath+"//"+sheet+".png", dpi=1200)
- plt.show()
- def reorder_run_parameter(path, savepath, sheets, n_microstates, n_runs, n_conditions, order):
- for sheet in sheets:
- data = np.asarray(read_xlsx(path, sheet))
- data_ordered = np.zeros((data.shape[0], data.shape[1]))
- data_ordered[:, 0:n_microstates] = data[:, np.asarray(order)]
- block_size = n_microstates * n_runs
- for i in range(n_conditions):
- index = []
- for j in order:
- index.extend(list(range(j*n_runs+n_microstates+i*block_size,(j+1)*n_runs+n_microstates+i*block_size)))
- data_ordered[:,i*block_size+n_microstates:(i+1)*block_size+n_microstates] = data[:,index]
- write_info(savepath, sheet, data_ordered)
- def reorder_condition_parameter(path, savepath, sheets, n_conditions, order):
- for sheet in sheets:
- data = np.asarray(read_xlsx(path, sheet))
- data_ordered = np.zeros((data.shape[0], data.shape[1]))
- for i, i_value in enumerate(order):
- data_ordered[:, i * n_conditions:(i + 1) * n_conditions] = data[:, i_value * n_conditions:(i_value + 1) * n_conditions]
- write_info(savepath, sheet, data_ordered.tolist())
- def formulate_run_parameter(path, save_path, sheets, n_microstates, n_conditions, n_runs):
- block_size = n_microstates * n_runs
- for sheet in sheets:
- res = []
- data = read_xlsx(path, sheet)
- for row in data:
- temp_res = row[0:n_microstates]
- np_data = np.asarray(row[n_microstates::])
- for n_condition in range(n_conditions):
- index = []
- for j in range(n_microstates):
- for i in range(n_condition*block_size, (n_condition+1)*block_size, n_microstates):
- index.append(i+j)
- temp_res.extend(np_data[index])
- res.append(temp_res)
- write_info(save_path, sheet, res)
- def formulate_condition_parameter(path, save_path, sheets, n_microstates, n_conditions, n_runs):
- for sheet in sheets:
- res = []
- data = read_xlsx(path, sheet)
- for row in data:
- temp = []
- temp_res = []
- for i in range(n_microstates, len(row), n_runs):
- temp.append(np.mean(row[i:i+n_runs]))
- for i in range(n_microstates):
- temp_res.append(row[i])
- for j in range(n_conditions):
- temp_res.append(temp[i + j*n_microstates])
- res.append(temp_res)
- write_info(save_path, sheet, res)
- def batch_test_homogeneity_within_condition_within_subject(data, comb, order, n_microstate):
- res = {}
- multi_res = []
- pool = Pool(8)
- for combination in comb:
- data_merged = []
- block_size = []
- for item in combination:
- data_merged.extend(data[item])
- block_size.append(len(data[item]))
- multi_res.append(
- pool.apply_async(test_homogeneity, ([data_merged, n_microstate, block_size, order, len(combination)],)))
- pool.close()
- pool.join()
- for i in range(len(multi_res)):
- temp = '*'.join(comb[i])
- res[temp] = multi_res[i].get()
- return res
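- # Usage sketch (toy data): conditional-homogeneity test across two toy
- # "trials" of one subject. `comb` holds tuples of keys into `data`; each
- # combination's sequences are concatenated and compared block-wise. Because
- # the batch function spawns a multiprocessing Pool, call this only under an
- # `if __name__ == '__main__':` guard.
- def _example_batch_homogeneity():
-     np.random.seed(2)
-     data = {'trial_1': np.random.randint(0, 4, 1000).tolist(),
-             'trial_2': np.random.randint(0, 4, 1000).tolist()}
-     comb = [('trial_1', 'trial_2')]
-     return batch_test_homogeneity_within_condition_within_subject(data, comb, 1, 4)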
- def batch_test_homogeneity_within_task_across_subject(comb, task, order, n_microstate, subject_path):
- res = {}
- multi_res = []
- pool = Pool(8)
- for combination in comb:
- data_merged = []
- block_size = []
- for item in combination:
- data_temp = load_data(subject_path + "\\" + item + "_1_30_microstate_labels.json")[task]
- data_merged.extend(data_temp)
- block_size.append(len(data_temp))
- multi_res.append(
- pool.apply_async(test_homogeneity, ([data_merged, n_microstate, block_size, order, len(combination)],)))
- pool.close()
- pool.join()
- for i in range(len(multi_res)):
- temp = '*'.join(comb[i])
- res[temp] = multi_res[i].get()
- return res
- ## updated by Mengting Zhao on March 24 2022 for CTA analysis
- ## the function is ready for use
- def batch_surrogate_aif(joint_p, condition_p, n_markov, n_microstate, n_surrogate, n_lag, n_repetition):
- res = []
- multi_res = []
- # pool = Pool(7)
- for i in range(n_repetition):
- multi_res.append(surrogate_aif([joint_p, condition_p, n_markov, n_microstate, n_surrogate, n_lag]))
- # multi_res.append(
- # pool.apply_async(surrogate_aif, ([joint_p, condition_p, n_markov, n_microstate, n_surrogate, n_lag],))
- # )
- # pool.close()
- # pool.join()
- for i in range(len(multi_res)):
- # res.append(multi_res[i].get().tolist())
- res.append(multi_res[i].tolist())
- return res
- def surrogate_aif(para):
- j_p = para[0]
- c_p = para[1]
- n_markov = para[2]
- n_microstate = para[3]
- n_surrogate = para[4]
- n_lag = para[5]
- surrogate_data = MicrostateMarkov.surrogate_markov_chain(j_p, c_p, n_markov, n_microstate, n_surrogate)
- lrd = MicrostateLongRangeDependence(surrogate_data, n_microstate)
- return lrd.mutual_information(n_lag)
- ### updated by Mengting Zhao for CTA analysis on March 2
- ### function ready for use on CTA data analysis
- def batch_calculate_aif(data_path, tasks, n_microstate, lag, window_size, window_step, method='aif'):
- nb_trials = len(tasks)
- res = OrderedDict()
- multi_res = []
- if window_size == -1:
- # pool = Pool(11)
- for q in range(nb_trials):
- task = tasks[q]
- res[task] = [] # must be initialised before results are appended below
- # path_name is the module-level list of trial folder names defined in __main__
- data = loadmat(data_path + "\\" + path_name[q] + "\\" + task + "_seq.mat")['EEG'].flatten()
- multi_res.append(aif([data, n_microstate, lag, method]))
- # multi_res.append(pool.apply_async(aif, ([data, n_microstate, lag, method],)))
- for i in range(len(multi_res)):
- # res[tasks[i]].append(multi_res[i].get().tolist())
- res[tasks[i]].append(multi_res[i].tolist())
- return res
- def aif(para):
- ## change between AIF and partial-AIF with the fourth parameter
- ## the key function for AIF computation is mutual_information( )
- lrd = MicrostateLongRangeDependence(para[0], para[1])
- if para[3] == 'aif':
- res_aif = lrd.mutual_information(para[2])
- elif para[3] == 'paif':
- res_aif = lrd.partial_mutual_information(para[2])
- return res_aif
- ## updated by Mengting Zhao on March 24 2022 for CTA analysis
- ## function ready for use on preprocessed CTA data
- def batch_calculate_dfa(data, n_microstate, n_partitions, segment_range, segment_density):
- res = []
- # res = OrderedDict()
- multi_res = []
- # pool = Pool(11)
- lrd = MicrostateLongRangeDependence(data, n_microstate)
- # embed_random_walk() maps the sequence to a +/-1 random-walk embedding for each subset of n_partitions states
- embedding = lrd.embed_random_walk(n_partitions)
- for embed_sequence_index, embed_sequence in embedding.items():
- # v_test = dfa([embed_sequence, embed_sequence_index, segment_range, segment_density])
- multi_res.append(dfa([embed_sequence, embed_sequence_index, segment_range, segment_density]))
- # multi_res.append(
- # pool.apply_async(dfa, ([embed_sequence, embed_sequence_index, segment_range, segment_density],))
- # )
- # pool.close()
- # pool.join()
- for i in range(len(multi_res)):
- # res.append(multi_res[i].get())
- res.append(multi_res[i])
- return res
- def dfa(para):
- embed_sequence = para[0]
- embed_sequence_index = para[1]
- segment_range = para[2]
- segment_density = para[3]
- res = {}
- res[embed_sequence_index] = MicrostateLongRangeDependence.dfa(embed_sequence, segment_range, segment_density)
- return res
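- # Usage sketch (toy data): DFA over all 3-state bipartition embeddings of a
- # 7-state toy sequence, using the same segment_range / segment_density
- # convention as the DFA section of __main__ below.
- def _example_batch_dfa():
-     np.random.seed(3)
-     toy_sequence = np.random.randint(0, 7, 4096).tolist()
-     segment_range = [2, int(np.log2(len(toy_sequence)))] # windows from 2**2 up to just under 2**12 samples
-     return batch_calculate_dfa(toy_sequence, 7, 3, segment_range, 0.25)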
- def test_homogeneity(para):
- sequence = MicrostateMarkov(para[0], para[1])
- p = sequence.test_conditional_homogeneity(para[2], para[3], para[4])
- return p
- def conditions_tasks(conditions, tasks):
- res = {}
- for condition in conditions:
- res[condition] = []
- for task in tasks:
- if task.startswith(condition):
- res[condition].append(task)
- return res
- def read_TLX_ratings(input_fname, sheet_name):
- ddd = read_xlsx(input_fname, sheet_name, row=True)
- res_score = []
- for row in ddd[1:]: # skip the header row
- data_temp = row[1:7] # the six NASA-TLX sub-scale ratings
- score = np.sum(data_temp)/6 # unweighted mean TLX score
- res_score.append(score)
- return res_score
- if __name__ == '__main__':
- n_k = 7
- n_chs = 64
- data_dir = r'F:\FlightSim_experiment_2021\Concordia_preprocessed'
- n_conditions = 3
- condition_name = ['training', 'practiceA', 'practiceB']
- subjects = ['P01', 'P02', 'P03', 'P04', 'P05', 'P06', 'P07', 'P08', 'P09', 'P10', 'P11', 'P12',
- 'P13', 'P14', 'P15', 'P16', 'P17', 'P18', 'P19', 'P20', 'P21', 'P22', 'P23', 'P24']
- path_name = ['trial_' + str(i) for i in range(1, 23)]
- lis_name = ['trial_' + str(i) + '.set' for i in range(1, 23)]
- tasks = ['training_1', 'training_2', 'training_3', 'training_4', 'training_5', 'training_6', 'training_7',
- 'practiceA_1', 'practiceA_2', 'practiceA_3', 'practiceA_4', 'practiceA_5', 'practiceA_6', 'practiceA_7', 'practiceA_8',
- 'practiceB_1', 'practiceB_2', 'practiceB_3', 'practiceB_4', 'practiceB_5', 'practiceB_6', 'practiceB_7']
- nb_trials = len(path_name)
- ################ the computation of temporal parameters is independent from the AIF, DFA, and entropy rate analysis #################
- ################ calculate temporal parameters
- ##### tested with success on reordered sequences###
- save_path_json = r'F:\FlightSim_experiment_2021\Concordia_preprocessed\microstates_sequences\parameters'
- save_path_excel = r'F:\FlightSim_experiment_2021\Concordia_preprocessed\microstates_sequences\parameters\condition_parameters.xlsx'
- f = 250
- f_m = 1000 / f
- n_k = 7
- res = {}
- res_xlsx = {'duration':[], 'frequency':[], 'coverage':[]}
- for subject in subjects:
- print(subject)
- subject_path = data_dir + "\\" + subject + "\\" + r'clean_data_matlab' + "\\" + r'data'
- res[subject] = {}
- # data = load_data(subject_path + "\\" + subject + "_microstate_labels.json")
- temp = [[],[],[]]
- for q in range(nb_trials):
- task = tasks[q]
- # res[task] = []
- data = loadmat(subject_path + "\\" + path_name[q] + "\\" + task + "_seq.mat")['EEG'].flatten()
- sequence = MicrostateParameter(data, n_k)
- duration = sequence.calculate_duration() #during each iteration, the size of duration is 7
- frequency = sequence.calculate_frequency(f) #during each iteration, the size of frequency is 7
- coverage = sequence.calculate_coverage() #during each iteration, the size of coverage is 7
- duration = [i*f_m for i in duration]
- coverage = [i*100 for i in coverage]
- res[subject][task] = {'duration': duration, 'frequency': frequency, 'coverage': coverage}
- temp[0].extend(duration)
- temp[1].extend(frequency)
- temp[2].extend(coverage)
- res_xlsx['duration'].append(temp[0]) # temp[0] is already in ms; re-scaling by f_m here would double-count
- res_xlsx['frequency'].append(temp[1])
- res_xlsx['coverage'].append(temp[2]) # temp[2] is already in percent
- print('tested with success')
- ################# formulate task-wise and condition-wise parameters ###########################
- ### tested with success on microstate sequences###############
- read_path_json = r'F:\FlightSim_experiment_2021\Concordia_preprocessed\microstates_sequences\parameters\parameters_subjects.json'
- save_path_json = r'F:\FlightSim_experiment_2021\Concordia_preprocessed\microstates_sequences\parameters'
- n_k = 7
- paras = ['duration', 'frequency', 'coverage']
- all_parameters = load_data(read_path_json) # parse the JSON once instead of on every inner iteration
- for pa in paras:
- res = np.zeros((len(condition_name), n_k*len(subjects)))
- for index, condition in enumerate(condition_name):
- temp_res = []
- for task in tasks:
- temp = []
- for subject in subjects:
- if task.startswith(condition):
- data = all_parameters[subject][task][pa]
- temp.extend(data)
- if temp:
- temp_res.append(temp)
- np_data = np.asarray(temp_res)
- res[index, :] = np_data.mean(axis=0)
- print('finish')
- #################formulate t-test data files##########################
- read_path_json = r'F:\FlightSim_experiment_2021\Concordia_preprocessed\microstates_sequences\parameters'
- save_path_json = pjoin(read_path_json, r't-test') # results saved under the folder "t-test"
- paras = ['duration', 'frequency', 'coverage']
- n_k = 7
- length = n_k * len(subjects)
- m_index = [] # to save the index for each eegmap: e.g. 'A' corresponds to [0, 7, 14, ...], 'B' corresponds to [1, 8, 15, ...]
- for m in range(n_k):
- temp = []
- for n in range(m, length, n_k):
- temp.append(n)
- m_index.append(temp)
- for pa in paras:
- # for condition in condition_name:
- # read_path = read_path_json + '\\' + condition + '_' + pa + '.mat'
- read_path = read_path_json + '\\' + 'condition' + '_' + pa + '.mat' # change between 'condition' and condition (within the condition loop)
- data = loadmat(read_path)['EEG']
- for index, value in enumerate(m_index):
- # save_path = save_path_json + "\\" + condition + '_' + pa + '_m' + str(index) + '.mat'
- save_path = save_path_json + "\\" + 'condition' + '_' + pa + '_m' + str(index) + '.mat'
- savemat(save_path, {'EEG':data[:, value]})
- print('print')
- #################conduct t-test on the formulated data ##########################
- ### tested with success on microstate sequences ###########################
- read_path_json = r'F:\FlightSim_experiment_2021\Concordia_preprocessed\microstates_sequences\parameters\t-test'
- save_path_json = pjoin(read_path_json, r'results')
- n_k = 7
- paras = ['duration', 'frequency', 'coverage']
- alpha = 0.95
- sample_size = len(subjects)
- for pa in paras:
- for m in range(n_k):
- res = []
- res_t = []
- res_interval = [[],[]]
- save_path_t = save_path_json + "\\" + 'condition' + '_' + pa + "_mt" + str(m) + '.mat'
- save_path_c = save_path_json + "\\" + 'condition' + '_' + pa + "_mc" + str(m) + '.mat'
- read_path = read_path_json + "\\" + 'condition' + '_' + pa + '_m' + str(m) + '.mat'
- data = loadmat(read_path)['EEG']
- for comb in itertools.combinations([i for i in range(data.shape[0])], 2):
- p = stats.ttest_rel(data[comb[0], :], data[comb[1], :])
- res.append(p[1])
- res_t.append(p[0])
- diff = data[comb[0], :] - data[comb[1], :]
- diff_mean = np.mean(diff)
- diff_std = stats.sem(diff)
- ci = stats.t.interval(alpha, sample_size - 1, loc=diff_mean, scale=diff_std)
- res_interval[0].append(ci[0])
- res_interval[1].append(ci[1])
- savemat(save_path_t, {'EEG': np.asarray(res_t)})
- savemat(save_path_c, {'EEG': np.asarray(res_interval)})
- print('right')
- #################calculate temporal parameters and write to Excel spreadsheets ##########################
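- # NOTE: unlike the `paras` list of parameter names used above, the block
- # below expects `paras` to be a list of configuration dicts, each providing
- # the keys 'tasks', 'subject_path', 'save_path_json', and 'save_path_excel'
- # (defined in a companion script; the values are not reproduced here).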
- for para in paras:
- f = 250
- f_m = 1000 / f
- n_microstate = 7
- res = {}
- res_xlsx = {'duration':[], 'frequency':[], 'coverage':[]}
- tasks = para['tasks']
- subject_path = para['subject_path']
- save_path_json = para['save_path_json']
- save_path_excel = para['save_path_excel']
- for subject in subjects:
- print(subject)
- res[subject] = {}
- # data = load_data(subject_path + "\\" + subject + "_microstate_labels.json")
- temp = [[],[],[]]
- for task in tasks:
- data = loadmat(subject_path + "\\" + subject + "\\" + task +"_seq.mat")['EEG'].flatten()
- sequence = MicrostateParameter(data, n_microstate)
- duration = sequence.calculate_duration()
- frequency = sequence.calculate_frequency(f)
- coverage = sequence.calculate_coverage()
- duration = [i*f_m for i in duration]
- coverage = [i*100 for i in coverage]
- res[subject][task] = {'duration': duration, 'frequency': frequency, 'coverage': coverage}
- json.dump(res, codecs.open(save_path_json, 'w', encoding='utf-8'), separators=(',', ':'), sort_keys=True)
- ########### Part I: entropy rate analysis #########################################
- for lag in range(4, 7):
- for subject in subjects:
- subject_path = data_dir + "\\" + subject
- print(subject)
- res = {}
- for q in range(nb_trials):
- task = tasks[q]
- trial_path = subject_path + "\\" + r'clean_data_matlab' + "\\" + r'data' + "\\" + path_name[q]
- data = loadmat(trial_path + "\\" + tasks[q] + "_seq.mat")['EEG'].flatten()
- lrd = MicrostateLongRangeDependence(data, n_k)
- entropy_rate, excess_entropy, index = lrd.excess_entropy_rate(lag)
- res[task] = {'entropy_rate':entropy_rate.tolist(), 'excess_entropy':excess_entropy.tolist(), 'index':index.tolist()}
- json.dump(res, codecs.open(data_dir + "\\microstates_sequences" + "\\entropy_rate" + "\\history_" + str(lag) + '\\'
- + subject + ".json", 'w', encoding='utf-8'), separators=(',', ':'), sort_keys=True)
- ################ testing condition's entropy_rate computation on the microstate sequences ##########################
- read_path_json = r'F:\FlightSim_experiment_2021\Concordia_preprocessed\microstates_sequences\entropy_rate\history_6' #using history_6 as mentioned in the paper
- save_path_json = pjoin(read_path_json, r'condition_acrosstrials')
- conditions_res = np.zeros((len(condition_name), len(subjects)))
- for index, condition in enumerate(condition_name):
- res = []
- for task in tasks:
- temp = []
- if task.startswith(condition):
- for subject in subjects:
- data = load_data(read_path_json + "\\" + subject + ".json")[task]['entropy_rate']
- temp.append(data)
- if temp:
- res.append(temp)
- np_res = np.asarray(res)
- conditions_res[index, :] = np_res.mean(axis=0)
- savemat(save_path_json+"\\" + condition + ".mat", {'EEG':np_res}) #the entropy rate for one specific condition
- savemat(save_path_json+"\\" + 'condition' + ".mat", {'EEG':conditions_res}) # the entropy rates for all conditions
- ##################### Part III: DFA (detrended fluctuation analysis) computation #######################################
- save_path_json = r'F:\FlightSim_experiment_2021\Concordia_preprocessed\microstates_sequences\DFA'
- n_k = 7
- n_partitions = 3 # each bipartition splits the 7 states into a subset of 3 vs the remaining 4
- for subject in subjects:
- print(subject)
- subject_path = data_dir + "\\" + subject
- data_path = subject_path + "\\" + r'clean_data_matlab' + "\\" + r'data'
- res = {}
- for q in range(nb_trials):
- task = tasks[q]
- data = loadmat(data_path + "\\" + path_name[q] + "\\" + task + "_seq.mat")['EEG'].flatten()
- segment_range = [2, int(np.log2(len(data)))]
- segment_density = 0.25
- res[task] = batch_calculate_dfa(data, n_k, n_partitions, segment_range, segment_density)
- json.dump(res, codecs.open(save_path_json+"\\"+subject+"_dfa.json", 'w', encoding='utf-8'), separators=(',', ':'), sort_keys=True)
- print('stop')
- ################# Section 2: generating Surrogate data ######################
- ### tested with success on microstate sequences #########
- # # markov probability computation, tested with success ####
- save_path_json = r'F:\FlightSim_experiment_2021\Concordia_preprocessed\microstates_sequences\markovianity.json'
- res = {}
- n_k = 7
- for subject in subjects:
- print(subject)
- subject_path = data_dir + "\\" + subject
- data = load_data(subject_path + "\\" + r'microstates' + "\\" + subject + "_microstate_labels.json")
- res[subject] = {}
- for task in tasks:
- res[subject][task] = {}
- markov = MicrostateMarkov(data[task], n_k)
- for k in range(1, 6):
- joint_p, condition_p = markov.transition_matrix_hat(k)
- res[subject][task][k] = {'joint_p': joint_p.tolist(), 'condition_p': condition_p.tolist()}
- json.dump(res, codecs.open(save_path_json, 'w', encoding='utf-8'), separators=(',', ':'), sort_keys=True, indent=4)
- print(999)
- ######################### generating surrogate data, tested with success #################################################
- n_k = 7
- matrix = load_data(r'F:\FlightSim_experiment_2021\Concordia_preprocessed\microstates_sequences\markovianity.json') # load the transition matrices once, outside the subject loop
- for subject in subjects:
- print(subject)
- res = {}
- subject_path = data_dir + "\\" + subject + "\\" + "microstates"
- data_path = subject_path + "\\" + subject + "_microstate_labels.json"
- data = load_data(data_path)
- save_path = subject_path + "\\" + subject + "_surrogate_microstate_labels.json" # the results are saved under each subject's folder
- for task in tasks:
- n_surrogate = len(data[task])
- res[task] = {}
- for k in range(1, 6):
- condition_p = matrix[subject][task][str(k)]['condition_p']
- joint_p = matrix[subject][task][str(k)]['joint_p']
- res[task][k] = MicrostateMarkov.surrogate_markov_chain(joint_p, condition_p, k, n_k, n_surrogate).tolist()
- json.dump(res, codecs.open(save_path, 'w', encoding='utf-8'), separators=(',', ':'), sort_keys=True, indent=4)
- print('check results')
- #################### DFA related computation ##################################################
- ###### tested with success
- read_path_json = r'F:\FlightSim_experiment_2021\Concordia_preprocessed\microstates_sequences\DFA'
- save_path_json = pjoin(read_path_json, 'formalated_empirical')
- n_k = 7
- n_partitions = 3
- comb = combinations([i for i in range(n_k)], n_partitions)
- for index, item in enumerate(comb):
- res = np.zeros((len(condition_name), len(subjects)))
- for c_index, condition in enumerate(condition_name):
- temp = []
- for task in tasks:
- slope = []
- if task.startswith(condition):
- for subject in subjects:
- data = load_data(read_path_json + "\\" + subject+"_dfa.json")
- slope.append(data[task][index][to_string(item)]['slope'])
- if slope:
- temp.append(slope)
- np_temp = np.asarray(temp)
- res[c_index, :] = np_temp.mean(axis=0) # c_index: 0 (training), 1 (practiceA), 2 (practiceB)
- savemat(save_path_json+"\\"+condition+"_"+to_string(item)+".mat",{'EEG':np_temp}) # saved the results for one condition (one combination)
- savemat(save_path_json+"\\"+"condition"+"_"+to_string(item)+".mat",{'EEG':res}) ## saved the results for all conditions (one combination)
- print(999)
- ###################plot the across-subjects DFA results: Hurst Exponent on empirical data (group level)#################
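- # NOTE: `para` below is assumed to be a configuration dict carrying the
- # Excel paths 'read_path_excel' and 'p_read_path_excel' (set in a companion
- # script; the values are not reproduced here).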
- read_path_excel = para['read_path_excel']
- p_read_path_excel = para['p_read_path_excel']
- read_path_json = r'F:\FlightSim_experiment_2021\Concordia_preprocessed\microstates_sequences\DFA\formalated_empirical'
- save_path_fig = r'F:\FlightSim_experiment_2021\Concordia_preprocessed\microstates_sequences\DFA\figures'
- c_t = conditions_tasks(condition_name, tasks)
- n_k = 7
- n_condition = len(condition_name)
- n_partitions = 3
- row = 2
- column = 5
- all_comb = set([i for i in range(n_k)])
- comb = [item for item in combinations([i for i in range(n_k)], n_partitions)]
- label_str = ['_1_10.png', '_11_20.png', '_21_30.png', '_31_35.png']
- label_index = [0, 10, 20, 30]
- for index, index_item in enumerate(label_index):
- for condition in condition_name:
- fig, ax = plt.subplots(row, column, figsize=(20, 8), dpi=600)
- for i in range(row):
- for j in range(column):
- if i == 1 and index == len(label_index)-1:
- ax[i][j].axis('off')
- continue
- item = comb[i*column+j+index_item]
- # data = loadmat(read_path_json+"\\"+'condition'+"_"+to_string(item)+".mat")['EEG'] # superseded by the Excel read below
- data = np.asarray(read_xlsx(read_path_excel, to_string(item)))
- p_value = np.asarray(read_xlsx(p_read_path_excel, to_string(item)))[:, 1]
- title = to_string(item) + " Vs. " + to_string(all_comb - set(item))
- if row == 1:
- plot_block(data.mean(axis=0), stats.sem(data, axis=0), condition_name, ax[j], title, 'H')
- # plot_block(data.mean(axis=0), stats.sem(data, axis=0), c_t[condition], ax[j], title, 'H')
- else:
- # plot_block(data.mean(axis=0), stats.sem(data, axis=0), c_t[condition], ax[i][j], title, 'H')
- plot_block(data.mean(axis=1), stats.sem(data, axis=1), condition_name, ax[i][j], title, 'H')
- plt.subplots_adjust(hspace=0.4, wspace=0.5)
- # plt.savefig(save_path_fig + "\\" + condition + "\\" + condition + "_1_10" + ".png", dpi=600)
- plt.savefig(save_path_fig + "\\" + 'condition' + label_str[index], dpi=600)
- # plt.show()
- plt.clf()
- plt.close()
|