12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- import pandas as pd
- import argparse
- import io
- import os
- from zipfile import ZipFile
- import numpy as np
- import tqdm
- import multiprocessing
- import functools
- import re
def create_single_table_meas(df_dict):
    """Denormalize the patients/visits/tests/measurements tables into one DataFrame.

    Parameters
    ----------
    df_dict : dict of str -> pandas.DataFrame
        Must contain the keys 'patients', 'visits', 'tests' and
        'measurements'.  Each table is expected to carry the singular
        '<name>_uid' columns of every table that precedes it (e.g. the
        'tests' table carries 'patient_uid' and 'visit_uid').

    Returns
    -------
    pandas.DataFrame
        Inner join of the four tables on the accumulated uid columns.
    """
    table_names = ['patients', 'visits', 'tests', 'measurements']
    merged = df_dict[table_names[0]]
    for i, table_name in enumerate(table_names[1:], start=1):
        # Join keys are the singular '<name>_uid' columns of all tables
        # merged so far.  Both sides share identical key names, so a plain
        # `on=` merge is equivalent to the original left_on/right_on pair.
        keys = ['%s_uid' % name[:-1] for name in table_names[:i]]
        merged = merged.merge(right=df_dict[table_name], on=keys, how='inner')
    return merged
def load_ts(files, archive):
    """Load a batch of whitespace-delimited timeseries files from a zip archive.

    Parameters
    ----------
    files : iterable of str
        Member names inside the archive.  Each basename must contain an
        integer, which is taken as the timeseries index.
    archive : str
        Path to the zip archive.

    Returns
    -------
    list of dict
        One ``{'ix': int, 'timeseries': numpy.ndarray}`` record per file.
    """
    records = []
    with ZipFile(archive, 'r') as zf:
        for member in files:
            # The first run of digits in the basename identifies the series.
            index = int(re.findall('[0-9]+', os.path.basename(member))[0])
            series = np.loadtxt(io.BytesIO(zf.read(member)))
            records.append({'ix': index, 'timeseries': series})
    return records
def single_dataframe(archive):
    """Create a single denormalized dataframe from a zip archive.

    Parameters
    ----------
    archive : str
        Path to a zip archive containing one csv per table ('patient',
        'visit', 'test', 'measurement', 'edss') and one '.txt' file per
        timeseries.

    Returns
    -------
    (pandas.DataFrame, pandas.DataFrame)
        The merged patients/visits/tests/measurements table, with its
        'timeseries' column replaced by the loaded numpy arrays, and the
        edss table.
    """
    tables = ['patient', 'visit', 'test', 'measurement', 'edss']
    with ZipFile(archive, 'r') as z:
        # Load all the timeseries in parallel, as np.loadtxt is rather slow.
        ts_files = [f for f in z.namelist() if '.txt' in f]
        func = functools.partial(load_ts, archive=archive)
        n_cpu = multiprocessing.cpu_count()
        splits = [ts_files[splt[0]:splt[-1] + 1]
                  for splt in np.array_split(np.arange(len(ts_files)),
                                             min(n_cpu, len(ts_files)))]
        # Context manager guarantees the pool is torn down even on error
        # (the original leaked workers if map() raised).
        with multiprocessing.Pool() as p:
            ts = p.map(func, splits)
        # Flatten the per-worker lists into one list of records.
        ts = sum(ts, [])
        ts_df = pd.DataFrame(ts)
        tables_dict = {}
        for tab in tables:
            tab_file = [k for k in z.namelist() if tab in k][0]
            df = pd.read_csv(io.BytesIO(z.read(tab_file)))
            # Explicitly define the date format, as automatic date
            # inference is dodgy.  (A dead `pd.datetime.strptime` parser
            # lambda was removed here: it was never called and
            # `pd.datetime` no longer exists in modern pandas.)
            date_fields = [k for k in df.keys() if 'date' in k]
            for d in date_fields:
                df[d] = pd.to_datetime(df[d], format='%Y-%m-%d')
            tables_dict[tab + 's'] = df

    df = create_single_table_meas(tables_dict)
    cdf = tables_dict['edsss']
    # Every merged measurement row should reference exactly one series file.
    assert len(ts_df) == len(df)
    tot_df = df.merge(right=ts_df, left_on='timeseries', right_on='ix', how='inner')
    tot_df.drop(['timeseries_x', 'ix'], axis=1, inplace=True)
    tot_df.rename({'timeseries_y': 'timeseries'}, axis=1, inplace=True)
    return tot_df, cdf
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('archive')
    parser.add_argument('-outputfile', default='reconstructed_mep.p')
    args = vars(parser.parse_args())
    # single_dataframe returns (measurements_df, edss_df).  The original
    # code pickled the tuple itself, which has no to_pickle method and
    # raised AttributeError; unpack and pickle the measurements table.
    df, cdf = single_dataframe(args['archive'])
    df.to_pickle(args['outputfile'])
|