# -*- coding: utf-8 -*-
# %% import modules ======================
import os

# limit BLAS/OpenMP threading; these environment variables must be set before
# numpy (and the libraries built on top of it) are imported
n_threads = str(1)
os.environ["OMP_NUM_THREADS"] = n_threads
os.environ["OPENBLAS_NUM_THREADS"] = n_threads
os.environ["MKL_NUM_THREADS"] = n_threads
os.environ["VECLIB_MAXIMUM_THREADS"] = n_threads
os.environ["NUMEXPR_NUM_THREADS"] = n_threads

import scold
from scold import draw, text_arr_sim, arr_sim, text_arr_sim_wasserstein
import os.path as op
import pandas as pd
import numpy as np
from tqdm import tqdm
from string import ascii_lowercase
from joblib import Parallel, delayed

# run joblib on all available cores
n_jobs = -1

# %matplotlib qt
# %% build character arrays
chars = [*ascii_lowercase, 'ä', 'ö', 'ü', 'ß']
font = 'Arial-Lgt.ttf'
font_size = 250
font_size_expl_geom = 75  # smaller font size used for the exploratory geometric-invariance analyses (kept smaller than font_size for computational feasibility)
round_method = None

# render each character as a pixel array at both font sizes
char_arrs = [draw.text_array(c, font=font, size=font_size, method=round_method) for c in chars]
char_arrs_smaller = [draw.text_array(c, font=font, size=font_size_expl_geom, method=round_method) for c in chars]

# output locations
out_path = 'stim_sim'
res_path_ot = op.join(out_path, 'ot_variants')
res_path_jacc_geom = op.join(out_path, 'jacc_geom')
res_path_ot_geom = op.join(out_path, 'ot_geom')
res_path_prereg = op.join(out_path, 'preregistered')
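# The output directories above (plus 'stim_sim/complexity', used below) are assumed
# to already exist in the repository; this optional, idempotent safeguard creates
# them if they are missing (not part of the original pipeline):
for _res_dir in (res_path_ot, res_path_jacc_geom, res_path_ot_geom, res_path_prereg,
                 op.join(out_path, 'complexity')):
    os.makedirs(_res_dir, exist_ok=True)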
# %% prepare for only processing one triangle
# indices of the lower triangle (excluding the diagonal), so each character pair is processed once
tril_idx = np.tril_indices(len(chars), k=-1)
tril_idx_long = np.array(tril_idx).T
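# Illustration: for a 3-letter alphabet, np.tril_indices(3, k=-1) returns
# (array([1, 2, 2]), array([0, 0, 1])), so tril_idx_long would hold the index
# pairs [1, 0], [2, 0], [2, 1] -- each unordered character pair exactly once.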
# %% complexity (size) distances (done serially as it's simple & fast)
# char_ols = [draw.outline_shape(ca) for ca in char_arrs]
# comp = [np.sum(co) for co in char_ols]
comp = [np.sum(c) for c in char_arrs]
comp_dists = [np.abs(c_i - c_j) for c_i in comp for c_j in comp]
comp_dists_mat = np.array(comp_dists).reshape((len(chars), len(chars)))
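# Worked example: for pixel sums comp = [10, 4, 7], the absolute differences give
# comp_dists_mat = [[0, 6, 3],
#                   [6, 0, 3],
#                   [3, 3, 0]].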
file_path = op.join('stim_sim', 'complexity')
np.save(op.join(file_path, 'complexity.npy'), comp_dists_mat)

# save to csv
char1_ids = [c_i for c_i in chars for c_j in chars]
char2_ids = [c_j for c_i in chars for c_j in chars]
comp_df = pd.DataFrame({'char1': char1_ids, 'char2': char2_ids, 'comp_dist': comp_dists})
comp_df.to_csv(op.join(file_path, 'complexity.csv'), index=False, encoding='utf-8')

comp_features_df = pd.DataFrame({'char': chars, 'pixel_sum': comp})
comp_features_df.to_csv(op.join(file_path, 'complexity_features.csv'), index=False, encoding='utf-8')
# %% preregistered comparison: 1) Wasserstein Distance
# cross-correlation-aligned, normalised Wasserstein distance
desc = 'Preregistered Wasserstein Distance Measure'

# calculate the similarities
pw_res = Parallel(n_jobs=n_jobs)(
    delayed(arr_sim.partial_wasserstein_trans)(
        a_arr=char_arrs[i], b_arr=char_arrs[j],
        scale_mass=True, scale_mass_method='proportion',
        mass_normalise=False, distance_normalise=False,
        nb_dummies=2, del_weight=0.0, ins_weight=0.0,
        distance_metric='euclidean', translation='crosscor'
    ) for i, j in tqdm(tril_idx_long, desc=desc))

# coerce into matrix
pw_mat = np.zeros((len(chars), len(chars)))
pw_mat[tril_idx] = np.array([pw_i['metric'] for pw_i in pw_res])
pw_mat[tril_idx[1], tril_idx[0]] = pw_mat[tril_idx].T
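# (pw_mat[tril_idx] is a 1-D vector of the lower-triangle values, so the assignment
# above simply mirrors them into the upper triangle, making pw_mat a symmetric
# distance matrix with zeros on the diagonal; the .T on a 1-D array is a no-op)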
# check no ties in ranking for the preregistered analysis
assert len(np.unique(pw_mat[tril_idx])) == len(pw_mat[tril_idx])

# save to file
file_name = 'ot'
file_path = op.join(res_path_prereg, f'{file_name}.npy')
np.save(file_path, pw_mat)

# save to csv
ot_vec = pw_mat[tril_idx]
char1_ids = np.array(chars)[tril_idx[0]]
char2_ids = np.array(chars)[tril_idx[1]]
ot_df = pd.DataFrame({'char1': char1_ids, 'char2': char2_ids, 'ot': ot_vec})
file_path = op.join(res_path_prereg, f'{file_name}.csv')
ot_df.to_csv(file_path, index=False, encoding='utf-8')
# %% preregistered comparison: 2) Jaccard Distance
# cross-correlation-aligned Jaccard distance
desc = 'Preregistered Jaccard Distance Measure'

# calculate the similarities
j_res = Parallel(n_jobs=n_jobs)(
    delayed(text_arr_sim.opt_text_arr_sim)(
        chars[i], b_arr=char_arrs[j],
        font_a=font, size=font_size, method=round_method,
        measure='jaccard',
        translate=True,
        scale=False, fliplr=False, flipud=False, rotate=False
    ) for i, j in tqdm(tril_idx_long, desc=desc))

# coerce into matrix
j_mat = np.ones((len(chars), len(chars)))
j_mat[tril_idx] = np.array([j_i['jaccard'] for j_i in j_res])
j_mat[tril_idx[1], tril_idx[0]] = j_mat[tril_idx].T

# check no ties in ranking for the preregistered analysis
assert len(np.unique(j_mat[tril_idx])) == len(j_mat[tril_idx])

# invert similarities to get distances (the diagonal becomes 0 because np.ones was used)
j_mat = 1 - j_mat
# save to file
file_name = 'jacc'
file_path = op.join(res_path_prereg, f'{file_name}.npy')
np.save(file_path, j_mat)

# save to csv
j_vec = j_mat[tril_idx]
char1_ids = np.array(chars)[tril_idx[0]]
char2_ids = np.array(chars)[tril_idx[1]]
j_df = pd.DataFrame({'char1': char1_ids, 'char2': char2_ids, 'jacc': j_vec})
file_path = op.join(res_path_prereg, f'{file_name}.csv')
j_df.to_csv(file_path, index=False, encoding='utf-8')
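# Optional sanity check (a sketch, not part of the preregistered pipeline): reload
# the two preregistered matrices and confirm they are symmetric with a zero diagonal.
ot_check = np.load(op.join(res_path_prereg, 'ot.npy'))
jacc_check = np.load(op.join(res_path_prereg, 'jacc.npy'))
assert np.allclose(ot_check, ot_check.T) and np.allclose(np.diag(ot_check), 0)
assert np.allclose(jacc_check, jacc_check.T) and np.allclose(np.diag(jacc_check), 0)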
# %% exploratory analysis: Gromov-Wasserstein distance (geometric invariance)
# computed on the smaller (size-75) character arrays for feasibility
desc = 'Optimal Transport Gromov-Wasserstein Distance'
pgw_res = Parallel(n_jobs=n_jobs)(
    delayed(arr_sim.partial_gromov_wasserstein)(
        a_arr=char_arrs_smaller[i], b_arr=char_arrs_smaller[j],
        scale_mass=True, scale_mass_method='proportion', mass_normalise=False,
        nb_dummies=2, del_weight=0.0, ins_weight=0.0
    ) for i, j in tqdm(tril_idx_long, desc=desc))

# coerce into array format
pgw_mat = np.zeros((len(chars), len(chars)))
pgw_mat[tril_idx] = pgw_res
pgw_mat[tril_idx[1], tril_idx[0]] = pgw_mat[tril_idx].T
# save to file
file_name = 'ot_pgw'
file_path = op.join(res_path_ot_geom, f'{file_name}.npy')
np.save(file_path, pgw_mat)

# save to csv
ot_vec = pgw_mat[tril_idx]
char1_ids = np.array(chars)[tril_idx[0]]
char2_ids = np.array(chars)[tril_idx[1]]
ot_df = pd.DataFrame({'char1': char1_ids, 'char2': char2_ids, 'ot': ot_vec})
file_path = op.join(res_path_ot_geom, f'{file_name}.csv')
ot_df.to_csv(file_path, index=False, encoding='utf-8')
# %% exploratory variants of the Wasserstein distance
for translation_opt in ('crosscor', 'opt'):
    for sq_euclidean in (False, True):
        desc = f'Optimal Transport sq_euclidean={int(sq_euclidean)} translation={translation_opt}'
        distance_metric = 'sqeuclidean' if sq_euclidean else 'euclidean'

        # calculate the similarities
        pw_res = Parallel(n_jobs=n_jobs)(
            delayed(arr_sim.partial_wasserstein_trans)(
                a_arr=char_arrs_smaller[i], b_arr=char_arrs_smaller[j],
                scale_mass=True, scale_mass_method='proportion',
                mass_normalise=False, distance_normalise=False,
                nb_dummies=2, del_weight=0.0, ins_weight=0.0,
                distance_metric=distance_metric, translation=translation_opt,
                n_startvals=5
            ) for i, j in tqdm(tril_idx_long, desc=desc))

        # coerce into array format
        pw_mat = np.zeros((len(chars), len(chars)))
        pw_mat[tril_idx] = np.array([pw_i['metric'] for pw_i in pw_res])
        pw_mat[tril_idx[1], tril_idx[0]] = pw_mat[tril_idx].T

        # save to file
        file_name = f'ot_sq{int(sq_euclidean)}_t{translation_opt}'
        file_path = op.join(res_path_ot, f'{file_name}.npy')
        np.save(file_path, pw_mat)

        # save to csv
        ot_vec = pw_mat[tril_idx]
        char1_ids = np.array(chars)[tril_idx[0]]
        char2_ids = np.array(chars)[tril_idx[1]]
        ot_df = pd.DataFrame({'char1': char1_ids, 'char2': char2_ids, 'ot': ot_vec})
        file_path = op.join(res_path_ot, f'{file_name}.csv')
        ot_df.to_csv(file_path, index=False, encoding='utf-8')
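# The loop above writes four exploratory Wasserstein variants to res_path_ot,
# crossing the distance metric (euclidean vs. squared euclidean) with the
# translation method ('crosscor' vs. 'opt'):
#   ot_sq0_tcrosscor, ot_sq1_tcrosscor, ot_sq0_topt, ot_sq1_topt
# (each as an .npy matrix plus a long-format .csv).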
# %% exploratory variants of OT and Jaccard distance with geometric invariances
for measure in ('jaccard', 'partial_wasserstein'):
    for translate in (False, True):
        for scale in (False, True):
            for rotate in (False, True):
                # for fliplr in (False, True):
                #     for flipud in (False, True):
                fliplr = False
                flipud = False
                desc_lab = 'Jaccard' if measure == 'jaccard' else 'Optimal Transport'
                desc = f'{desc_lab} T={int(translate)} S={int(scale)} R={int(rotate)}, Flr={int(fliplr)} Fud={int(flipud)}'

                # calculate the similarities
                if measure == 'jaccard':
                    opt_res = Parallel(n_jobs=n_jobs)(
                        delayed(text_arr_sim.opt_text_arr_sim)(
                            chars[i], b_arr=char_arrs_smaller[j],
                            font_a=font, size=font_size_expl_geom, method=round_method,
                            measure=measure,
                            translate=translate,
                            scale=scale, scale_eval_n=5, max_scale_change_factor=2.0,
                            fliplr=fliplr, flipud=flipud,
                            rotate=rotate, rotation_eval_n=5, rotation_bounds=(-np.inf, np.inf)
                        ) for i, j in tqdm(tril_idx_long, desc=desc))
                elif measure == 'partial_wasserstein':
                    opt_res = Parallel(n_jobs=n_jobs)(
                        delayed(text_arr_sim_wasserstein.opt_text_arr_sim)(
                            chars[i], b_arr=char_arrs_smaller[j],
                            font_a=font, size=font_size_expl_geom, method=round_method,
                            measure=measure,
                            translate=translate, translation_eval_n=5, max_translation_factor=0.99,
                            scale=scale, scale_eval_n=5, max_scale_change_factor=2.0,
                            fliplr=fliplr, flipud=flipud,
                            rotate=rotate, rotation_eval_n=5, rotation_bounds=(-np.inf, np.inf),
                            partial_wasserstein_kwargs={'scale_mass': True, 'scale_mass_method': 'proportion',
                                                        'mass_normalise': False, 'distance_normalise': False,
                                                        'ins_weight': 0.0, 'del_weight': 0.0}
                        ) for i, j in tqdm(tril_idx_long, desc=desc))
                # coerce into matrix
                # (ones for jaccard, so that the inversion below yields a zero diagonal;
                #  zeros for partial_wasserstein, which is already a distance, so its diagonal stays 0)
                if measure == 'jaccard':
                    opt_mat = np.ones((len(chars), len(chars)))
                else:
                    opt_mat = np.zeros((len(chars), len(chars)))
                opt_mat[tril_idx] = np.array([x_i[measure] for x_i in opt_res])
                opt_mat[tril_idx[1], tril_idx[0]] = opt_mat[tril_idx].T

                # invert jaccard similarities to get distances (the diagonal becomes 0 because np.ones was used)
                if measure == 'jaccard':
                    opt_mat = 1 - opt_mat

                # save to file
                if measure == 'jaccard':
                    file_name = f'jacc_T{int(translate)}_S{int(scale)}_R{int(rotate)}_Flr{int(fliplr)}_Fud{int(flipud)}'
                    file_path = op.join(res_path_jacc_geom, f'{file_name}.npy')
                    file_path_df = op.join(res_path_jacc_geom, f'{file_name}.csv')
                else:
                    file_name = f'ot_T{int(translate)}_S{int(scale)}_R{int(rotate)}_Flr{int(fliplr)}_Fud{int(flipud)}'
                    file_path = op.join(res_path_ot_geom, f'{file_name}.npy')
                    file_path_df = op.join(res_path_ot_geom, f'{file_name}.csv')
                np.save(file_path, opt_mat)

                # save to csv
                char1_ids = np.array(chars)[tril_idx[0]]
                char2_ids = np.array(chars)[tril_idx[1]]
                opt_vec = opt_mat[tril_idx]
                if measure == 'jaccard':
                    measure_lab = 'jacc'
                elif measure == 'partial_wasserstein':
                    measure_lab = 'ot'

                opt_df = pd.DataFrame({'char1': char1_ids, 'char2': char2_ids, measure_lab: opt_vec})
                opt_df.to_csv(file_path_df, index=False, encoding='utf-8')
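# The nested loop above writes, per measure, one file for each of the
# 2 x 2 x 2 = 8 translate/scale/rotate combinations (fliplr and flipud fixed at 0):
# jacc_T0_S0_R0_Flr0_Fud0 ... jacc_T1_S1_R1_Flr0_Fud0 in res_path_jacc_geom, and
# ot_T0_S0_R0_Flr0_Fud0 ... ot_T1_S1_R1_Flr0_Fud0 in res_path_ot_geom.
# A minimal, optional sketch (not part of the original pipeline) for collecting
# the geometric-invariance CSVs into one long table, assuming only these variant
# files are present in the two directories:
# import glob
# geom_dfs = []
# for res_dir in (res_path_jacc_geom, res_path_ot_geom):
#     for csv_file in sorted(glob.glob(op.join(res_dir, '*.csv'))):
#         df_i = pd.read_csv(csv_file, encoding='utf-8')
#         df_i = df_i.rename(columns={'jacc': 'distance', 'ot': 'distance'})
#         df_i['variant'] = op.splitext(op.basename(csv_file))[0]
#         geom_dfs.append(df_i)
# geom_long = pd.concat(geom_dfs, ignore_index=True)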