|
@@ -0,0 +1,209 @@
|
|
|
+import os
|
|
|
+import sys
|
|
|
+import json
|
|
|
# Make sibling/parent directories importable so the local modules
# (make_noiser, get_most_probable_phonemes) resolve when this script is
# run from a subdirectory.
# NOTE(review): ".../" is not a valid relative path -- it just adds a
# useless sys.path entry.  Probably "../../" was intended; confirm.
sys.path.append("./")
sys.path.append("../")
sys.path.append(".../")
|
|
|
+from itertools import product
|
|
|
+from tqdm import tqdm
|
|
|
+import kenlm
|
|
|
+from math import log
|
|
|
+import numpy as np
|
|
|
+from make_noiser import Noise
|
|
|
+import pandas as pd
|
|
|
+import sys
|
|
|
+from get_most_probable_phonemes import get_most_probable_phonemes
|
|
|
+import random
|
|
|
+from collections import Counter
|
|
|
+random.seed(1023)
|
|
|
+
|
|
|
+
|
|
|
# ISO 639-1 language code -> (language name, morphological typology).
# Used to label result rows; keys must match the train/test file prefixes.
LANGUAGES_TYPOLOGIES = {
    "da": ("Danish", "fusional"),
    "de": ("German", "fusional"),
    "en": ("English", "fusional"),
    "es": ("Spanish", "fusional"),
    "et": ("Estonian", "agglutinative"),
    "eu": ("Basque", "agglutinative"),
    "fr": ("French", "fusional"),
    "ja": ("Japanese", "agglutinative"),
    "pl": ("Polish", "fusional"),
    "pt": ("Portuguese", "fusional"),
    "sr": ("Serbian", "fusional"),
    "tr": ("Turkish", "agglutinative"),
}
|
|
|
+
|
|
|
def compute_word_frequencies(word_train_corpus, pct=0.95):
    """
    Count word occurrences in a one-sentence-per-line training corpus.

    Parameters
    ----------
    word_train_corpus : str or iterable of str
        Either a path to a one-sentence-per-line text file, or an
        iterable of lines.  BUG FIX: the caller in test() passes a file
        *path*; the original implementation iterated over that string
        character by character instead of opening it, so the returned
        "frequencies" were per-character garbage.  A str argument is now
        treated as a path and opened.
    pct : float
        Unused; kept only for backward compatibility with existing callers.

    Returns
    -------
    dict
        Mapping word -> number of occurrences.
    """
    if isinstance(word_train_corpus, str):
        # A path was given: open it and recurse on the line iterator.
        with open(word_train_corpus, encoding="utf-8") as lines:
            return compute_word_frequencies(lines, pct)

    frequencies = Counter()
    for line in word_train_corpus:
        line = line.strip()
        if not line:
            continue
        frequencies.update(line.split(" "))
    return dict(frequencies)
