@@ -0,0 +1,373 @@
+#!/usr/bin/env fslpython
+# -*- coding: utf-8 -*-
+ @author: Slava Karolis (slava.karolis@kcl.ac.uk)
+ This work is jointly copyrighted by University of Oxford and King's College London
+ Copyright 2024
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+import glob
+import os.path as op
+import numpy as np
+from ios.vol_io import get_vol
+import methods.mp as mp
+from methods.ic_regs import ic_regress
+# from scipy.io import loadmat
+from scipy.stats import zscore
+from methods.dctm import dctm
+from sklearn.decomposition import PCA
def generate_path_to_input_folder(sub, ses, structure, generic_root):
    """Return the ``func`` folder that holds the inputs of one session.

    The data are expected to be organised as follows::

        generic_root
        |___ rawdata
        |    |___ sub-{sub}/ses-{ses}/func
        |___ derivatives
             |___ dhcp_fmri_pipeline
                  |___ sub-{sub}/ses-{ses}/func

    Args:
        sub: subject identifier (without the ``sub-`` prefix).
        ses: session identifier (without the ``ses-`` prefix).
        structure: "rawdata", "derivatives" or None. "derivatives" is
            expanded to "derivatives/dhcp_fmri_pipeline". None is
            interpolated verbatim, so the result then contains "None".
        generic_root: root folder of the dataset.

    Returns:
        The folder path as a string.

    Raises:
        AssertionError: if ``structure`` is not one of the accepted values.
    """
    is_allowed = structure is None or structure in ("derivatives", "rawdata")
    assert is_allowed, "structure should be rawdata or derivatives"
    subfolder = (
        f"{structure}/dhcp_fmri_pipeline" if structure == "derivatives" else structure
    )
    return f"{generic_root}/{subfolder}/sub-{sub}/ses-{ses}/func"
def generate_path_to_dv_volume(sub, ses, generic_root):
    """Locate the ``rec-mcdc`` BOLD volume of a session in ``rawdata``.

    The run number is not known in advance, so the file is found by globbing
    ``sub-{sub}_ses-{ses}_run-*_task-rest_rec-mcdc_bold.nii.gz`` inside the
    session's rawdata ``func`` folder.

    Args:
        sub: subject identifier (without the ``sub-`` prefix).
        ses: session identifier (without the ``ses-`` prefix).
        generic_root: root folder of the dataset.

    Returns:
        Path of the first matching file in lexicographic order.

    Raises:
        FileNotFoundError: if no file matches the pattern.
    """
    input_folder = generate_path_to_input_folder(sub, ses, "rawdata", generic_root)
    pattern = (
        f"{input_folder}/sub-{sub}_ses-{ses}_run-*_task-rest_rec-mcdc_bold.nii.gz"
    )
    # glob returns matches in arbitrary order; sort so that the chosen run is
    # reproducible when several runs are present.
    matches = sorted(glob.glob(pattern))
    if not matches:
        raise FileNotFoundError(f"No volume found matching {pattern}")
    return matches[0]
def generate_iv_file_name(sub, ses, name_ending, structure, generic_root=None):
    """Compose the file name (not the full path) of a regressor file.

    For ``structure == "rawdata"`` the name carries a run number, which is
    recovered from the file name of the session's ``rec-mcdc`` BOLD volume.
    For any other ``structure`` the name has no run component.

    Args:
        sub: subject identifier (without the ``sub-`` prefix).
        ses: session identifier (without the ``ses-`` prefix).
        name_ending: trailing part of the file name.
        structure: "rawdata" selects the run-aware naming scheme.
        generic_root: dataset root; only used when ``structure`` is "rawdata".

    Returns:
        The file name as a string.
    """
    prefix = f"sub-{sub}_ses-{ses}"
    if structure != "rawdata":
        return f"{prefix}_{name_ending}"

    # Strip everything around the run number from the mcdc file name.
    dv_name = op.basename(generate_path_to_dv_volume(sub, ses, generic_root))
    run = dv_name.replace(f"sub-{sub}_ses-{ses}_run-", "").replace(
        "_task-rest_rec-mcdc_bold.nii.gz", ""
    )
    return f"{prefix}_run-{run}_{name_ending}"
