from ChildProject.projects import ChildProject
from ChildProject.annotations import AnnotationManager
from lxml import etree
import os
import re
import pandas as pd
import multiprocessing as mp


def extract_from_regex(pattern, subject):
    """Return the first capture group of ``pattern`` found in ``subject``, or '' on no match."""
    match = pattern.search(subject)
    return match.group(1) if match else ''


class AnnotationImporter:
    """Import raw annotation sets (VTC, ALICE, VCM, LENA .its) into a ChildProject dataset.

    Recordings are assumed to be named ``<something>_<n>`` where ``n`` is the
    1-based index of the part within a recording session split across files.
    """

    # Default destination set and ChildProject input format per known annotation type.
    # ``process()`` falls back to its own ``set``/``format`` arguments for unknown types.
    parameters = {
        'vtc': {'set': 'vtc', 'format': 'vtc_rttm'},
        'alice': {'set': 'alice/output', 'format': 'alice'},
        'vcm': {'set': 'vcm', 'format': 'vcm_rttm'},
        'its': {'set': 'its', 'format': 'its'}
    }

    def __init__(self, path, threads = 1):
        """Open the ChildProject dataset rooted at ``path``.

        :param path: root directory of the ChildProject dataset.
        :param threads: accepted for backward compatibility; parallelism is
            controlled per-call by ``process(threads=...)`` instead.
        """
        self.path = path
        # Fix: the original always opened '.' and ignored ``path`` entirely;
        # honor the supplied dataset root (identical behavior when path == '.').
        self.project = ChildProject(self.path)
        self.am = AnnotationManager(self.project)

    def extract_offsets(self, row):
        """Compute annotation range/offset fields for one recording row from its .its file.

        Reads the ``<Recording num=...>`` element matching ``row['filter']`` and
        converts its start/end timestamps (e.g. "PT123.45S") to milliseconds.
        Mutates and returns ``row`` with ``range_onset``, ``range_offset`` and
        ``time_seek`` set.
        """
        its = row['its_filename']
        xml = etree.parse(os.path.join(self.path, 'annotations/its/raw', its))

        # ITS duration strings look like "PT123.45S" (the "T" is optional).
        timestamp_pattern = re.compile(r"^P(?:T?)(\d+(\.\d+)?)S$")

        # NOTE(review): row['filter'] is expected to be the recording part
        # number extracted in process(); a missing match would yield NaN and
        # break this XPath — confirm all recording filenames end in "_<n>".
        recording = xml.xpath('/ITS/ProcessingUnit/Recording[@num="{}"]'.format(row['filter']))[0]
        start_time = int(1000*float(extract_from_regex(timestamp_pattern, recording.get('startTime'))))
        end_time = int(1000*float(extract_from_regex(timestamp_pattern, recording.get('endTime'))))

        # Timestamps relative to the beginning of this recording part;
        # time_seek shifts .its session timestamps back to part-local time.
        row['range_onset'] = 0
        row['range_offset'] = end_time - start_time
        row['time_seek'] = -start_time

        return row

    def process(self, set, filter_func = None, raw_name_func = None, format = None, threads = 1):
        """Import one annotation set for every recording of the project.

        :param set: annotation type key (see ``parameters``) or a custom set name.
        :param filter_func: optional per-row callable overriding the ``filter`` column.
        :param raw_name_func: optional per-row callable producing ``raw_filename``;
            rows for which it returns NaN are dropped.
        :param format: input format, used only when ``set`` is not in ``parameters``.
        :param threads: worker count; values < 1 mean "use all CPUs".
        """
        threads = threads if threads >= 1 else mp.cpu_count()

        # .copy(): the original mutated a column slice of project.recordings,
        # a SettingWithCopy hazard that could silently fail to assign.
        df = self.project.recordings[['recording_filename', 'its_filename', 'duration']].copy()

        df['set'] = self.parameters[set]['set'] if set in self.parameters else set
        df['format'] = self.parameters[set]['format'] if set in self.parameters else format
        # Part number of each recording within its session ("..._<n>").
        df['filter'] = df['recording_filename'].str.extract(r"_([0-9]{1,})$")
        df['raw_filename'] = ''

        # Context manager fixes a worker-process leak: the original Pool was
        # never closed or joined.
        with mp.Pool(processes = threads) as pool:
            df = pd.DataFrame(pool.map(self.extract_offsets, df.to_dict(orient = 'records')))

        if set == 'its':
            df['raw_filename'] = df['its_filename']

        if callable(filter_func):
            df['filter'] = df.apply(filter_func, axis = 1)

        if callable(raw_name_func):
            df['raw_filename'] = df.apply(raw_name_func, axis = 1)

        # Drop recordings for which no raw annotation file could be resolved.
        df.dropna(subset = ['raw_filename'], inplace = True)

        # Re-import from scratch: wipe any previous version of this set first.
        self.am.remove_set(set, recursive = True)
        self.am.import_annotations(df, threads = threads)

        # ALICE outputs lack speaker types; merge them back in from VTC.
        if set == 'alice':
            self.am.merge_sets(
                left_set = 'vtc',
                right_set = 'alice/output',
                left_columns = ['speaker_type'],
                right_columns = ['phonemes','syllables','words'],
                output_set = 'alice',
                threads = threads
            )

    def matching_recordings(self, annotation):
        """Map a session-level annotation onto the recordings of that session.

        Returns one row per recording part that intersects the annotation,
        with ``range_onset``/``range_offset``/``time_seek`` translated from
        session-relative to recording-relative timestamps, or ``{}`` when the
        session has no recordings (kept for backward compatibility).
        """
        recordings = self.project.recordings[self.project.recordings['session_id'] == annotation['session_id']].copy()
        recordings = recordings[['recording_filename', 'duration', 'session_offset']]
        recordings = recordings.assign(**annotation)
        # Order parts by their index within the session.
        recordings['n'] = recordings['recording_filename'].str.extract(r"_([0-9]+)$").astype(int)
        recordings.sort_values(['n'], ascending = True, inplace = True)

        if not len(recordings):
            return {}

        # Cumulative start of each part within the session timeline.
        recordings['start'] = recordings['duration'].cumsum().shift(periods = 1, fill_value = 0)

        # segments of the session covered by each recording
        recordings['range_onset'] = recordings['start']
        recordings['range_offset'] = recordings['duration'].cumsum()

        # segments of the annotation covered by each recording, timestamps
        # relative to the beginning of the session. Explicit assignment
        # replaces the deprecated chained Series.clip(inplace=True).
        recordings['range_onset'] = recordings['range_onset'].clip(
            lower = annotation['range_onset'], upper = annotation['range_offset']
        )
        recordings['range_offset'] = recordings['range_offset'].clip(
            lower = annotation['range_onset'], upper = annotation['range_offset']
        )

        # remove recordings that do not intersect with the annotation
        recordings = recordings[(recordings['range_offset']-recordings['range_onset']).astype(int) > 0]

        # translate session timestamps to recording-level timestamps.
        recordings['time_seek'] = annotation['time_seek'] - recordings['start']
        recordings['range_onset'] -= recordings['start']
        recordings['range_offset'] -= recordings['start']

        return recordings