|
|
|
+
|
|
|
+
|
|
|
def statistics_word(utterances, word_frequencies, model):
    """
    Compute lexical and language-model statistics over a list of utterances.

    Utterances use the corpus encoding where "@" separates words and "$"
    separates phonemes inside a word (inferred from the replace calls
    below -- confirm against the corpus builder).

    Parameters
    ----------
    utterances : iterable of str
        Test utterances; blank entries are skipped.
    word_frequencies : dict
        word -> frequency from the training corpus (see
        compute_word_frequencies).
    model : kenlm.Model-like
        Any object exposing perplexity(text) -> float.

    Returns
    -------
    dict with keys:
        ppl                    : model perplexity over all phoneme utterances
        entropy                : log(ppl)
        mlu_w                  : mean utterance length in words
        mlu_p                  : mean utterance length in phonemes
        ttr_w                  : type/token ratio (unique words / words)
        mean_word_frequencies  : NOTE -- despite the name this is the *sum*
                                 of the train frequencies of known words
                                 (kept as-is for compatibility with
                                 existing result consumers)
        nb_unk                 : number of word tokens absent from
                                 word_frequencies

    Raises
    ------
    ValueError
        If no non-empty utterance is given.  (The original code crashed
        with an opaque ZeroDivisionError in that case.)
    """
    phoneme_utterances = []
    unique_words = set()
    nb_unk = 0
    mlu_w = 0.0              # running word count, divided by nb_utterances below
    mlu_p = 0.0              # running phoneme count, divided by nb_utterances below
    mean_word_frequencies = 0
    nb_utterances = 0
    nb_words = 0

    for utterance in utterances:
        utterance = utterance.strip()
        if not utterance:
            continue
        nb_utterances += 1

        # "@" -> word boundary; "$" -> phoneme boundary within a word.
        utterance_w = utterance.replace("@", " ").replace("$", "")
        utterance_p = utterance.replace("@", " ").replace("$", " ")
        phoneme_utterances.append(utterance_p)

        utterance_words = utterance_w.split(" ")
        mlu_w += len(utterance_words)
        mlu_p += len(utterance_p.split(" "))
        nb_words += len(utterance_words)
        unique_words |= set(utterance_words)

        for word in utterance_words:
            word = word.strip()
            if word in word_frequencies:
                mean_word_frequencies += word_frequencies[word]
            else:
                nb_unk += 1

    if nb_utterances == 0:
        raise ValueError("statistics_word requires at least one non-empty utterance")

    mlu_w /= nb_utterances
    mlu_p /= nb_utterances
    ttr_w = len(unique_words) / nb_words

    ppl = model.perplexity("\n".join(phoneme_utterances))
    entropy = log(ppl)

    return {
        "ppl": ppl,
        "entropy": entropy,
        "mlu_w": mlu_w,
        "mlu_p": mlu_p,
        "ttr_w": ttr_w,
        "mean_word_frequencies": mean_word_frequencies,
        "nb_unk": nb_unk,
    }
|
|
|
+
|
|
|
def create_sparse_combinantions(values):
    """
    Return the set of len(values)-length tuples that are 0.0 everywhere
    except at most one position, which holds one of *values*.

    Duplicates collapse through the set: when 0.0 is among *values*,
    the all-zero tuple appears exactly once.
    """
    size = len(values)
    return {
        tuple(value if position == hot_index else 0.0 for position in range(size))
        for value in values
        for hot_index in range(size)
    }