def generate_path_to_mask(sub, ses, root_folder):
    """Return the path of the session's dilated brain mask.

    HARDWIRED location, change as appropriate: the file is expected at
    ``derivatives/dhcp_fmri_pipeline/sub-{sub}/ses-{ses}/func/mask_dil.nii.gz``
    and is assumed to be a dilated version of the brain mask.

    Args:
        sub: subject identifier (without the ``sub-`` prefix).
        ses: session identifier (without the ``ses-`` prefix).
        root_folder: root folder of the dataset.

    Returns:
        The mask path as a string.

    Raises:
        AssertionError: if the mask file does not exist; the message contains
            the ``fslmaths`` command that creates it.
    """
    func_dir = f"{root_folder}/derivatives/dhcp_fmri_pipeline/sub-{sub}/ses-{ses}/func"
    mask_path = f"{func_dir}/mask_dil.nii.gz"
    how_to_fix = (
        f"Mask file {mask_path} does not exist (not included in the release). "
        f"Create it using fslmaths {func_dir}/sub-{sub}_ses-{ses}_task-rest_desc-brain_mask.nii.gz "
        f"-dilM {func_dir}/mask_dil.nii.gz"
    )
    assert op.exists(mask_path), how_to_fix
    return mask_path
def load_nothing(non_arg):
    """Placeholder loader: ignore ``non_arg`` and return a new empty list."""
    return list()
def loc_load_cs_text(fname):
    """Read a comma-separated list of integers from the first line of a file.

    Only the first line is parsed; any further lines are ignored.

    Args:
        fname: path of the text file.

    Returns:
        A list of ints, or None when the file is empty or its first line
        holds no values (blank line).

    Raises:
        ValueError: if a non-empty entry cannot be converted to int.
    """
    # The context manager guarantees the handle is closed even on error.
    with open(fname) as text_file:
        first_line = text_file.readline()

    # Skip empty fields so a blank line or a trailing comma does not crash.
    tokens = [tok for tok in first_line.split(",") if tok.strip()]
    if not tokens:
        return None
    return [int(tok) for tok in tokens]
def loc_get_vol(path, vol_name=None, applymask=False):
    """Load one volume, or several volumes stacked along a new last axis.

    Args:
        path: folder passed through to ``get_vol``.
        vol_name: a single name (or None), or a list of names. With a list,
            each volume is loaded separately and stored in ``out[:, :, i]``.
        applymask: passed through to ``get_vol``.

    Returns:
        The data returned by ``get_vol`` for a single name, or a float array
        whose shape is the first volume's shape plus a trailing axis of
        length ``len(vol_name)``.
    """
    if not isinstance(vol_name, list):
        func, _dims, _mask = get_vol(path, vol_name, applymask)
        return func

    n_names = len(vol_name)
    for idx, name in enumerate(vol_name):
        single, _dims, _mask = get_vol(path, name, applymask)
        if idx == 0:
            # Allocate the output once the shape of one volume is known.
            func = np.zeros(tuple(single.shape) + (n_names,))
        func[:, :, idx] = single
    return func
+## IV transform methods
def tm_bptf(iv, options, censored_vols=None):
    """Build the regressor set returned by ``dctm``.

    Args:
        iv: placeholder, ignored.
        options: sequence whose first two items are forwarded to ``dctm``.
        censored_vols: unused; present for a uniform transform signature.

    Returns:
        Whatever ``dctm(350, 2.2, options[0], options[1])`` returns.
    """
    # NOTE(review): 350 and 2.2 are hard-coded; presumably the number of
    # volumes and the TR in seconds -- confirm against ``dctm``.
    third_arg, fourth_arg = options[0], options[1]
    return dctm(350, 2.2, third_arg, fourth_arg)
def tm_vol_censor(iv, options, censored_vols=None):
    """Turn a list of volume indices into one indicator column per volume.

    Args:
        iv: iterable of row indices to flag, or None for "nothing to flag".
        options: number of rows (volumes) of the output; 350 when None.
        censored_vols: unused; present for a uniform transform signature.

    Returns:
        Array of shape ``(options, len(iv))`` with a single 1 per column at
        the flagged row, or an ``(options, 0)`` array when ``iv`` is None.
    """
    n_rows = 350 if options is None else options
    if iv is None:
        return np.zeros((n_rows, 0))

    indicators = np.zeros((n_rows, len(iv)))
    for col, row in enumerate(iv):
        indicators[row, col] = 1
    return indicators
