#!/usr/bin/env python3
'''Converts the annotated events of the segmented annotation into
event (onset) files to be used in FSL.
'''
from collections import defaultdict
from glob import glob
import argparse
import csv
import os.path
# a whitelist of all valid annotation values, just to check for errors in
# the annotation; unlike DESCRNOUNS below, it also lists the values
# ('time', '-', '+', '++') that are not mapped to a condition
DESCR_WHITELIST = [
    'body', 'bodypart',
    'face', 'head',
    'female', 'females', 'fname',
    'male', 'males', 'mname',
    'persons',
    'setting_new', 'setting_rec',
    'geo', 'geo-room',
    'object', 'objects', 'furniture',
    'time', '-', '+', '++'
]
# dictionary with the used events and their mapping to the condition they
# belong to; not used: 'time', '-', '+', '++'
DESCRNOUNS = {
    'body': 'body',
    'bodypart': 'bpart',
    'face': 'fahead',
    'head': 'fahead',
    'fname': 'sex_f',
    'female': 'sex_f',
    'females': 'sex_f',
    'mname': 'sex_m',
    'male': 'sex_m',
    'males': 'sex_m',
    'person': 'sex_u',
    'persons': 'sex_u',
    'setting_new': 'se_new',
    'setting_rec': 'se_old',
    'geo': 'geo',
    'geo-room': 'groom',
    'object': 'obj',
    'objects': 'obj',
    'furniture': 'furn'
}
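
# To illustrate the pooling that this mapping performs, several annotation
# values share one condition label (taken straight from the dict above):
#   DESCRNOUNS['face']   -> 'fahead'
#   DESCRNOUNS['head']   -> 'fahead'
#   DESCRNOUNS['female'] -> 'sex_f'
#   DESCRNOUNS['fname']  -> 'sex_f'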


def parse_arguments():
    '''Parses the command line arguments.
    '''
    parser = argparse.ArgumentParser(
        description='''converts annotated events to
        event files to be used in FSL''')
    parser.add_argument('-ind',
                        default='events/segments/aomovie',
                        help='''directory that contains the segmented
                        annotation; e.g. 'events/segments/aomovie' ''')
    parser.add_argument('-inp',
                        default='fg_rscut_ad_ger_speech_tagged_run-*.tsv',
                        help='''input pattern of the segmented
                        annotation files''')
    parser.add_argument('-outd',
                        default='events/onsets',
                        help='''output directory; e.g. 'events/onsets' ''')

    args = parser.parse_args()
    inDir = args.ind
    inPat = args.inp
    outDir = args.outd

    return inDir, inPat, outDir
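
# A minimal usage sketch; the script's file name is not given here, so
# 'descrnouns2events.py' is just a placeholder, and the argument values
# shown are simply the defaults from parse_arguments():
#   python3 descrnouns2events.py \
#       -ind events/segments/aomovie \
#       -inp 'fg_rscut_ad_ger_speech_tagged_run-*.tsv' \
#       -outd events/onsets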


def get_anno_segments(directory, fname_pattern):
    '''Searches for the files of the segmented annotation.
    '''
    path_pattern = os.path.join(directory, fname_pattern)
    anno_paths = glob(path_pattern)

    return sorted(anno_paths)


def get_run_number(path):
    '''Extracts the run number from an annotation file's name.
    '''
    fname = os.path.basename(path)
    run = fname.split('run-')[1].split('_events')[0]

    return run


def read_n_clean(path):
    '''Reads a segment's annotation and cleans it up on the fly.
    '''
    with open(path, 'r') as csvfile:
        all_rows = csv.reader(csvfile, delimiter='\t')
        # skip the header
        next(all_rows, None)

        # put the file's content into a list
        anno = []
        for row in all_rows:
            # ignore rows with onsets smaller than 0, which appear at the
            # beginning of the last run as a result of timing corrections
            # made by researchcut2segments.py in the speech annotation
            # data paper
            if float(row[0]) < 0:
                continue
            # convert onset from str to float
            row[0] = float(row[0])
            # convert duration from str to float, rounded to milliseconds
            row[1] = round(float(row[1]), 3)
            # take only the first category if the column contains
            # multiple (separated by ';')
            if ';' in row[9]:
                row[9] = row[9].split(';')[0]
            # choose columns for onset, duration, text, tag, and noun
            # (dropping person, pos, dep, lemma, stop, word vector)
            anno.append([row[0], row[1], row[3], row[4], row[5], row[9]])

    return anno


def extract_descr_nouns(row, run, events_dict):
    '''Extracts an annotated event's timing and adds it to its run's dict.
    '''
    timing = row[0:2]
    descrEntry = row[5]

    # take the first category if the column contains multiple
    # (separator=';')
    if ';' in descrEntry:
        category = descrEntry.split(';')[0]
    else:
        category = descrEntry

    if category not in DESCRNOUNS:
        print(category, 'is an unknown localizer')
    else:
        # populate the dict
        events_dict[run][category].append(timing)

    return events_dict
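
# For illustration, the nested structure built by extract_descr_nouns()
# maps run numbers to categories to [onset, duration] pairs (the timing
# values here are made up):
#   events_dict = {
#       1: {'face': [[12.48, 0.54], [20.16, 0.32]],
#           'body': [[15.04, 0.48]],
#           ...},
#       2: {...},
#   }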


def convert_noun2cond(events_dict):
    '''Pools the annotated categories' timings into conditions.
    '''
    conds_dict = {}
    # loop over the events dict
    for run in sorted(events_dict.keys()):
        # make keys from run number with an empty default dict as value
        conds_dict[run] = defaultdict(list)
        for category in sorted(events_dict[run].keys()):
            timings = events_dict[run][category]
            # pool the events into conditions by using the mapping in the
            # dictionary DESCRNOUNS; the conditions' abbreviated names
            # are later used for the filenames
            condition = DESCRNOUNS[category]
            conds_dict[run][condition].extend(timings)

    # sort the pooled lists such that event timings are in temporal order
    for run in conds_dict.keys():
        for cond in conds_dict[run].keys():
            conds_dict[run][cond] = sorted(conds_dict[run][cond])

    return conds_dict
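
# Continuing the illustrative example from above: since 'face' and 'head'
# both map to the condition 'fahead', their timings end up in one pooled,
# temporally sorted list:
#   conds_dict[1]['fahead'] == [[12.48, 0.54], [20.16, 0.32]]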


def count_events_conds(events_dict):
    '''Counts and prints the events per condition per run.
    '''
    all_segments_dict = defaultdict(int)
    # print events per condition per run
    for run in sorted(events_dict.keys()):
        print('\nrun %s:' % run)
        for cond in sorted(events_dict[run].keys()):
            count = len(events_dict[run][cond])
            # flag conditions with 5 or fewer events
            if count > 5:
                print('%s\t%s' % (cond, count))
            else:
                print('%s\t%s\t###' % (cond, count))
            # add the event count of the current run to the dict for the
            # whole stimulus
            all_segments_dict[cond] += count

    print('\n\nwhole stimulus:')
    cond_count = [[count, cond] for cond, count in all_segments_dict.items()]
    cond_count.sort(key=lambda x: int(x[0]), reverse=True)
    for count, cond in cond_count:
        print('%s\t%s' % (cond, count))

    return None
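
# The printed per-run summary looks roughly like this (the counts are made
# up; conditions with 5 or fewer events are flagged with '###'):
#   run 1:
#   body    23
#   fahead  42
#   se_new  3       ###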


def write_event_files(conds_dict, out_dir):
    '''Writes one event file per condition per run.
    '''
    print('\nWriting onset files')
    for run in conds_dict.keys():
        print('run', run)
        for cond in conds_dict[run].keys():
            out_fname = os.path.join(out_dir,
                                     'run-%i' % run,
                                     cond + '.txt')
            path = os.path.dirname(out_fname)
            if not os.path.exists(path):
                os.makedirs(path)

            # write lines in FSL's EV3 format (onset, duration, weight)
            lines = ['%.3f\t%.3f\t1\n' % (timing[0], timing[1])
                     for timing in conds_dict[run][cond]]

            with open(out_fname, 'w') as outfile:
                outfile.writelines(lines)

    return None
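
# A resulting event file, e.g. events/onsets/aomovie/run-1/fahead.txt,
# contains one tab-separated EV3 line per event (values made up):
#   12.480  0.540   1
#   20.160  0.320   1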


if __name__ == "__main__":
    inDir, inPat, outDir = parse_arguments()

    # build the name of the output directory from the input directory;
    # handles whether the input has the timing of the audio-description
    # or the audio-visual movie
    outDir = os.path.join(outDir, os.path.basename(inDir))

    # search for files that contain the desired annotation
    annoSegments = get_anno_segments(inDir, inPat)

    # initialize the dict for the tags drawn from their columns
    descrEvents = {}

    # loop over the segmented annotation
    for segment in annoSegments:
        # read the annotation and clean it up on the fly
        anno = read_n_clean(segment)
        run = int(get_run_number(segment))
        # in the dict for every run, make a new dict with keys=conditions
        descrEvents[run] = {category: [] for category in DESCRNOUNS}

        # EXTRACTING
        # loop over the rows in the current segment's annotation
        for row in anno:
            # extract events for 'descriptive nouns'
            if row[5] in DESCRNOUNS.keys():
                descrEvents = extract_descr_nouns(row, run, descrEvents)

    # convert annotated nouns to regressors
    descrConditions = convert_noun2cond(descrEvents)

    # counts for the annotated categories
    print('Counts per event category:')
    count_events_conds(descrEvents)

    # counts for the regressors built from the (pooled) categories
    print('\n\nCounts per regressor:')
    print('Descriptive Nouns:')
    count_events_conds(descrConditions)

    # write the events of all runs to files
    write_event_files(descrConditions, outDir)