import argparse
import functools
import io
import itertools
import multiprocessing
import os
import re
from zipfile import ZipFile

import numpy as np
import pandas as pd

try:
    import tqdm
except ImportError:  # tqdm only draws a progress bar; run without it if absent
    tqdm = None

# A timeseries member's index is the first run of digits in its basename.
_TS_INDEX_RE = re.compile(r'[0-9]+')


def create_single_table_meas(df_dict):
    """Inner-join the patient/visit/test/measurement tables into one frame.

    Tables are joined in the order patients -> visits -> tests ->
    measurements. Each step joins on the ``<singular>_uid`` columns of every
    table already joined (e.g. the tests table is joined on ``patient_uid``
    and ``visit_uid``), so each later table must carry those columns.

    Parameters
    ----------
    df_dict : dict of str -> pandas.DataFrame
        Must contain the keys 'patients', 'visits', 'tests' and
        'measurements'; any other keys are ignored.

    Returns
    -------
    pandas.DataFrame
        The inner join of the four tables.
    """
    table_names = ['patients', 'visits', 'tests', 'measurements']
    so_far = df_dict[table_names[0]]
    for i, table_name in enumerate(table_names[1:], start=1):
        # 'patients' -> 'patient_uid', 'visits' -> 'visit_uid', ...
        keys = ['%s_uid' % name[:-1] for name in table_names[:i]]
        so_far = so_far.merge(right=df_dict[table_name], on=keys, how='inner')
    return so_far


def load_ts(uid_files, archive):
    """Load a batch of whitespace-delimited timeseries from a zip archive.

    Parameters
    ----------
    uid_files : tuple of (int, list of str)
        Worker id and the archive member names this worker should read.
        Only worker 0 shows a progress bar (and only when tqdm is installed).
    archive : str
        Path to the zip archive. It is reopened here so this function can
        run inside a worker process.

    Returns
    -------
    list of dict
        One ``{'ix': int, 'timeseries': numpy.ndarray}`` per member, in the
        order given, where ``ix`` is the first run of digits in the member's
        basename.

    Raises
    ------
    ValueError
        If a member's basename contains no digits.
    """
    uid, files = uid_files
    if tqdm is not None:
        files = tqdm.tqdm(files, disable=(uid != 0))
    tmp_list = []
    with ZipFile(archive, 'r') as z:
        for f in files:
            match = _TS_INDEX_RE.search(os.path.basename(f))
            if match is None:
                raise ValueError('Cannot derive a timeseries index from %r' % f)
            tmp = np.loadtxt(io.BytesIO(z.read(f)))
            tmp_list.append({'ix': int(match.group()), 'timeseries': tmp})
    return tmp_list


def _load_timeseries(archive, ts_files):
    """Load ``ts_files`` from ``archive`` in parallel into an ('ix', 'timeseries') frame."""
    columns = ['ix', 'timeseries']
    if not ts_files:
        # np.array_split cannot split into zero sections.
        return pd.DataFrame(columns=columns)
    n_workers = min(multiprocessing.cpu_count(), len(ts_files))
    # Contiguous, non-empty chunks of the file list, one per worker.
    splits = [(uid, ts_files[chunk[0]:chunk[-1] + 1])
              for uid, chunk in enumerate(
                  np.array_split(np.arange(len(ts_files)), n_workers))]
    func = functools.partial(load_ts, archive=archive)
    # loadtxt is slow, so parse in parallel; the context manager guarantees
    # the pool is torn down even if a worker raises.
    with multiprocessing.Pool(processes=n_workers) as pool:
        per_worker = pool.map(func, splits)
    # Flatten in linear time (sum(lists, []) is quadratic).
    return pd.DataFrame(list(itertools.chain.from_iterable(per_worker)),
                        columns=columns)


def _read_table(z, names, tab, exclude):
    """Read the first member of ``names`` containing ``tab`` (not in ``exclude``) as CSV."""
    candidates = [k for k in names if tab in k and k not in exclude]
    if not candidates:
        raise ValueError('No archive member matching %r found' % tab)
    df = pd.read_csv(io.BytesIO(z.read(candidates[0])))
    # Parse every column whose name contains 'date' with an explicit format,
    # as automatic date inference is unreliable.
    for d in [k for k in df.keys() if 'date' in k]:
        df[d] = pd.to_datetime(df[d], format='%Y-%m-%d')
    return df


def single_dataframe(archive):
    '''Create a single denormalized dataframe containing the patients,
    visits, tests, and measurements.

    Members whose name contains '.txt' are loaded as timeseries. For each of
    'patient', 'visit', 'test', 'measurement' and 'edss', the first other
    member whose name contains that word is read as a CSV.

    Parameters
    ----------
    archive : str
        Path to the zip archive.

    Returns
    -------
    tot_df : pandas.DataFrame
        The joined patient/visit/test/measurement table. The measurement
        table's 'timeseries' column (matched against each file's numeric
        index) is replaced by the loaded timeseries arrays.
    cdf : pandas.DataFrame
        The 'edss' table, returned unjoined.

    Raises
    ------
    ValueError
        If a table is missing from the archive, or if the number of
        timeseries files differs from the number of joined measurement rows.
    '''
    tables = ['patient', 'visit', 'test', 'measurement', 'edss']
    with ZipFile(archive, 'r') as z:
        names = z.namelist()
        ts_files = [f for f in names if '.txt' in f]
        print('Linking timeseries')
        ts_df = _load_timeseries(archive, ts_files)
        # Never pick a timeseries file when looking up a CSV table.
        ts_set = set(ts_files)
        tables_dict = {tab + 's': _read_table(z, names, tab, ts_set)
                       for tab in tables}

    df = create_single_table_meas(tables_dict)
    cdf = tables_dict['edsss']  # key is 'edss' + 's', like the other tables
    if len(ts_df) != len(df):
        raise ValueError(
            'Found %d timeseries files but %d joined measurement rows'
            % (len(ts_df), len(df)))
    tot_df = df.merge(right=ts_df, left_on='timeseries', right_on='ix',
                      how='inner')
    # Keep the loaded arrays under the original 'timeseries' column name.
    tot_df = tot_df.drop(columns=['timeseries_x', 'ix'])
    tot_df = tot_df.rename(columns={'timeseries_y': 'timeseries'})
    return tot_df, cdf


def main(argv=None):
    """Command-line entry point: rebuild the dataframe and pickle it.

    The joined measurement dataframe is written to ``-outputfile``; the EDSS
    table is additionally written to ``-edssfile`` when that is given.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('archive')
    parser.add_argument('-outputfile', default='reconstructed_mep.p')
    parser.add_argument('-edssfile', default=None,
                        help='optional path to pickle the EDSS table to')
    args = parser.parse_args(argv)
    # single_dataframe returns a (measurements, edss) tuple, not a DataFrame.
    df, cdf = single_dataframe(args.archive)
    df.to_pickle(args.outputfile)
    if args.edssfile is not None:
        cdf.to_pickle(args.edssfile)


if __name__ == "__main__":
    main()