def tm_zscoring(iv, options, censored_vols=None):
    """Keep the leading columns of ``iv`` and z-score each of them.

    Args:
        iv: 2-D array, one regressor per column.
        options: number of leading columns to keep; None keeps all. Values
            larger than the number of columns are clipped.
        censored_vols: unused; present for a uniform transform signature.

    Returns:
        The retained columns, z-scored along axis 0.
    """
    n_available = iv.shape[1]
    n_keep = n_available if options is None else min(options, n_available)
    retained = iv[:, np.arange(0, n_keep)]
    return zscore(retained, axis=0)
def tm_mp_map(iv, options):
    """Expand ``iv`` with ``mp.mp_expansion`` and z-score the leading columns.

    Args:
        iv: input passed to ``mp.mp_expansion`` (called with
            ``detrend=False, aroma=False``).
        options: maximum number of leading columns of the expansion to keep.

    Returns:
        The retained columns of the expansion, z-scored along axis 0.
    """
    expanded = mp.mp_expansion(iv, detrend=False, aroma=False)
    n_keep = min(options, expanded.shape[1])
    retained = expanded[:, np.arange(0, n_keep)]
    return zscore(retained, axis=0)
def do_pca(iv, thres=0.99):
    """Project ``iv`` onto its leading principal components.

    A PCA with ``iv.shape[1] - 1`` components (full SVD solver) is fitted to
    ``iv``; the mean-centred data are then projected onto the first
    ``Ncomps`` components.

    Args:
        iv: 2-D array, observations in rows and variables in columns.
        thres: either a float < 1, interpreted as a cumulative
            explained-variance threshold, or an integer giving the number
            of components to keep directly.

    Returns:
        Tuple ``(scores, pca)``: the projected data and the fitted PCA object.

    Raises:
        ValueError: if ``thres`` is neither a float < 1 nor an integer.
    """
    by_variance = isinstance(thres, float) and thres < 1
    by_count = not by_variance and isinstance(thres, (int, np.integer))
    # Validate before fitting so that a bad threshold fails fast. The
    # original code raised a bare string, which is itself a TypeError.
    if not (by_variance or by_count):
        raise ValueError(
            "thres must be an int (number of components) or a float < 1 "
            "(fraction of explained variance)"
        )

    max_comps = iv.shape[1] - 1
    pca = PCA(n_components=max_comps, svd_solver="full")
    pca.fit(iv)

    if by_variance:
        cumulative = np.cumsum(pca.explained_variance_ratio_)
        above = np.flatnonzero(cumulative > thres)
        # NOTE(review): above[0] is the 0-based index of the first component
        # whose cumulative variance exceeds thres, so keeping above[0]
        # components stops just below the threshold (and keeps none if the
        # first component alone exceeds it). Kept as in the original to
        # preserve published results -- confirm whether above[0] + 1 was meant.
        n_comps = int(above[0]) if above.size else max_comps
    else:
        n_comps = thres

    loadings = np.transpose(pca.components_)
    scores = np.dot(iv - pca.mean_, loadings[:, 0:n_comps])
    return scores, pca
def tm_mp_pca(iv, options, censored_vols=None):
    """Expand two halves of ``iv``, then reduce the result with PCA.

    The columns of ``iv`` are split in two halves; each half is passed
    through ``tm_mp_map`` with its own column limit, the results are
    concatenated column-wise and reduced with ``do_pca``.

    Args:
        iv: 2-D array with volumes in rows.
        options: sequence ``(limit_first_half, limit_second_half, thres)``;
            the first two go to ``tm_mp_map``, the third to ``do_pca``.
        censored_vols: row indices to exclude from the PCA fit, or None.

    Returns:
        The PCA scores. When ``censored_vols`` is given, the output has one
        row per original volume and the censored rows are all zeros.
    """
    half = int(iv.shape[1] / 2)
    first_half = tm_mp_map(iv[:, 0:half], options[0])
    second_half = tm_mp_map(iv[:, half:], options[1])
    expanded = np.hstack((first_half, second_half))
    thres = options[2]

    if censored_vols is None:
        return do_pca(expanded, thres)[0]

    # Derive the number of volumes from the data instead of assuming 350, so
    # the retained-row bookkeeping matches series of any length.
    n_vols = expanded.shape[0]
    good_vols = np.setdiff1d(np.arange(0, n_vols), censored_vols)
    scores = do_pca(np.delete(expanded, censored_vols, axis=0), thres)[0]

    # Put the scores back on the full time axis; censored rows stay zero.
    padded = np.zeros((n_vols, scores.shape[1]))
    padded[good_vols, :] = scores
    return padded