|
|
|
+
|
|
|
def test(json_files_directory, models_directory, phoneme_train_files, word_train_files, add_noise=False):
    """
    Evaluate each kenlm model on the test json of the matching language
    and collect per-(family, age, speaker) statistics.

    Parameters
    ----------
    json_files_directory : str
        Directory of per-language test files named "<lg_iso>.<ext>".
    models_directory : str
        Directory of kenlm model files named "<lg_iso>.<ext>".
    phoneme_train_files : str
        Directory of "<lg_iso>.one_sentence_per_line" phoneme train files.
    word_train_files : str
        Directory of "<lg_iso>.one_sentence_per_line" word train files.
    add_noise : bool
        When True, evaluate on every combination of four noise levels
        (0.0/0.25/0.5/0.75); otherwise a single noiseless pass.

    Returns
    -------
    pandas.DataFrame
        One row per (noise combination, language, family, age, speaker).

    Notes on fixes vs. the original:
    - DataFrame.append() was deprecated in pandas 1.4 and removed in 2.0;
      rows are now accumulated in a list and the frame built once.
    - The stale `columns` list disagreed with the row keys (it produced
      permanently-NaN "mlu"/"mlu_without_repetition" columns) and was dropped;
      columns now come from the row dicts themselves.
    - json files are opened with `with` so handles are not leaked.
    - lg_iso is taken as the prefix before the first "." (the tuple
      unpacking crashed on filenames containing more than one dot).
    """
    noise_grid = (list(product((0.0, 0.25, 0.5, 0.75), repeat=4))
                  if add_noise else [(0.0, 0.0, 0.0, 0.0)])
    rows = []
    for phonemes_noise, speakers_noise_child, speakers_noise_adult, phonemes_order_noise in tqdm(noise_grid, total=len(noise_grid)):
        for test_filename, model_filename in product(os.listdir(json_files_directory), os.listdir(models_directory)):
            # Pair each test file with the model of the same language prefix.
            lg_iso = test_filename.split(".")[0]
            model_lg = model_filename.split(".")[0]
            if lg_iso != model_lg:
                continue
            print(lg_iso, model_lg)
            most_probable_phonemes = get_most_probable_phonemes(f"{phoneme_train_files}/{lg_iso}.one_sentence_per_line")
            word_frequencies = compute_word_frequencies(f"{word_train_files}/{lg_iso}.one_sentence_per_line")
            with open(f"{json_files_directory}/{test_filename}") as json_file:
                loaded_json = json.load(json_file)
            if add_noise:
                noise = Noise(most_probable_phonemes,
                              phonemes_order_noise=phonemes_order_noise,
                              speakers_noise=(speakers_noise_child, speakers_noise_adult),
                              phonemes_noise=phonemes_noise)
                loaded_json = noise(loaded_json)
            model = kenlm.Model(f"{models_directory}/{model_filename}")
            for family in loaded_json:
                for age in loaded_json[family]:
                    # Skip recordings with no usable age annotation.
                    if age == "None":
                        print(family, lg_iso, age)
                        continue
                    for speaker in loaded_json[family][age]:
                        if speaker not in ("Adult", "Target_Child"):
                            continue
                        results_statistics = statistics_word(loaded_json[family][age][speaker], word_frequencies, model)
                        language, typology = LANGUAGES_TYPOLOGIES[lg_iso]
                        rows.append({"language": language,
                                     "typology": typology,
                                     "family": family,
                                     "speaker": speaker,
                                     "age": float(age),
                                     "perplexity": results_statistics["ppl"],
                                     "entropy": results_statistics["entropy"],
                                     "mlu_w": results_statistics["mlu_w"],
                                     "mlu_p": results_statistics["mlu_p"],
                                     "ttr_w": results_statistics["ttr_w"],
                                     "mean_word_frequencies": results_statistics["mean_word_frequencies"],
                                     "nb_unk": results_statistics["nb_unk"],
                                     "phonemes_order_noise": phonemes_order_noise,
                                     "speakers_noise_adult": speakers_noise_adult,
                                     "speakers_noise_child": speakers_noise_child,
                                     "phonemes_noise": phonemes_noise})
    return pd.DataFrame(rows)
|
|
|
if __name__ == "__main__":
    from argparse import ArgumentParser, BooleanOptionalAction

    # CLI: all paths are required; --add_noise / --no-add_noise toggles the
    # noise-grid evaluation (BooleanOptionalAction requires Python 3.9+).
    parser = ArgumentParser()
    parser.add_argument('--phoneme_train_directory',
                        required=True,
                        help="Dataset containing the train files in phonemes (dot one_sentence_per_line) "
                        )
    parser.add_argument('--word_train_directory',
                        required=True,
                        help="Dataset containing the train files in words (dot one_sentence_per_line) "
                        )
    parser.add_argument('--models_directory',
                        required=True,
                        help="Folder containing the estimated parameters"
                        )
    parser.add_argument('--json_files_directory',
                        required=True,
                        help="Directory containing json files for test"
                        )
    parser.add_argument('--out_dirname',
                        required=True,
                        help="Out directory"
                        )
    parser.add_argument('--out_filename',
                        required=True,
                        help="Out filename"
                        )
    parser.add_argument("--add_noise", action=BooleanOptionalAction)

    args = parser.parse_args()

    # BUG FIX: the original created a hard-coded "results" directory but
    # wrote the csv into --out_dirname; create the directory actually used.
    os.makedirs(args.out_dirname, exist_ok=True)

    test(args.json_files_directory,
         args.models_directory,
         args.phoneme_train_directory,
         args.word_train_directory,
         args.add_noise).to_csv(f"{args.out_dirname}/{args.out_filename}.csv")
|