# -*- coding: utf-8 -*-
"""Compute pairwise character-similarity matrices (complexity, Wasserstein,
Jaccard, Gromov-Wasserstein) for a–z plus ä ö ü ß, and save them as .npy
matrices and long-format .csv files under ``stim_sim/``."""

# %% import modules ======================
import os

# Pin all BLAS/OpenMP-style thread pools to a single thread so that the
# process-level parallelism from joblib below does not oversubscribe the CPU.
n_threads = str(1)
os.environ["OMP_NUM_THREADS"] = n_threads
os.environ["OPENBLAS_NUM_THREADS"] = n_threads
os.environ["MKL_NUM_THREADS"] = n_threads
os.environ["VECLIB_MAXIMUM_THREADS"] = n_threads
os.environ["NUMEXPR_NUM_THREADS"] = n_threads

import scold
from scold import draw, text_arr_sim, arr_sim, text_arr_sim_wasserstein

import os.path as op
import pandas as pd
import numpy as np
from tqdm import tqdm
from string import ascii_lowercase
from joblib import Parallel, delayed

n_jobs = -1  # joblib: use all available cores

# %matplotlib qt

# %% build character arrays
chars = [*ascii_lowercase, 'ä', 'ö', 'ü', 'ß']
font = 'Arial-Lgt.ttf'
font_size = 250
# font size for the exploratory analyses examining geometric invariance
# (can be set to be smaller for feasibility)
font_size_expl_geom = 75
round_method = None

# render each character once at each size; downstream sections index these
char_arrs = [draw.text_array(c, font=font, size=font_size, method=round_method)
             for c in chars]
char_arrs_smaller = [draw.text_array(c, font=font, size=font_size_expl_geom,
                                     method=round_method)
                     for c in chars]

# output locations (created on demand before each save below)
out_path = 'stim_sim'
res_path_ot = op.join(out_path, 'ot_variants')
res_path_jacc_geom = op.join(out_path, 'jacc_geom')
res_path_ot_geom = op.join(out_path, 'ot_geom')
res_path_prereg = op.join(out_path, 'preregistered')

# %% prepare for only processing one triangle
# distances are symmetric, so only the strict lower triangle is computed
tril_idx = np.tril_indices(len(chars), k=-1)
tril_idx_long = np.array(tril_idx).T  # (n_pairs, 2) array of (i, j) index pairs

# %% complexity (size) distances (done serially as it's simple & fast)
# complexity of a character = total ink (sum of pixel values)
comp = [np.sum(c) for c in char_arrs]
comp_dists = [np.abs(c_i - c_j) for c_i in comp for c_j in comp]
comp_dists_mat = np.array(comp_dists).reshape((len(chars), len(chars)))

file_path = op.join('stim_sim', 'complexity')
os.makedirs(file_path, exist_ok=True)  # ensure the output directory exists
np.save(op.join(file_path, 'complexity.npy'), comp_dists_mat)

# save to csv (full N x N grid in long format)
char1_ids = [c_i for c_i in chars for c_j in chars]
char2_ids = [c_j for c_i in chars for c_j in chars]
comp_df = pd.DataFrame({'char1': char1_ids, 'char2': char2_ids,
                        'comp_dist': comp_dists})
comp_df.to_csv(op.join(file_path, 'complexity.csv'),
               index=False, encoding='utf-8')

comp_features_df = pd.DataFrame({'char': chars, 'pixel_sum': comp})
comp_features_df.to_csv(op.join(file_path, 'complexity_features.csv'),
                        index=False, encoding='utf-8')

# %% preregistered comparison: 1) Wasserstein Distance
# cross-correlation-aligned, normalised Wasserstein distance
desc = 'Preregistered Wasserstein Distance Measure'

# calculate the similarities over the lower triangle of character pairs
pw_res = Parallel(n_jobs=n_jobs)(
    delayed(arr_sim.partial_wasserstein_trans)(
        a_arr=char_arrs[i], b_arr=char_arrs[j],
        scale_mass=True, scale_mass_method='proportion',
        mass_normalise=False, distance_normalise=False,
        nb_dummies=2, del_weight=0.0, ins_weight=0.0,
        distance_metric='euclidean', translation='crosscor')
    for i, j in tqdm(tril_idx_long, desc=desc))

# coerce into a symmetric matrix (mirror the lower triangle into the upper)
pw_mat = np.zeros((len(chars), len(chars)))
pw_mat[tril_idx] = np.array([pw_i['metric'] for pw_i in pw_res])
pw_mat[tril_idx[1], tril_idx[0]] = pw_mat[tril_idx]

# check no ties in ranking for the preregistered analysis
# (an explicit raise, unlike assert, survives `python -O`)
if len(np.unique(pw_mat[tril_idx])) != len(pw_mat[tril_idx]):
    raise ValueError('ties found in preregistered Wasserstein distances')

# save to file
os.makedirs(res_path_prereg, exist_ok=True)
file_name = 'ot'
np.save(op.join(res_path_prereg, f'{file_name}.npy'), pw_mat)