def transform_function_mapping(function_name):
    """Look up a regressor transform by its configuration name.

    Args:
        function_name: one of "vol_censor", "zscoring", "mp_pca", "bptf".

    Returns:
        The matching ``tm_*`` function.

    Raises:
        KeyError: if ``function_name`` is not a known transform.
    """
    registry = dict(
        vol_censor=tm_vol_censor,
        zscoring=tm_zscoring,
        mp_pca=tm_mp_pca,
        bptf=tm_bptf,
    )
    return registry[function_name]
class IV:
    """Independent variables (regressors) of one subject/session.

    ``config`` maps a regressor-set key to a dict with the entries
    "options", "name", "structure", "ismap" and "transform_method". The
    first element of "structure", "ismap" and "transform_method" is used;
    "name" is a list of file-name endings. Typical use is ``loader()``
    followed by ``transformer()``, after which ``self.iv`` holds the design.
    """

    def __init__(self, sub, ses, root_folder, config):
        """Resolve file names, loaders and transforms for every regressor set.

        Raises:
            AssertionError: if the dilated mask file does not exist.
            KeyError: if a config entry or transform name is missing.
        """
        self.root_folder = root_folder
        self.mask_path = generate_path_to_mask(sub, ses, root_folder)
        self.sub = sub
        self.ses = ses
        self.regr_set = list(config.keys())
        entries = [config[key] for key in self.regr_set]
        # A one-element options list is unwrapped to its only item; None is
        # passed through untouched (the transforms treat it as "default").
        self.options = [
            opts[0] if opts is not None and len(opts) == 1 else opts
            for opts in (entry["options"] for entry in entries)
        ]
        # list of lists of file-name endings used to load the data
        self.name = [entry["name"] for entry in entries]
        # "rawdata" or "derivatives"
        self.structure = [entry["structure"][0] for entry in entries]
        # voxel-wise regressor or not
        self.ismap = [entry["ismap"][0] for entry in entries]
        self.transform_func = [
            transform_function_mapping(entry["transform_method"][0])
            for entry in entries
        ]
        self.folder_name, self.basename = self.obtain_basename()
        self.load_func, self.load_args = self.load_method()

    def obtain_basename(self):
        """Return (folders, file names) for every regressor set.

        ``folders[i]`` is the input folder of set ``i`` and ``file names[i]``
        the list of its file names, one per entry of ``self.name[i]``.
        """
        folder_name = []
        basename = []
        for name, structure in zip(self.name, self.structure):
            folder_name.append(
                generate_path_to_input_folder(
                    self.sub, self.ses, structure, self.root_folder
                )
            )
            basename.append(
                [
                    generate_iv_file_name(
                        self.sub, self.ses, ending, structure, self.root_folder
                    )
                    for ending in name
                ]
            )
        return folder_name, basename

    def load_method(self):
        """Choose a loader function and its keyword arguments per set.

        Voxel-wise sets are read with ``loc_get_vol``. Other sets must have
        exactly one file and are dispatched on the set key: "vc" (text list
        of volumes), "ica" (``np.loadtxt``) or "bp" (nothing to load).

        Raises:
            AssertionError: if a non-voxel-wise set does not have one file.
            NameError: if a non-voxel-wise set has an unknown key.
        """
        load_func = []
        load_args = []
        for foldname, bname, ismap, rs in zip(
            self.folder_name, self.basename, self.ismap, self.regr_set
        ):
            if ismap:
                func = loc_get_vol
                args = {
                    "path": foldname,
                    "vol_name": bname,
                    "applymask": self.mask_path,
                }
            else:
                assert len(bname) == 1, "multiple or empty inputs are not allowed"
                if rs in ("vc", "ica"):
                    func = loc_load_cs_text if rs == "vc" else np.loadtxt
                    args = {"fname": f"{foldname}/{bname[0]}"}
                elif rs == "bp":  # nothing to load, placeholder
                    func = load_nothing
                    args = {"non_arg": []}
                else:
                    raise NameError("no regressor is specified with this name")
            load_func.append(func)
            load_args.append(args)
        return load_func, load_args

    def loader(self):
        """Load every regressor set into ``self.data``.

        Also sets ``self.censored_vols`` to the data of the "vc" set when
        exactly one such set exists, otherwise to None.
        """
        data = []
        for func, args in zip(self.load_func, self.load_args):
            print(args)
            data.append(func(**args))
        self.data = data
        vc_positions = [j for j, rs in enumerate(self.regr_set) if rs == "vc"]
        if len(vc_positions) == 1:
            self.censored_vols = self.data[vc_positions[0]]
        else:
            self.censored_vols = None

    def transformer(self):
        """Transform the loaded data into the design ``self.iv``.

        Sets that are not voxel-specific are transformed once. If any set is
        voxel-specific, ``self.iv`` becomes a lazy, single-pass iterator that
        yields one design matrix per voxel; otherwise it is one fixed matrix.
        """
        self.transformed = [None] * len(self.data)
        # Partition on truthiness in both directions so that every set falls
        # into exactly one group (the original compared with ``== False``,
        # which left falsy non-False flags without a transform).
        voxelwise = [i for i, flag in enumerate(self.ismap) if flag]
        for i, flag in enumerate(self.ismap):
            if not flag:
                self.transformed[i] = self.transform_func[i](
                    self.data[i], options=self.options[i]
                )
        if voxelwise:
            # regressor sets are created dynamically, voxel by voxel
            nvox = self.data[voxelwise[0]].shape[1]
            self.iv = map(self.transformer_pervox, range(nvox))
        else:
            # nothing is voxel-specific, so the regressor set can be frozen
            self.iv = np.hstack(self.transformed)

    def transformer_pervox(self, vox):
        """Return the design matrix of voxel ``vox``.

        The voxel-specific entries of ``self.transformed`` are recomputed in
        place from ``self.data[i][:, vox, :]`` and all entries are then
        concatenated column-wise.
        """
        for i, flag in enumerate(self.ismap):
            if flag:
                self.transformed[i] = self.transform_func[i](
                    self.data[i][:, vox, :], self.options[i], self.censored_vols
                )
        return np.hstack(self.transformed)
