import os
import sys
import json

sys.path.append("./")
sys.path.append("../")

from itertools import product
from collections import Counter
from math import log
import random

from tqdm import tqdm
import kenlm
import numpy as np
import pandas as pd

from make_noiser import Noise
from get_most_probable_phonemes import get_most_probable_phonemes

random.seed(1023)

LANGUAGES_TYPOLOGIES = {
    'da': ("Danish", "fusional"),
    'de': ("German", "fusional"),
    'en': ("English", "fusional"),
    'es': ("Spanish", "fusional"),
    'et': ("Estonian", "agglutinative"),
    'eu': ("Basque", "agglutinative"),
    'fr': ("French", "fusional"),
    'ja': ("Japanese", "agglutinative"),
    'pl': ("Polish", "fusional"),
    'pt': ("Portuguese", "fusional"),
    'sr': ("Serbian", "fusional"),
    'tr': ("Turkish", "agglutinative"),
}


def compute_word_frequencies(word_train_corpus_path):
    """Count how often each word occurs in the word-level training corpus."""
    frequencies = Counter()
    with open(word_train_corpus_path) as word_train_corpus:
        for line in word_train_corpus:
            line = line.strip()
            if not line:
                continue
            frequencies.update(line.split(" "))
    return dict(frequencies)


def statistics_word(utterances, word_frequencies, model):
    """
    Compute perplexity, entropy and lexical statistics for a list of utterances.

    Utterances are assumed to use "@" as a word boundary and "$" as a phoneme
    separator within words.
    """
    phoneme_utterances = []
    unique_words = set()
    nb_unk = 0
    mlu_w = 0.0
    mlu_p = 0.0
    mean_word_frequencies = 0
    nb_utterances = 0
    nb_words = 0

    for utterance in utterances:
        utterance = utterance.strip()
        if not utterance:
            continue
        nb_utterances += 1

        # Word-level view: words separated by spaces, phoneme separators removed.
        utterance_w = utterance.replace("@", " ").replace("$", "")
        # Phoneme-level view: every phoneme separated by a space.
        utterance_p = utterance.replace("@", " ").replace("$", " ")
        phoneme_utterances.append(utterance_p)

        utterance_words = utterance_w.split(" ")
        mlu_w += len(utterance_words)
        mlu_p += len(utterance_p.split(" "))
        nb_words += len(utterance_words)
        unique_words |= set(utterance_words)

        for word in utterance_words:
            word = word.strip()
            if word in word_frequencies:
                mean_word_frequencies += word_frequencies[word]
            else:
                nb_unk += 1

    mlu_w /= nb_utterances
    mlu_p /= nb_utterances
    ttr_w = len(unique_words) / nb_words

    # Average training-corpus frequency of the in-vocabulary word tokens.
    nb_known_words = nb_words - nb_unk
    if nb_known_words > 0:
        mean_word_frequencies /= nb_known_words

    ppl = model.perplexity("\n".join(phoneme_utterances))
    entropy = log(ppl)

    return {
        "ppl": ppl,
        "entropy": entropy,
        "mlu_w": mlu_w,
        "mlu_p": mlu_p,
        "ttr_w": ttr_w,
        "mean_word_frequencies": mean_word_frequencies,
        "nb_unk": nb_unk,
    }


def create_sparse_combinations(values):
    """
    Return the set of tuples that place one of the given values at a single
    index and 0.0 everywhere else.
    """
    sparse_combinations = []
    for value in values:
        for idx in range(len(values)):
            sparse_values = [0.0] * len(values)
            sparse_values[idx] = value
            sparse_combinations.append(tuple(sparse_values))
    return set(sparse_combinations)


def test(json_files_directory, models_directory, phoneme_train_files, word_train_files, add_noise=False):
    """
    Evaluate each language's KenLM model on the matching test JSON file and
    return per-(family, age, speaker) statistics as a DataFrame, optionally
    after injecting noise into the test utterances.
    """
    columns = ["language", "typology", "family", "speaker",
               "age", "perplexity", "entropy", "mlu_w", "mlu_p",
               "ttr_w", "mean_word_frequencies", "nb_unk",
               "phonemes_order_noise", "speakers_noise_adult",
               "speakers_noise_child", "phonemes_noise"]
    rows = []

    all_combinations = (list(product((0.0, 0.25, 0.5, 0.75), repeat=4))
                        if add_noise else [(0.0, 0.0, 0.0, 0.0)])
    # sparse_combinations = create_sparse_combinations((0.0, 0.25, 0.5, 0.75))
    # noise_values = np.linspace(0.0, 1.0, num=6)

    for phonemes_noise, speakers_noise_child, speakers_noise_adult, phonemes_order_noise in tqdm(all_combinations, total=len(all_combinations)):
        for test_filename, model_filename in product(os.listdir(json_files_directory), os.listdir(models_directory)):
            lg_iso = test_filename.split(".")[0]
            model_lg = model_filename.split(".")[0]
            if lg_iso != model_lg:
                continue
            print(lg_iso, model_lg)
            most_probable_phonemes = get_most_probable_phonemes(f"{phoneme_train_files}/{lg_iso}.one_sentence_per_line")
            word_frequencies = compute_word_frequencies(f"{word_train_files}/{lg_iso}.one_sentence_per_line")
            with open(f"{json_files_directory}/{test_filename}") as test_file:
                loaded_json = json.load(test_file)

            if add_noise:
                noise = Noise(most_probable_phonemes,
                              phonemes_order_noise=phonemes_order_noise,
                              speakers_noise=(speakers_noise_child, speakers_noise_adult),
                              phonemes_noise=phonemes_noise)
                loaded_json = noise(loaded_json)

            model = kenlm.Model(f"{models_directory}/{model_filename}")

            for family in loaded_json:
                for age in loaded_json[family]:
                    if age == "None":
                        print(family, lg_iso, age)
                        continue
                    for speaker in loaded_json[family][age]:
                        if speaker not in ["Adult", "Target_Child"]:
                            continue
                        # test_utterances = "\n".join(loaded_json[family][age][speaker])
                        # utterances = [utterance.split(" ") for utterance in loaded_json[family][age][speaker]]
                        # mlu = np.mean([len(utterance) for utterance in utterances])
                        # mlu_without_repetition = np.mean([len(set(utterance)) for utterance in utterances])
                        # ppl = model.perplexity(test_utterances)
                        # entropy = log(ppl)
                        results_statistics = statistics_word(loaded_json[family][age][speaker], word_frequencies, model)
                        language, typology = LANGUAGES_TYPOLOGIES[lg_iso]
                        rows.append({
                            "language": language,
                            "typology": typology,
                            "family": family,
                            "speaker": speaker,
                            "age": float(age),
                            "perplexity": results_statistics["ppl"],
                            "entropy": results_statistics["entropy"],
                            "mlu_w": results_statistics["mlu_w"],
                            "mlu_p": results_statistics["mlu_p"],
                            "ttr_w": results_statistics["ttr_w"],
                            "mean_word_frequencies": results_statistics["mean_word_frequencies"],
                            "nb_unk": results_statistics["nb_unk"],
                            "phonemes_order_noise": phonemes_order_noise,
                            "speakers_noise_adult": speakers_noise_adult,
                            "speakers_noise_child": speakers_noise_child,
                            "phonemes_noise": phonemes_noise,
                        })
    return pd.DataFrame(rows, columns=columns)


if __name__ == "__main__":
    from argparse import ArgumentParser, BooleanOptionalAction

    parser = ArgumentParser()
    parser.add_argument('--phoneme_train_directory',
                        required=True,
                        help="Directory containing the phoneme-level training files (*.one_sentence_per_line)")
    parser.add_argument('--word_train_directory',
                        required=True,
                        help="Directory containing the word-level training files (*.one_sentence_per_line)")
    parser.add_argument('--models_directory',
                        required=True,
                        help="Directory containing the estimated KenLM models")
    parser.add_argument('--json_files_directory',
                        required=True,
                        help="Directory containing the JSON test files")
    parser.add_argument('--out_dirname',
                        required=True,
                        help="Output directory")
    parser.add_argument('--out_filename',
                        required=True,
                        help="Output filename (without extension)")
    parser.add_argument("--add_noise", action=BooleanOptionalAction)
    args = parser.parse_args()

    add_noise = args.add_noise
    json_files_directory = args.json_files_directory
    phoneme_train_files, word_train_files = args.phoneme_train_directory, args.word_train_directory
    models_directory = args.models_directory
    out_dirname = args.out_dirname
    out_filename = args.out_filename

    os.makedirs(out_dirname, exist_ok=True)
    test(json_files_directory, models_directory, phoneme_train_files, word_train_files, add_noise).to_csv(f"{out_dirname}/{out_filename}.csv")
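
# Example invocation, shown only as a sketch: the script name and directory
# paths below are hypothetical placeholders, so substitute your own layout.
#
#   python evaluate_models.py \
#       --phoneme_train_directory data/phonemes \
#       --word_train_directory data/words \
#       --models_directory models \
#       --json_files_directory data/test_json \
#       --out_dirname results \
#       --out_filename statistics \
#       --add_noise
#
# Training files are looked up as <iso>.one_sentence_per_line, and models and
# test JSON files are matched by the ISO-code prefix of their filenames.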