# run_analyses.py
import argparse
import logging
import os

import pandas as pd

from code.analyses.baseline_response_properties import run_baseline_analysis
from code.analyses.receptive_field_estimation import run_receptive_field_analysis
from code.analyses.phase_analysis import phase_analysis
from code.analyses.driven_response_properties import run_driven_response_analysis
from code.analyses.homogeneous_populations import run_homogeneous_population_analysis
from code.analyses.heterogeneous_populations import run_heterogeneous_population_analysis
from code.util import DelayType, default_job_count
  11. logging.basicConfig(level=logging._nameToLevel["WARN"], force=True)
  12. def run_analyses(args):
  13. list_folder = "file_lists"
  14. data_folder = "raw_data"
  15. results_folder = "derived_data"
  16. stimulus_folder = "stimuli"
  17. base_results = run_baseline_analysis(list_folder, data_folder)
  18. rf_results = run_receptive_field_analysis(list_folder, data_folder, num_cores=args.jobs)
  19. base_results.to_csv(os.path.join(results_folder, "baseline_properties.csv"), sep=";")
  20. rf_results.to_csv(os.path.join(results_folder, "receptivefield_positions.csv"), sep=";")
  21. full_df = phase_analysis(results_folder)
  22. full_df.to_csv(os.path.join(results_folder, "Figure2_baseline_properties.csv"), sep=";")
  23. driven_results = run_driven_response_analysis(list_folder, data_folder, results_folder, num_cores=args.jobs)
  24. driven_results.to_csv(os.path.join(results_folder, "whitenoise_trials.csv"), sep=";")
  25. # NOTE: The following analyses will take a while... depending on your hardware this will be a few hours
  26. whitenoise_trial_info = os.path.join(results_folder, "whitenoise_trials.csv")
  27. homogeneous_results = run_homogeneous_population_analysis(whitenoise_trial_info, data_folder, stimulus_folder, num_cores=args.jobs)
  28. homogeneous_results.to_csv(os.path.join(results_folder, "homogeneous_populationcoding.csv"), sep=";")
  29. population_sizes = [2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30]
  30. delays = [-1.0, 0.0, 0.000125, 0.00025, 0.0005, 0.001, 0.002, 0.003, 0.004, 0.005, 0.006, 0.007, 0.008, 0.009, 0.01, 0.0125, 0.015]
  31. kernels = [0.000125, 0.00025, 0.0005, 0.001, 0.002, 0.003, 0.005, 0.01, 0.015, 0.02, 0.025]
  32. delay_types = [DelayType.Gaussian]
  33. heterogeneous_results = None
  34. for dt in delay_types:
  35. results = run_heterogeneous_population_analysis(whitenoise_trial_info, data_folder,
  36. stimulus_folder, population_sizes,
  37. delays, kernels, num_cores=args.jobs,delay_type=dt)
  38. if heterogeneous_results is None:
  39. heterogeneous_results = results
  40. else:
  41. heterogeneous_results = heterogeneous_results.append(results, ignore_index=True)
  42. heterogeneous_results.to_csv(os.path.join(results_folder, "heterogeneous_populationcoding.csv"), sep=";")
  43. def create_parser():
  44. parser = argparse.ArgumentParser(description="Command line tool for the data analysis of the Hladnik & Grewe population coding project. \nThis may take a while... On a single core, this may easily exceed 10 hours...")
  45. default_num_cores = default_job_count()
  46. parser.add_argument("-j", "--jobs", type=int, default=default_num_cores, help=f"The number of parallel processes spawned for the analyses (defaults to {default_num_cores})")
  47. parser.set_defaults(func=run_analyses)
  48. return parser
  49. def main():
  50. parser = create_parser()
  51. args = parser.parse_args()
  52. args.func(args)
# Script entry point: run the pipeline only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()