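"""Extract mean quantitative DWI values per atlas region for every subject and session.

The script walks the processed-data tree, masks each *_flipped DWI map with the
parental atlas annotation (optionally excluding stroke-lesion voxels), and writes
one row per (subject, session, metric, region) to a CSV file.
"""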
import os
import glob

import nibabel as nib
import numpy as np
import pandas as pd
from tqdm import tqdm
# Set up paths (the input CSV and output folder are resolved relative to this file)
code_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(code_dir)
input_address = r"E:\CRC_data\SND\proc_data"  # machine-specific root of the processed data
mask_names_file = os.path.join(parent_dir, "input", "acronym_atlas_abriviation.csv")
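# The lookup CSV maps integer RegionID values to RegionAbbreviation strings;
# right-hemisphere labels in the annotation are offset by +2000 (see below).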
# Load mask names into a DataFrame
df_maskName = pd.read_csv(mask_names_file)
# Map each acquisition day to its merged session bin
session_mapping = {
    0: 0, 1: 3, 2: 3, 3: 3, 4: 3, 5: 3, 6: 7, 7: 7, 8: 7, 9: 7,
    10: 7, 11: 14, 12: 14, 13: 14, 14: 14, 15: 14, 16: 14, 17: 14,
    18: 14, 19: 21, 20: 21, 21: 21, 22: 21, 23: 21, 24: 21, 25: 21,
    26: 28, 27: 28, 28: 28, 29: 28, 30: 28, 42: 42, 43: 42, 56: 56, 57: 56,
}
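# e.g. days 1-5 collapse to day 3, days 6-10 to day 7, days 11-18 to day 14, ...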
# Initialize list to store extracted information
data = []
# Iterate over all flipped quantitative DWI maps
file_paths = glob.glob(os.path.join(input_address, "**", "dwi", "DSI_studio", "*_flipped.nii.gz"), recursive=True)
for file_path in tqdm(file_paths, desc="Processing files"):
    tqdm.write(file_path)  # avoids breaking the progress bar, unlike a bare print
    # Parse the path; layout assumed: <proc_data>/<subjectID>/<ses-*>/dwi/DSI_studio/<Qtype>_flipped.nii.gz
    subject_id = file_path.split(os.sep)[-5]
    time_point = file_path.split(os.sep)[-4]
    int_time_point = 0 if time_point == "ses-Baseline" else int(time_point.split("-P")[1])
    merged_time_point = session_mapping.get(int_time_point, "Unknown")
    q_type = os.path.basename(file_path).split("_flipped")[0]

    # Look for a stroke lesion mask next to the DWI file
    stroke_candidates = glob.glob(os.path.join(os.path.dirname(file_path), "*StrokeMask_scaled.nii"))
    stroke_path = stroke_candidates[0] if stroke_candidates else None
    stroke_flag = stroke_path is not None
    mask_path = os.path.join(os.path.dirname(file_path), "*dwiDNSmoothMicoBetAnnoSplit_parental_scaled.nii")
    # Load DWI data
    dwi_img = nib.load(file_path)
    dwi_data = dwi_img.get_fdata()
    # Load the atlas annotation; skip this file if no annotation is found
    mask_files = glob.glob(mask_path)
    if not mask_files:
        continue
    mask_img = nib.load(mask_files[0])
    mask_data = mask_img.get_fdata()
    # Load the stroke mask once (if present) instead of re-reading it for every region
    stroke_data = nib.load(stroke_path).get_fdata() if stroke_path else None
    unique_masks = np.unique(mask_data)
    for rr in unique_masks:
        # Boolean mask for this atlas region (label 0, the background, is
        # iterated as well and ends up reported as "Unknown")
        unique_region_mask = mask_data == rr
        if stroke_data is not None:
            # Exclude lesion voxels; assumes stroke voxels carry values above 1
            unique_region_mask = unique_region_mask & ~(stroke_data > 1)
        ROI = dwi_data * unique_region_mask
        mean_roi_value = ROI[ROI > 0].mean() if np.any(ROI > 0) else np.nan
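        # Note: the mean is taken over strictly positive voxels only, so
        # zero-valued voxels inside the region do not contribute.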
        # Resolve the region name; right-hemisphere labels are RegionID + 2000
        if rr in df_maskName["RegionID"].values:
            mask_name_row = df_maskName[df_maskName["RegionID"] == rr]
            mask_name = "L_" + mask_name_row["RegionAbbreviation"].values[0]
        elif (rr - 2000) in df_maskName["RegionID"].values:
            mask_name_row = df_maskName[df_maskName["RegionID"] == (rr - 2000)]
            mask_name = "R_" + mask_name_row["RegionAbbreviation"].values[0]
        else:
            mask_name = "Unknown"
        # Append data to list
        data.append([
            file_path,
            subject_id,
            time_point,
            int_time_point,
            merged_time_point,
            q_type,
            rr,
            mask_name,
            mean_roi_value,
            "Stroke" if stroke_flag else "Sham",
        ])
# Create DataFrame from the collected data
columns = ["fullpath", "subjectID", "timePoint", "int_timepoint", "merged_timepoint", "Qtype", "mask_id", "mask_name", "Value", "Group"]
df_results = pd.DataFrame(data, columns=columns)
# Propagate the "Stroke" label: if any session of a subject is "Stroke",
# mark every session of that subject as "Stroke"
stroke_subjects = df_results[df_results["Group"] == "Stroke"]["subjectID"].unique()
df_results.loc[df_results["subjectID"].isin(stroke_subjects), "Group"] = "Stroke"
# Define the path for the output CSV file
output_csv_path = os.path.join(parent_dir, "output", "Final_Quantitative_output_for_atalas_regions", "Quantitative_results_from_dwi_processing_atlas.csv")
# Create the directory if it does not exist
os.makedirs(os.path.dirname(output_csv_path), exist_ok=True)
# Save the DataFrame to a CSV file
df_results.to_csv(output_csv_path, index=False)
print(f"Processing complete. Results saved to {output_csv_path}")