|
@@ -0,0 +1,88 @@
|
|
|
+import pandas as pd
|
|
|
+import argparse
|
|
|
+import io
|
|
|
+import os
|
|
|
+from zipfile import ZipFile
|
|
|
+import numpy as np
|
|
|
+import tqdm
|
|
|
+import multiprocessing
|
|
|
+import functools
|
|
|
+import re
|
|
|
+
|
|
|
def create_single_table_meas(df_dict):
    """Denormalize the patients/visits/tests/measurements tables into one frame.

    Successively inner-joins each table onto the accumulated result, keyed on
    the singular ``<table>_uid`` columns shared with every previously-joined
    table (e.g. ``['patient_uid']``, then ``['patient_uid', 'visit_uid']``, ...).
    """
    table_names = ['patients', 'visits', 'tests', 'measurements']
    merged = df_dict[table_names[0]]
    for i, table_name in enumerate(table_names[1:], start=1):
        # Join keys: uid columns of all tables merged so far.
        keys = ['%s_uid' % name[:-1] for name in table_names[:i]]
        merged = merged.merge(right=df_dict[table_name],
                              left_on=keys,
                              right_on=keys,
                              how='inner')
    return merged
|
|
|
+
|
|
|
def load_ts(files, archive):
    """Read whitespace-delimited timeseries members out of a zip archive.

    Parameters
    ----------
    files : list of archive member names to load
    archive : path (or file-like object) of the zip archive

    Returns a list of dicts, one per member, with the integer index parsed
    from the filename ('ix') and the loaded array ('timeseries').
    """
    results = []
    with ZipFile(archive, 'r') as zf:
        for member in files:
            # The first run of digits in the basename is the series index.
            index = int(re.findall('[0-9]+', os.path.basename(member))[0])
            series = np.loadtxt(io.BytesIO(zf.read(member)))
            results.append({'ix': index, 'timeseries': series})
    return results
|
|
|
+
|
|
|
def single_dataframe(archive):
    '''Create a single denormalized dataframe containing the patients, visits,
    tests, and measurements tables from a zip archive, with each measurement's
    timeseries attached.

    Parameters
    ----------
    archive : path to the zip archive holding the csv tables and the
        per-measurement ``*.txt`` timeseries files

    Returns
    -------
    (tot_df, cdf) : the denormalized measurement dataframe (with a
        'timeseries' column of numpy arrays) and the edss dataframe.

    Raises
    ------
    ValueError : if the number of timeseries files does not match the number
        of denormalized measurement rows.
    '''
    tables = ['patient', 'visit', 'test', 'measurement', 'edss']

    with ZipFile(archive, 'r') as z:
        # Load all the timeseries in parallel, as np.loadtxt is rather slow;
        # each worker re-opens the archive by path.
        ts_files = [f for f in z.namelist() if '.txt' in f]
        func = functools.partial(load_ts, archive=archive)
        n_chunks = min(multiprocessing.cpu_count(), len(ts_files))
        # Contiguous chunks of filenames, one per worker.
        splits = [list(chunk) for chunk in np.array_split(ts_files, n_chunks)]
        # Context manager guarantees the pool is torn down even on error
        # (the original close()d but never join()ed).
        with multiprocessing.Pool() as p:
            ts = p.map(func, splits)

        # Flatten the per-worker result lists into one list of dicts.
        ts = sum(ts, [])
        ts_df = pd.DataFrame(ts)

        tables_dict = {}
        for tab in tables:
            tab_file = [k for k in z.namelist() if tab in k][0]
            df = pd.read_csv(io.BytesIO(z.read(tab_file)))
            # Convert the datetime fields with an explicit format, as
            # automatic date inference is dodgy.  (A dead `dateparse` lambda
            # using the removed pd.datetime API was dropped here.)
            date_fields = [k for k in df.keys() if 'date' in k]
            for d in date_fields:
                df[d] = pd.to_datetime(df[d], format='%Y-%m-%d')
            tables_dict[tab + 's'] = df

        df = create_single_table_meas(tables_dict)
        cdf = tables_dict['edsss']

        # Every denormalized measurement row must have exactly one timeseries.
        # Raise explicitly rather than assert (asserts vanish under -O).
        if len(ts_df) != len(df):
            raise ValueError('timeseries count (%d) does not match '
                             'measurement rows (%d)' % (len(ts_df), len(df)))

        # Attach each timeseries to its measurement row; both frames carry a
        # 'timeseries' column, so the merge suffixes them _x/_y.
        tot_df = df.merge(right=ts_df, left_on='timeseries', right_on='ix', how='inner')
        tot_df.drop(['timeseries_x', 'ix'], axis=1, inplace=True)
        tot_df.rename({'timeseries_y': 'timeseries'}, axis=1, inplace=True)

    return tot_df, cdf
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ parser = argparse.ArgumentParser()
|
|
|
+ parser.add_argument('archive')
|
|
|
+ parser.add_argument('-outputfile', default='reconstructed_mep.p')
|
|
|
+ args = vars(parser.parse_args())
|
|
|
+
|
|
|
+ df = single_dataframe(args['archive'])
|
|
|
+ df.to_pickle(args['outputfile'])
|