123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
# Standard library
import os
import sys
import json
import random
from collections import Counter
from itertools import product
from math import log

# Third party
import kenlm
import numpy as np
import pandas as pd
from tqdm import tqdm

# Make the script's own directory and its parent importable when run
# directly, so the local helper modules below resolve.
# (The original also appended ".../", which is not a valid path on any
# OS and was a no-op typo; and it imported `sys` twice.)
sys.path.append("./")
sys.path.append("../")

# Project-local
from make_noiser import Noise
from get_most_probable_phonemes import get_most_probable_phonemes

# Fixed seed so any noise injection is reproducible across runs.
random.seed(1023)
# ISO 639-1 language code -> (English language name, morphological typology).
# Used to label result rows; keys must match the train/test file basenames.
LANGUAGES_TYPOLOGIES = {
    'da' : ("Danish", "fusional"),
    'de' : ("German", "fusional"),
    'en' : ("English", "fusional"),
    'es' : ("Spanish", "fusional"),
    'et' : ("Estonian", "agglutinative"),
    'eu' : ("Basque", "agglutinative"),
    'fr' : ("French", "fusional"),
    'ja' : ("Japanese", "agglutinative"),
    'pl' : ("Polish", "fusional"),
    'pt' : ("Portuguese", "fusional"),
    'sr' : ("Serbian", "fusional"),
    'tr' : ("Turkish", "agglutinative")}
def compute_word_frequencies(word_train_corpus, pct=0.95):
    """Count word occurrences in a whitespace-tokenized corpus.

    Parameters
    ----------
    word_train_corpus : str | Iterable[str]
        Either a path to a one-sentence-per-line text file, or an
        iterable of sentence strings. The caller in ``test()`` passes a
        file path; the original implementation iterated over the path
        string itself, counting *characters* instead of words.
    pct : float
        Unused; kept only so existing call sites stay valid.

    Returns
    -------
    dict[str, int]
        Mapping of each word to its number of occurrences.
    """
    # A plain string argument is a filename, not a corpus: open it and
    # recurse on the file handle (an iterable of lines).
    if isinstance(word_train_corpus, str):
        with open(word_train_corpus, encoding="utf-8") as corpus_file:
            return compute_word_frequencies(corpus_file, pct)
    frequencies = Counter()
    for line in word_train_corpus:
        line = line.strip()
        if not line:
            continue
        frequencies.update(line.split(" "))
    return dict(frequencies)
def statistics_word(utterances, word_frequencies, model):
    """Compute descriptive statistics over a list of encoded utterances.

    Utterances use "@" as the word separator and "$" as the phoneme
    separator inside a word, so replacing "@" with a space yields the
    word string, and additionally turning "$" into spaces yields the
    phoneme string.

    Parameters
    ----------
    utterances : Iterable[str]
        Encoded utterances; blank entries are skipped.
    word_frequencies : dict[str, int]
        Training-corpus word counts; a word absent from it counts as UNK.
    model : object
        A language model exposing ``perplexity(text) -> float``
        (a ``kenlm.Model`` in practice).

    Returns
    -------
    dict with keys:
        ppl, entropy (natural log of ppl),
        mlu_w / mlu_p (mean utterance length in words / phonemes),
        ttr_w (type/token ratio over words),
        mean_word_frequencies (NOTE: despite the name, this is the *sum*
        of train frequencies of in-vocabulary tokens, kept as-is for
        compatibility with downstream consumers),
        nb_unk (count of out-of-vocabulary tokens).
    """
    phoneme_utterances = []
    unique_words = set()
    nb_unk = 0
    mlu_w = 0.0
    mlu_p = 0.0
    mean_word_frequencies = 0
    nb_utterances = 0
    nb_words = 0
    statistics = {}
    for utterance in utterances:
        utterance = utterance.strip()
        if not utterance:
            continue
        nb_utterances += 1
        # "@" -> space gives words; dropping "$" merges phonemes back
        # into whole words, while "$" -> space keeps phonemes separate.
        utterance_w = utterance.replace("@", " ").replace("$", "")
        utterance_p = utterance.replace("@", " ").replace("$", " ")
        phoneme_utterances.append(utterance_p)
        utterance_words = utterance_w.split(" ")
        mlu_w += len(utterance_words)
        mlu_p += len(utterance_p.split(" "))
        nb_words += len(utterance_words)
        unique_words |= set(utterance_words)
        for word in utterance_words:
            word = word.strip()
            if word in word_frequencies:
                mean_word_frequencies += word_frequencies[word]
            else:
                nb_unk += 1

    # Guard against an empty (or all-blank) utterance list, which would
    # otherwise raise ZeroDivisionError in the original code.
    if nb_utterances:
        mlu_w /= nb_utterances
        mlu_p /= nb_utterances
    ttr_w = len(unique_words) / nb_words if nb_words else 0.0

    ppl = model.perplexity("\n".join(phoneme_utterances))
    entropy = log(ppl)
    statistics["ppl"] = ppl
    statistics["entropy"] = entropy
    statistics["mlu_w"] = mlu_w
    statistics["mlu_p"] = mlu_p
    statistics["ttr_w"] = ttr_w
    statistics["mean_word_frequencies"] = mean_word_frequencies
    statistics["nb_unk"] = nb_unk
    return statistics