# save to csv (lower triangle only, long format)
ot_vec = pw_mat[tril_idx]
char1_ids = np.array(chars)[tril_idx[0]]
char2_ids = np.array(chars)[tril_idx[1]]
ot_df = pd.DataFrame({'char1': char1_ids, 'char2': char2_ids, 'ot': ot_vec})
ot_df.to_csv(op.join(res_path_prereg, f'{file_name}.csv'),
             index=False, encoding='utf-8')

# %% preregistered comparison: 2) Jaccard Distance
# cross-correlation-aligned Jaccard Distance
desc = 'Preregistered Jaccard Distance Measure'

# calculate the similarities (translation only; no scale/flip/rotation)
j_res = Parallel(n_jobs=n_jobs)(
    delayed(text_arr_sim.opt_text_arr_sim)(
        chars[i], b_arr=char_arrs[j], font_a=font, size=font_size,
        method=round_method, measure='jaccard', translate=True,
        scale=False, fliplr=False, flipud=False, rotate=False)
    for i, j in tqdm(tril_idx_long, desc=desc))

# coerce into matrix; np.ones gives self-similarity of 1 on the diagonal
j_mat = np.ones((len(chars), len(chars)))
j_mat[tril_idx] = np.array([j_i['jaccard'] for j_i in j_res])
j_mat[tril_idx[1], tril_idx[0]] = j_mat[tril_idx]

# check no ties in ranking for the preregistered analysis
if len(np.unique(j_mat[tril_idx])) != len(j_mat[tril_idx]):
    raise ValueError('ties found in preregistered Jaccard similarities')

# invert to get distances (includes diagonals because np.ones was used)
j_mat = 1 - j_mat

# save to file
os.makedirs(res_path_prereg, exist_ok=True)
file_name = 'jacc'
np.save(op.join(res_path_prereg, f'{file_name}.npy'), j_mat)

# save to csv
j_vec = j_mat[tril_idx]
char1_ids = np.array(chars)[tril_idx[0]]
char2_ids = np.array(chars)[tril_idx[1]]
j_df = pd.DataFrame({'char1': char1_ids, 'char2': char2_ids, 'jacc': j_vec})
j_df.to_csv(op.join(res_path_prereg, f'{file_name}.csv'),
            index=False, encoding='utf-8')

# %%
# for exploratory analysis using Gromov-Wasserstein distance (geometric invariance)
desc = 'Optimal Transport Gromov-Wasserstein Distance'

# smaller rendering used here for computational feasibility
pgw_res = Parallel(n_jobs=n_jobs)(
    delayed(arr_sim.partial_gromov_wasserstein)(
        a_arr=char_arrs_smaller[i], b_arr=char_arrs_smaller[j],
        scale_mass=True, scale_mass_method='proportion',
        mass_normalise=False, nb_dummies=2,
        del_weight=0.0, ins_weight=0.0)
    for i, j in tqdm(tril_idx_long, desc=desc))

# coerce into array format (results are scalar distances)
pgw_mat = np.zeros((len(chars), len(chars)))
pgw_mat[tril_idx] = pgw_res
pgw_mat[tril_idx[1], tril_idx[0]] = pgw_mat[tril_idx]

# save to file
os.makedirs(res_path_ot_geom, exist_ok=True)
file_name = 'ot_pgw'
np.save(op.join(res_path_ot_geom, f'{file_name}.npy'), pgw_mat)

# save to csv
ot_vec = pgw_mat[tril_idx]
char1_ids = np.array(chars)[tril_idx[0]]
char2_ids = np.array(chars)[tril_idx[1]]
ot_df = pd.DataFrame({'char1': char1_ids, 'char2': char2_ids, 'ot': ot_vec})
ot_df.to_csv(op.join(res_path_ot_geom, f'{file_name}.csv'),
             index=False, encoding='utf-8')

# %%
# exploratory variants of Wasserstein distance
# Exploratory Wasserstein variants: cross each translation-alignment method
# with euclidean vs squared-euclidean ground metrics.
for translation_opt in ('crosscor', 'opt'):
    for sq_euclidean in (False, True):
        desc = f'Optimal Transport sq_euclidean={int(sq_euclidean)} translation={translation_opt}'
        distance_metric = 'sqeuclidean' if sq_euclidean else 'euclidean'

        # calculate the similarities (smaller arrays for feasibility)
        pw_res = Parallel(n_jobs=n_jobs)(
            delayed(arr_sim.partial_wasserstein_trans)(
                a_arr=char_arrs_smaller[i], b_arr=char_arrs_smaller[j],
                scale_mass=True, scale_mass_method='proportion',
                mass_normalise=False, distance_normalise=False,
                nb_dummies=2, del_weight=0.0, ins_weight=0.0,
                distance_metric=distance_metric,
                translation=translation_opt, n_startvals=5)
            for i, j in tqdm(tril_idx_long, desc=desc))

        # coerce into a symmetric matrix (mirror lower triangle to upper)
        pw_mat = np.zeros((len(chars), len(chars)))
        pw_mat[tril_idx] = np.array([pw_i['metric'] for pw_i in pw_res])
        pw_mat[tril_idx[1], tril_idx[0]] = pw_mat[tril_idx]

        # save to file (create the output directory if missing)
        os.makedirs(res_path_ot, exist_ok=True)
        file_name = f'ot_sq{int(sq_euclidean)}_t{translation_opt}'
        np.save(op.join(res_path_ot, f'{file_name}.npy'), pw_mat)

        # save to csv (lower triangle only, long format)
        ot_vec = pw_mat[tril_idx]
        char1_ids = np.array(chars)[tril_idx[0]]
        char2_ids = np.array(chars)[tril_idx[1]]
        ot_df = pd.DataFrame({'char1': char1_ids, 'char2': char2_ids,
                              'ot': ot_vec})
        ot_df.to_csv(op.join(res_path_ot, f'{file_name}.csv'),
                     index=False, encoding='utf-8')

