1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
import argparse
import logging
import os

import pandas as pd

from code.analyses.baseline_response_properties import run_baseline_analysis
from code.analyses.receptive_field_estimation import run_receptive_field_analysis
from code.analyses.phase_analysis import phase_analysis
from code.analyses.driven_response_properties import run_driven_response_analysis
from code.analyses.homogeneous_populations import run_homogeneous_population_analysis
from code.analyses.heterogeneous_populations import run_heterogeneous_population_analysis
from code.util import DelayType, default_job_count
# Default to WARNING-level logging for the whole pipeline; force=True replaces
# any handlers that the imported analysis modules may already have installed.
# Use the public logging.WARNING constant rather than the private
# logging._nameToLevel mapping (an implementation detail, not a stable API).
logging.basicConfig(level=logging.WARNING, force=True)
def run_analyses(args):
    """Run the full analysis pipeline and write each result table as CSV.

    Reads recordings from ``raw_data``/``stimuli`` (guided by the file lists
    in ``file_lists``) and writes all derived tables into ``derived_data``.

    Args:
        args: Parsed command-line namespace; only ``args.jobs`` (number of
            parallel worker processes) is used here.
    """
    list_folder = "file_lists"
    data_folder = "raw_data"
    results_folder = "derived_data"
    stimulus_folder = "stimuli"

    # Baseline response properties and receptive-field positions per cell.
    base_results = run_baseline_analysis(list_folder, data_folder)
    rf_results = run_receptive_field_analysis(list_folder, data_folder, num_cores=args.jobs)
    base_results.to_csv(os.path.join(results_folder, "baseline_properties.csv"), sep=";")
    rf_results.to_csv(os.path.join(results_folder, "receptivefield_positions.csv"), sep=";")

    # Phase analysis consumes the CSVs written above from results_folder.
    full_df = phase_analysis(results_folder)
    full_df.to_csv(os.path.join(results_folder, "Figure2_baseline_properties.csv"), sep=";")

    driven_results = run_driven_response_analysis(list_folder, data_folder, results_folder, num_cores=args.jobs)
    driven_results.to_csv(os.path.join(results_folder, "whitenoise_trials.csv"), sep=";")

    # NOTE: The following analyses will take a while... depending on your hardware this will be a few hours
    whitenoise_trial_info = os.path.join(results_folder, "whitenoise_trials.csv")
    homogeneous_results = run_homogeneous_population_analysis(whitenoise_trial_info, data_folder, stimulus_folder, num_cores=args.jobs)
    homogeneous_results.to_csv(os.path.join(results_folder, "homogeneous_populationcoding.csv"), sep=";")

    # Parameter grid for the heterogeneous-population analysis.
    # Delays/kernels are in seconds; -1.0 presumably marks a special
    # "no delay" condition — TODO confirm against run_heterogeneous_population_analysis.
    population_sizes = [2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30]
    delays = [-1.0, 0.0, 0.000125, 0.00025, 0.0005, 0.001, 0.002, 0.003, 0.004, 0.005,
              0.006, 0.007, 0.008, 0.009, 0.01, 0.0125, 0.015]
    kernels = [0.000125, 0.00025, 0.0005, 0.001, 0.002, 0.003, 0.005, 0.01, 0.015, 0.02, 0.025]
    delay_types = [DelayType.Gaussian]

    # Collect one result frame per delay type and concatenate once at the end.
    # DataFrame.append() was deprecated in pandas 1.4 and removed in 2.0;
    # pd.concat(..., ignore_index=True) is the supported replacement.
    partial_results = []
    for dt in delay_types:
        results = run_heterogeneous_population_analysis(whitenoise_trial_info, data_folder,
                                                        stimulus_folder, population_sizes,
                                                        delays, kernels, num_cores=args.jobs,
                                                        delay_type=dt)
        partial_results.append(results)
    heterogeneous_results = pd.concat(partial_results, ignore_index=True)
    heterogeneous_results.to_csv(os.path.join(results_folder, "heterogeneous_populationcoding.csv"), sep=";")
def create_parser():
    """Build the command-line argument parser for the analysis pipeline.

    Returns:
        argparse.ArgumentParser: Parser exposing ``-j/--jobs`` and carrying
        ``run_analyses`` as the default ``func`` handler.
    """
    jobs_default = default_job_count()
    description = "Command line tool for the data analysis of the Hladnik & Grewe population coding project. \nThis may take a while... On a single core, this may easily exceed 10 hours..."
    jobs_help = f"The number of parallel processes spawned for the analyses (defaults to {jobs_default})"

    parser = argparse.ArgumentParser(description=description)
    parser.add_argument("-j", "--jobs", type=int, default=jobs_default, help=jobs_help)
    parser.set_defaults(func=run_analyses)
    return parser
def main():
    """Parse command-line arguments and dispatch to the configured handler."""
    args = create_parser().parse_args()
    args.func(args)
- if __name__ == "__main__":
- main()
|