def create_sparse_combinantions(values):
    """Build noise-parameter tuples with at most one non-zero slot.

    For every value in *values* and every slot index, produce a tuple of
    ``len(values)`` zeros with that single slot set to the value.
    Duplicates (notably the all-zero tuple) collapse in the returned set.
    """
    size = len(values)
    return {
        tuple(value if slot == hot else 0.0 for slot in range(size))
        for value in values
        for hot in range(size)
    }
def test(json_files_directory, models_directory, phoneme_train_files, word_train_files, add_noise=False):
    """Evaluate each trained language model on its matching test file.

    For every noise combination and every language whose test json and
    kenlm model share a basename, computes per-family / per-age /
    per-speaker statistics and collects them into a DataFrame, one row
    per (noise combination, family, age, speaker).

    Parameters
    ----------
    json_files_directory : str
        Directory of ``<lg_iso>.json`` test files.
    models_directory : str
        Directory of trained kenlm models named ``<lg_iso>.<ext>``.
    phoneme_train_files, word_train_files : str
        Directories of ``<lg_iso>.one_sentence_per_line`` train files.
    add_noise : bool
        When True, sweep all 4-tuples of noise levels in
        {0, .25, .5, .75}; otherwise run once with no noise.

    Returns
    -------
    pandas.DataFrame
    """
    # Columns mirror the row dict below. The original code declared a
    # different column list ("mlu", "mlu_without_repetition", ...) that
    # never matched the appended rows, yielding permanently-NaN columns.
    columns = ["language", "typology", "family", "speaker", "age",
               "perplexity", "entropy", "mlu_w", "mlu_p", "ttr_w",
               "mean_word_frequencies", "nb_unk",
               "phonemes_order_noise", "speakers_noise_adult",
               "speakers_noise_child", "phonemes_noise"]
    # Accumulate plain dicts and build the frame once at the end:
    # DataFrame.append was deprecated and removed in pandas 2.0, and
    # per-row appends were quadratic anyway.
    rows = []
    all_combinations = (list(product((0.0, 0.25, 0.5, 0.75), repeat=4))
                        if add_noise else [(0.0, 0.0, 0.0, 0.0)])
    for phonemes_noise, speakers_noise_child, speakers_noise_adult, phonemes_order_noise in tqdm(all_combinations, total=len(all_combinations)):
        for test_filename, model_filename in product(os.listdir(json_files_directory), os.listdir(models_directory)):
            lg_iso, _ = test_filename.split(".")
            model_lg = model_filename.split(".")[0]
            # Only pair a test file with the model of the same language.
            if lg_iso != model_lg:
                continue
            print(lg_iso, model_lg)
            most_probable_phonemes = get_most_probable_phonemes(f"{phoneme_train_files}/{lg_iso}.one_sentence_per_line")
            word_frequencies = compute_word_frequencies(f"{word_train_files}/{lg_iso}.one_sentence_per_line")
            # Close the handle promptly instead of json.load(open(...)).
            with open(f"{json_files_directory}/{test_filename}") as test_file:
                loaded_json = json.load(test_file)
            if add_noise:
                noise = Noise(most_probable_phonemes,
                              phonemes_order_noise=phonemes_order_noise,
                              speakers_noise=(speakers_noise_child, speakers_noise_adult),
                              phonemes_noise=phonemes_noise)
                loaded_json = noise(loaded_json)
            model = kenlm.Model(f"{models_directory}/{model_filename}")
            for family in loaded_json:
                for age in loaded_json[family]:
                    if age == "None":
                        print(family, lg_iso, age)
                        continue
                    for speaker in loaded_json[family][age]:
                        if speaker not in ("Adult", "Target_Child"):
                            continue
                        results_statistics = statistics_word(loaded_json[family][age][speaker], word_frequencies, model)
                        language, typology = LANGUAGES_TYPOLOGIES[lg_iso]
                        rows.append({"language": language,
                                     "typology": typology,
                                     "family": family,
                                     "speaker": speaker,
                                     "age": float(age),
                                     "perplexity": results_statistics["ppl"],
                                     "entropy": results_statistics["entropy"],
                                     "mlu_w": results_statistics["mlu_w"],
                                     "mlu_p": results_statistics["mlu_p"],
                                     "ttr_w": results_statistics["ttr_w"],
                                     "mean_word_frequencies": results_statistics["mean_word_frequencies"],
                                     "nb_unk": results_statistics["nb_unk"],
                                     "phonemes_order_noise": phonemes_order_noise,
                                     "speakers_noise_adult": speakers_noise_adult,
                                     "speakers_noise_child": speakers_noise_child,
                                     "phonemes_noise": phonemes_noise})
    return pd.DataFrame(rows, columns=columns)