# %% exploratory variants of OT and Jaccard distance with geometric invariances
# Grid over translation/scale/rotation invariance for both measures; flips are
# held fixed at False (the flip loops were deliberately disabled).
for measure in ('jaccard', 'partial_wasserstein'):
    for translate in (False, True):
        for scale in (False, True):
            for rotate in (False, True):
                # for fliplr in (False, True):
                # for flipud in (False, True):
                fliplr = False
                flipud = False

                desc_lab = 'Jaccard' if measure == 'jaccard' else 'Optimal Transport'
                desc = f'{desc_lab} T={int(translate)} S={int(scale)} R={int(rotate)}, Flr={int(fliplr)} Fud={int(flipud)}'

                # calculate the similarities with the requested invariances
                if measure == 'jaccard':
                    opt_res = Parallel(n_jobs=n_jobs)(
                        delayed(text_arr_sim.opt_text_arr_sim)(
                            chars[i], b_arr=char_arrs_smaller[j],
                            font_a=font, size=font_size_expl_geom,
                            method=round_method, measure=measure,
                            translate=translate,
                            scale=scale, scale_eval_n=5,
                            max_scale_change_factor=2.0,
                            fliplr=fliplr, flipud=flipud,
                            rotate=rotate, rotation_eval_n=5,
                            rotation_bounds=(-np.inf, np.inf))
                        for i, j in tqdm(tril_idx_long, desc=desc))
                elif measure == 'partial_wasserstein':
                    opt_res = Parallel(n_jobs=n_jobs)(
                        delayed(text_arr_sim_wasserstein.opt_text_arr_sim)(
                            chars[i], b_arr=char_arrs_smaller[j],
                            font_a=font, size=font_size_expl_geom,
                            method=round_method, measure=measure,
                            translate=translate, translation_eval_n=5,
                            max_translation_factor=0.99,
                            scale=scale, scale_eval_n=5,
                            max_scale_change_factor=2.0,
                            fliplr=fliplr, flipud=flipud,
                            rotate=rotate, rotation_eval_n=5,
                            rotation_bounds=(-np.inf, np.inf),
                            partial_wasserstein_kwargs={
                                'scale_mass': True,
                                'scale_mass_method': 'proportion',
                                'mass_normalise': False,
                                'distance_normalise': False,
                                'ins_weight': 0.0,
                                'del_weight': 0.0})
                        for i, j in tqdm(tril_idx_long, desc=desc))

                # coerce into matrix; np.ones puts self-similarity of 1 on the
                # diagonal, relevant for the Jaccard inversion below
                opt_mat = np.ones((len(chars), len(chars)))
                opt_mat[tril_idx] = np.array([x_i[measure] for x_i in opt_res])
                opt_mat[tril_idx[1], tril_idx[0]] = opt_mat[tril_idx]

                # invert to get distances (includes diagonals because np.ones
                # was used); Wasserstein is already a distance
                if measure == 'jaccard':
                    opt_mat = 1 - opt_mat

                # save to file (create the output directory if missing)
                if measure == 'jaccard':
                    res_path = res_path_jacc_geom
                    file_name = (f'jacc_T{int(translate)}_S{int(scale)}'
                                 f'_R{int(rotate)}_Flr{int(fliplr)}'
                                 f'_Fud{int(flipud)}')
                else:
                    res_path = res_path_ot_geom
                    file_name = (f'ot_T{int(translate)}_S{int(scale)}'
                                 f'_R{int(rotate)}_Flr{int(fliplr)}'
                                 f'_Fud{int(flipud)}')
                os.makedirs(res_path, exist_ok=True)
                np.save(op.join(res_path, f'{file_name}.npy'), opt_mat)

                # save to csv (lower triangle only, long format)
                char1_ids = np.array(chars)[tril_idx[0]]
                char2_ids = np.array(chars)[tril_idx[1]]
                opt_vec = opt_mat[tril_idx]
                measure_lab = 'jacc' if measure == 'jaccard' else 'ot'
                opt_df = pd.DataFrame({'char1': char1_ids,
                                       'char2': char2_ids,
                                       measure_lab: opt_vec})
                opt_df.to_csv(op.join(res_path, f'{file_name}.csv'),
                              index=False, encoding='utf-8')