class Regress:
    """Pairs the regression routine with its extra keyword arguments."""

    def __init__(self):
        """Use ``ic_regress`` with the keyword argument ``m=None``."""
        self.regress_func = ic_regress
        self.args = dict(m=None)
class DV:
    """Dependent variable: the masked BOLD data of one subject/session."""

    def __init__(self, sub, ses, generic_root):
        """Resolve the paths of the BOLD volume and of the dilated mask."""
        self.path = generate_path_to_dv_volume(sub, ses, generic_root)
        self.mask_path = generate_path_to_mask(sub, ses, generic_root)

    def loader(self):
        """Read the volume through ``get_vol`` into ``self.data``/``self.dims``."""
        loaded = get_vol(self.path, None, self.mask_path)
        self.data, self.dims, _ = loaded

    def run_regress_sequentially(self, Regress, IV):
        """Regress every data column on its own design matrix.

        Args:
            Regress: object exposing ``regress_func`` and ``args``.
            IV: object whose ``iv`` attribute iterates over one design matrix
                per column of ``self.data``.

        Returns:
            Tuple ``(resid, yhat)`` of arrays shaped like ``self.data``,
            filled column by column from the first two outputs of
            ``regress_func``.
        """
        shape = self.data.shape
        resid, yhat = np.zeros(shape), np.zeros(shape)
        for vox, design in enumerate(IV.iv):
            fitted = Regress.regress_func(design, self.data[:, vox], **Regress.args)
            resid[:, vox] = fitted[0].reshape(-1)
            yhat[:, vox] = fitted[1].reshape(-1)
        return resid, yhat

    def activate_dnx(self, IV, Regress):
        """Load data and regressors, run the regression, store the results.

        With at least one voxel-specific regressor set the regression runs
        column by column; otherwise a single call handles all data at once.
        The outputs are stored in ``self.resid`` and ``self.yhat``.
        """
        self.loader()
        IV.loader()
        IV.transformer()
        print("start denoise")
        if not any(IV.ismap):
            out = Regress.regress_func(IV.iv, self.data, **Regress.args)
            resid, yhat = out[0], out[1]
        else:
            resid, yhat = self.run_regress_sequentially(Regress, IV)
        self.yhat = yhat
        self.resid = resid