if __name__ == "__main__":
    from argparse import ArgumentParser, BooleanOptionalAction

    parser = ArgumentParser()
    parser.add_argument('--phoneme_train_directory',
                        required=True,
                        help="Dataset containing the train files in phonemes (dot one_sentence_per_line) "
                        )
    parser.add_argument('--word_train_directory',
                        required=True,
                        help="Dataset containing the train files in words (dot one_sentence_per_line) "
                        )
    parser.add_argument('--models_directory',
                        required=True,
                        help="Folder containing the estimated parameters"
                        )
    parser.add_argument('--json_files_directory',
                        required=True,
                        help="Directory containing json files for test"
                        )
    parser.add_argument('--out_dirname',
                        required=True,
                        help="Out directory"
                        )
    parser.add_argument('--out_filename',
                        required=True,
                        help="Out filename"
                        )
    parser.add_argument("--add_noise", action=BooleanOptionalAction)
    args = parser.parse_args()

    add_noise = args.add_noise
    json_files_directory = args.json_files_directory
    phoneme_train_files, word_train_files = args.phoneme_train_directory, args.word_train_directory
    models_directory = args.models_directory
    out_dirname = args.out_dirname
    out_filename = args.out_filename

    # Create the directory the CSV is actually written to. The original
    # created a hard-coded "results" directory, so the final to_csv call
    # failed whenever --out_dirname pointed anywhere else.
    os.makedirs(out_dirname, exist_ok=True)

    test(json_files_directory,
         models_directory,
         phoneme_train_files,
         word_train_files,
         add_noise).to_csv(f"{out_dirname}/{out_filename}.csv")
|