#!/usr/bin/env python3
'''
created on Tue Feb 18 2020
author: Christian Olaf Haeusler
'''
from collections import defaultdict
from glob import glob
import argparse
import csv
import os.path
import random

# parameters #
STIM_LENGTH = 0.2  # in seconds
random.seed(1984)  # for jittering the no-cut events
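
# Note: STIM_LENGTH is the fixed event duration (second column) that
# write_event_files() puts into the FSL EV3 files, and the fixed random seed
# makes the jitter applied to the no-cut events in create_nocut() reproducible.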


def parse_arguments():
    '''Parses the command line arguments and returns the input directory,
    the input file pattern, and the output directory.
    '''
    parser = argparse.ArgumentParser(
        description='''converts annotated events to
        event files to be used in FSL''')

    parser.add_argument('-ind',
                        default='events/segments/avmovie',
                        help='''directory that contains the segmented
                        annotation; e.g. 'events/segments/avmovie' ''')

    parser.add_argument('-inp',
                        default='locations_run-?_events.tsv',
                        help='''input pattern of the segmented
                        annotation files''')

    parser.add_argument('-outd',
                        default='events/onsets',
                        help='''output directory; e.g. 'events/onsets' ''')

    args = parser.parse_args()

    inDir = args.ind
    inPat = args.inp
    outDir = args.outd

    return inDir, inPat, outDir


def get_anno_segments(directory, fname_pattern):
    '''Searches the directory for annotation segments that match the given
    file name pattern and returns the sorted list of paths.
    '''
    path_pattern = os.path.join(directory, fname_pattern)
    anno_paths = glob(path_pattern)

    return sorted(anno_paths)
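
# With the default pattern 'locations_run-?_events.tsv', glob() matches one
# annotation file per run, e.g. (illustrative names) locations_run-1_events.tsv
# up to locations_run-8_events.tsv, which sorted() returns in run order.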


def get_run_number(path):
    '''Extracts the run number from a file name of the form
    <prefix>_run-<number>_events.tsv.
    '''
    fname = os.path.basename(path)
    run = fname.split('run-')[1].split('_events')[0]

    return run


def read_anno_segment(path):
    '''Reads a segment of the annotation (a TSV file) and returns its rows
    with onsets converted to floats and durations converted to offsets.
    '''
    with open(path, 'r') as csvfile:
        all_rows = csv.reader(csvfile, delimiter='\t')
        # skip the header
        next(all_rows, None)

        # put the content of the file into a list
        anno = []
        for row in all_rows:
            # convert the onset from str to float
            row[0] = float(row[0])
            # convert the duration to an offset, rounded to two decimals
            row[1] = round(row[0] + float(row[1]), 2)
            anno.append(row)

    return anno
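
# The first two entries of each returned row are the onset and the computed
# offset in seconds; columns 2-4 hold the spatial tags used further below
# (major location, setting, locale). An illustrative (made-up) row:
# [12.0, 15.48, '<major location>', '<setting>', '<locale>']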


def compute_spatial_switches(row, prev_major_l, prev_setting, prev_locale,
                             learned_settings, learned_locales):
    '''Compares the current row to the previous one and returns which
    spatial switches occur as a list [major location, setting, locale].

    Settings and locales of the previous row are registered as "learned"
    by updating learned_settings and learned_locales in place.
    '''
    major_l = row[2]
    setting = row[3]
    locale = row[4]

    # GET PATTERN OF LOCATION SWITCHES
    # check which switches are present
    spatial_switches = [None, None, None]

    # is a major location switch present in the current line?
    if major_l != prev_major_l:
        spatial_switches[0] = True
    else:
        spatial_switches[0] = False

    # is a setting switch present in the current line?
    if setting != prev_setting:
        spatial_switches[1] = True
        # register that the setting of the previous line has been "learned"
        if prev_setting not in learned_settings:
            learned_settings.append(prev_setting)
    else:
        spatial_switches[1] = False

    # is a locale switch present in the current line?
    if locale != prev_locale:
        spatial_switches[2] = True
        # register that the locale of the previous line has been "learned";
        # indexing the defaultdict creates the setting's key first
        learned_locales[prev_setting]
        if prev_locale not in learned_locales[prev_setting]:
            learned_locales[prev_setting].append(prev_locale)
    else:
        spatial_switches[2] = False

    return spatial_switches


def apply_condition_rules(row, switches, learned_settings, learned_locales):
    '''Maps the pattern of spatial switches onto a condition label and
    returns the event's onset and condition.
    '''
    start_t = row[0]
    setting = row[3]
    locale = row[4]

    # apply the rules that define the conditions
    # rule for a setting change (i.e. a cut across scenes)
    if switches == [True, True, True] or switches == [False, True, True]:
        if setting in learned_settings:
            cond = 'vse_old'
        else:
            cond = 'vse_new'
    # rule for a locale change
    # (the locale (mostly a room) changes within a scene/setting)
    elif switches == [False, False, True]:
        cond = 'vlo_ch'
    # rule for a perspective change
    elif switches == [False, False, False]:
        if locale in learned_locales[setting]:
            cond = 'vpe_old'
        else:
            cond = 'vpe_new'
    else:
        raise RuntimeError('cannot recognize condition for switches %s'
                           % switches)

    return start_t, cond
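
# Summary of the rules above, with the switch pattern given as
# [major location, setting, locale]:
#   [True, True, True] or [False, True, True] -> 'vse_old'/'vse_new'
#       (setting change; old vs. new depends on whether the setting was seen before)
#   [False, False, True]                      -> 'vlo_ch' (locale change within a setting)
#   [False, False, False]                     -> 'vpe_old'/'vpe_new'
#       (perspective change; old vs. new depends on whether the locale was seen before)
#   any other pattern raises a RuntimeError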


def create_nocut(row):
    '''Creates evenly spaced, jittered "no cut" events within shots that
    are longer than twice the no-cut interval.
    '''
    # the two nested functions below are used to align the events to a
    # movie frame
    def msec_to_time_stamp(milliseconds):
        '''
        Input:
            a time point in milliseconds (int)
        Output:
            a time stamp (str) in the format HH:MM:SS:Frame
        '''
        hours = milliseconds / (60 * 60 * 1000)
        minutes = milliseconds % (60 * 60 * 1000) / (60 * 1000)
        seconds = milliseconds % (60 * 60 * 1000) % (60 * 1000) / 1000
        frame = milliseconds % (60 * 60 * 1000) % (60 * 1000) % 1000 // 40
        time_stamp = '%02d:%02d:%02d:%02d' % (hours, minutes, seconds, frame)
        return time_stamp

    def time_stamp_to_msec(t_stamp):
        '''
        Input:
            a time stamp (str) in the format HH:MM:SS:Frame
        Output:
            a time point in milliseconds (int)
        '''
        splitted_stamp = t_stamp.split(':')
        milliseconds = (int(splitted_stamp[0]) * 60 * 60 * 1000) +\
                       (int(splitted_stamp[1]) * 60 * 1000) +\
                       (int(splitted_stamp[2]) * 1000) +\
                       (int(splitted_stamp[3]) * 40)
        return milliseconds
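
    # Round trip of the two helpers at the movie's 25 fps (40 ms per frame),
    # worked out by hand: msec_to_time_stamp(10500) returns '00:00:10:12' and
    # time_stamp_to_msec('00:00:10:12') returns 10480, i.e. time points are
    # snapped to the preceding frame boundary.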

    nocut_time = 10
    min_shot_length = 2 * nocut_time
    shot_length = row[1] - row[0]

    nocut_events = []
    if shot_length > min_shot_length:
        # floor division via int() works identically in Python 2 and 3
        integer_quotient = int(shot_length / nocut_time)
        event_distance = shot_length / float(integer_quotient)
        for multiplier in range(1, integer_quotient):
            time_point = multiplier * event_distance
            nocut_event = row[0] + time_point
            # add Gaussian jitter around the event (SD = 1.25 s)
            nocut_event = nocut_event + (random.gauss(0, 1250) / 1000)
            # align to a movie frame (25 fps)
            nocut_event = msec_to_time_stamp(nocut_event * 1000)
            nocut_event = time_stamp_to_msec(nocut_event) / 1000.0
            nocut_events.append(nocut_event)

    return nocut_events
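
# Illustrative example: a shot from 100.0 s to 135.0 s is 35 s long (> 2 * 10 s),
# so integer_quotient is 3 and event_distance is about 11.67 s; the function
# returns two no-cut events near 111.7 s and 123.3 s (shifted by the Gaussian
# jitter and then snapped to a movie frame).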


def count_events_conds(events_dict):
    '''Prints the number of events per condition for every run and summed
    over the whole stimulus.
    '''
    all_segments_dict = defaultdict(int)
    # print the events per condition per run
    for run in sorted(events_dict.keys()):
        print('\nrun %s:' % run)
        for loc in sorted(events_dict[run].keys()):
            count = len(events_dict[run][loc])
            if count > 0:
                print('%s\t%s' % (loc, count))
            else:
                print('%s\t%s\t###' % (loc, count))
            # add the event count of the current run to the dict for the
            # whole stimulus
            all_segments_dict[loc] += count

    print('\n\nwhole stimulus:')
    loc_descr_count = [[count, loc] for loc, count in all_segments_dict.items()]
    loc_descr_count.sort(key=lambda x: int(x[0]), reverse=True)
    for count, loc in loc_descr_count:
        print('%s\t%s' % (loc, count))

    return None


def write_event_files(conds_dict, out_dir):
    '''Writes one onset file per run and condition in FSL's EV3 format
    (onset, duration, weight).
    '''
    print('\nWriting onset files')
    # for cond in sorted(t_per_cond.keys()):
    #     print(cond, len(t_per_cond[cond]))
    for run in conds_dict.keys():
        for cond in conds_dict[run].keys():
            # print('writing onsets for', run, cond)
            out_fname = os.path.join(out_dir,
                                     'run-%i' % run,
                                     cond + '.txt')
            path = os.path.dirname(out_fname)
            if not os.path.exists(path):
                os.makedirs(path)
            # write the lines in FSL's EV3 format
            lines = ['%.3f\t%.1f\t1\n' % (timing, STIM_LENGTH)
                     for timing in conds_dict[run][cond]]
            with open(out_fname, 'w') as outfile:
                outfile.writelines(lines)
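
# With the default arguments, this produces files such as (illustrative path)
# events/onsets/avmovie/run-1/vse_new.txt, each containing one EV3 line per
# event, e.g. '12.340\t0.2\t1' (onset, STIM_LENGTH as duration, weight 1).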


# main program #
if __name__ == "__main__":
    inDir, inPat, outDir = parse_arguments()

    # build the name of the output directory from the input directory;
    # handles whether the input contains the timing of the audio-description
    # or of the audio-visual movie
    outDir = os.path.join(outDir, os.path.basename(inDir))

    # search for the files that contain the desired annotation
    anno_segments = get_anno_segments(inDir, inPat)

    # initialize the dict for the tags drawn from their columns:
    # run (key) -> condition (key) -> timings (list)
    locations_conds = {seg: defaultdict(list) for seg in range(1, 9)}

    prev_major_l = None
    prev_setting = None
    prev_locale = None
    learned_settings = []
    learned_locales = defaultdict(list)

    # loop over the segmented annotation
    for segment in anno_segments:
        run = int(get_run_number(segment))
        anno = read_anno_segment(segment)
        for row in anno:
            # check which spatial switches happen from the
            # former to the current row (i.e. frame)
            spatial_switches = compute_spatial_switches(row,
                                                        prev_major_l,
                                                        prev_setting,
                                                        prev_locale,
                                                        learned_settings,
                                                        learned_locales)
            start_t, cond = apply_condition_rules(row,
                                                  spatial_switches,
                                                  learned_settings,
                                                  learned_locales)
            locations_conds[run][cond].append(start_t)

            # prepare for the next row
            prev_major_l = row[2]
            prev_setting = row[3]
            prev_locale = row[4]

            # create the no-cut events from shots longer than the
            # specified interval
            nocut_events = create_nocut(row)
            if nocut_events:
                locations_conds[run]['vno_cut'].extend(nocut_events)

    # count the final conditions
    print('\n\nCONDITIONS count:')
    count_events_conds(locations_conds)

    # write the files
    write_event_files(locations_conds, outDir)
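
# Illustrative invocation with the default arguments (the script's file name is
# a placeholder):
#   python3 locations2onsets.py -ind events/segments/avmovie \
#       -inp 'locations_run-?_events.tsv' -outd events/onsets
# which writes the onset files to events/onsets/avmovie/run-<run>/<condition>.txt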