123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- import sys
- import numpy as np
- sys.path.append(str(Path.cwd().parents[0] / 'project_utils'))
- from project_paths import project_paths
- from types import SimpleNamespace
# Load the workflow configuration and expose its keys as attributes
# (e.g. config.EXCLUDES) instead of dictionary items.
configfile: 'config.yml'
config = SimpleNamespace(**config)

# The profile definition file lists one profile name per line;
# surrounding whitespace is stripped and blank lines are skipped.
profile_definition_file = (
    project_paths.working_dir / 'project_utils' / 'profiles.txt'
)
with open(profile_definition_file) as file:
    PROFILES = list(filter(None, map(str.strip, file)))
def select_profile(profile, excludes=None):
    """Return whether ``profile`` contains none of the excluded substrings.

    Parameters
    ----------
    profile : str
        Profile name to test.
    excludes : list, tuple, set, frozenset or single value, optional
        Substrings that disqualify a profile. Every entry is converted
        with ``str`` before the containment check. A single value that is
        not one of these collections (e.g. a string or an int) is treated
        as a one-element list. ``None`` (the default) excludes nothing.

    Returns
    -------
    bool
        ``True`` if no excluded substring occurs in ``profile``.
    """
    if excludes is None:
        # An unset value must not turn into the substring 'None'.
        excludes = []
    elif not isinstance(excludes, (list, tuple, set, frozenset)):
        # Strings are deliberately wrapped, not iterated character-wise.
        excludes = [excludes]
    return all(str(exc) not in profile for exc in excludes)
def get_profiles(variant='', excludes=[]):
    """List the profile names to process.

    Profiles from PROFILES are kept when select_profile() accepts them
    for the given ``excludes``. If ``variant`` is non-empty, only kept
    profiles whose name contains 'LENS' are returned, each with
    ``variant`` appended; otherwise all kept profiles are returned as is.
    """
    selected = []
    for name in PROFILES:
        if select_profile(name, excludes):
            selected.append(name)

    if not variant:
        return selected
    return [name + variant for name in selected if 'LENS' in name]
# Regular expressions restricting what each wildcard may match.
# They are written as raw strings so that the backslashes reach the regex
# engine unchanged without relying on Python passing through unknown
# escape sequences (which raises a SyntaxWarning on recent Python versions).
#
# variant accepts either a literal '|' followed by 'macrodim<digits>' or
# 'minimatrigger', or an optional single digit (so it may also be empty).
# NOTE(review): the leading `\|` matches a literal pipe character; confirm
# that this is intended and that an empty first alternative was not meant.
wildcard_constraints:
    profile = r'[\w]+',
    measure_type = r'[a-z\-]+',
    event_name = r'[a-z]+',
    variant = r'\|(macrodim[0-9]+|minimatrigger)|\d?'
def pipeline_output(w):
    """Input function: per-profile stage-05 measure files for wildcards ``w``.

    The stage directory is 'stage05_channel_wave_characterization' when
    ``w.measure_type`` is 'channel-wise' and
    'stage05_wave_characterization' otherwise. One path is returned for
    every profile given by get_profiles() for ``w.variant`` and
    ``config.EXCLUDES``.
    """
    if w.measure_type == "channel-wise":
        stage_type = "channel_wave"
    else:
        stage_type = "wave"
    stage = f'stage05_{stage_type}_characterization'
    stage_output = f'{w.event_name}_{w.measure_type}_measures.csv'

    profiles = get_profiles(variant=w.variant, excludes=config.EXCLUDES)
    return [
        project_paths.pipeline_output / profile / stage / stage_output
        for profile in profiles
    ]
def all_input(w):
    """Input function listing every dataframe requested by the config.

    Always requests the 'wavefronts' measure dataframes for each
    combination of ``config.MEASURE_TYPE`` and ``config.VARIANT``.
    Adds the averaged wavemode measures when 'wavemodes' is in
    ``config.EVENT_NAME``, and the wavefront trend measures when any
    entry of ``config.VARIANT`` contains 'macrodim'.
    """
    dataframes_dir = project_paths.dataframes

    # Patterns are handed to expand() as plain strings, as rule
    # macrodim_trends already does, instead of relying on expand()
    # accepting path objects.
    dfs = expand(
        str(dataframes_dir
            / '{event_name}_{measure_type}{variant}_measures.csv'),
        measure_type=config.MEASURE_TYPE,
        event_name='wavefronts',
        variant=config.VARIANT,
    )

    if 'wavemodes' in config.EVENT_NAME:
        dfs += expand(
            str(dataframes_dir / 'wavemodes_{measure_type}_avg_measures.csv'),
            measure_type=config.MEASURE_TYPE,
        )

    # any() is already False for an empty VARIANT list, so no separate
    # length check is required.
    if any('macrodim' in variant for variant in config.VARIANT):
        dfs += expand(
            str(dataframes_dir
                / 'wavefronts_{measure_type}_trend_measures.csv'),
            measure_type=config.MEASURE_TYPE,
        )

    return dfs
# Default target rule: its inputs are the dataframes returned by all_input.
rule all:
    input: all_input
# Runs scripts/aggregate_pipeline_output.py on the per-profile measure files
# returned by pipeline_output and writes one CSV per combination of
# event_name, measure_type and variant. The configured excludes are passed
# on to the script via --excludes.
rule aggregate_pipeline_output:
    input:
        script = 'scripts/aggregate_pipeline_output.py',
        data = pipeline_output,
    params:
        excludes = config.EXCLUDES,
    output:
        dataframe = (
            project_paths.dataframes
            / '{event_name}_{measure_type}{variant}_measures.csv'
        ),
    shell:
        """
        python {input.script} --data {input.data:q} \
                              --output {output.dataframe:q} \
                              --excludes {params.excludes:q}
        """
# Runs scripts/average_wavemode_measures.py on a wavefronts measure
# dataframe and writes the result next to it (same {dir}) as
# 'wavemodes_{measure_type}{variant}_avg_measures.csv'.
rule average_wavemode_measures:
    input:
        script = 'scripts/average_wavemode_measures.py',
        dataframe = '{dir}/wavefronts_{measure_type}{variant}_measures.csv',
    output:
        dataframe = '{dir}/wavemodes_{measure_type}{variant}_avg_measures.csv',
    shell:
        """
        python {input.script} --dataframe {input.dataframe:q} \
                              --output {output.dataframe:q}
        """
# Runs scripts/trend_wavefront_measures.py on the wavefronts measure
# dataframes of every configured variant whose name does not contain
# 'minima', for one measure_type, and writes a single trend dataframe.
# {{measure_type}} is double-braced so that expand() leaves it as a
# wildcard and only fills in {variant}.
rule macrodim_trends:
    input:
        script = 'scripts/trend_wavefront_measures.py',
        dataframes = expand(
            str(project_paths.dataframes
                / 'wavefronts_{{measure_type}}{variant}_measures.csv'),
            variant=[
                name for name in config.VARIANT if 'minima' not in name
            ],
        ),
    params:
        groupby = ['anesthetic', 'technique', 'macro_pixel_dim'],
    output:
        dataframe = (
            project_paths.dataframes
            / 'wavefronts_{measure_type}_trend_measures.csv'
        ),
    shell:
        """
        python {input.script} --data {input.dataframes:q} \
                              --groupby {params.groupby:q} \
                              --output {output.dataframe:q}
        """
|