#!/usr/bin/env python
- # -*- coding: utf8 -*-
- import csv
- # -----------------------------------------------------------------------------
- # File: messages.py (as part of project URUMETRICS)
- # Created: 29/07/2022 15:35
- # Last Modified: 29/07/2022 15:35
- # -----------------------------------------------------------------------------
- # Author: William N. Havard
- # Postdoctoral Researcher
- #
- # Mail : william.havard@ens.fr / william.havard@gmail.com
- #
- # Institution: ENS / Laboratoire de Sciences Cognitives et Psycholinguistique
- #
- # ------------------------------------------------------------------------------
- # Description:
- # •
- # -----------------------------------------------------------------------------
- import logging
- import os
- import math
- from datetime import datetime
- import pandas as pd
- import yaml
- from ChildProject.annotations import AnnotationManager
- from ChildProject.projects import ChildProject
- logger = logging.getLogger(__name__)
def _read_yaml(yaml_path):
    """Parse the YAML file at *yaml_path* and return its contents."""
    with open(yaml_path, 'r') as yaml_stream:
        parsed = yaml.load(yaml_stream, Loader=yaml.FullLoader)
    return parsed
def get_metrics(project_path, metrics_file):
    """
    given a dataset and an output metrics file
    return a merge of the metrics and recording info, as well as a list of all the metrics labels

    :param project_path: path to the dataset
    :type project_path: str
    :param metrics_file: path to the metrics csv file
    :type metrics_file: str
    :return: dataframe of metrics, existing metrics columns (columns of the dataframe minus metadata columns)
    :rtype: pandas.Dataframe, list[str]
    """
    # Load the ChildProject dataset and its annotation index
    project = ChildProject(project_path)
    am = AnnotationManager(project)
    am.read()
    # Read metrics file and get metrics columns (every column that is not metadata)
    metrics = pd.read_csv(metrics_file)
    metrics_columns = list(set(metrics.columns) - set(['recording_filename', 'child_id']))
    # Merge with recordings to get date_iso; columns duplicated on the recordings
    # side are suffixed with '_drop' and removed just below.
    metrics_recordings = pd.merge(metrics, project.recordings, on='recording_filename', suffixes=('', '_drop'))
    # NOTE(review): substring match — this also removes any genuine column whose
    # name happens to contain 'drop'; confirm no such column exists in the metadata.
    metrics_recordings.drop([col for col in metrics_recordings.columns if 'drop' in col], axis=1, inplace=True)
    # Handle file with the same child_id that have the same date -> keep the longest one
    # NOTE(review): rows are sorted by start_time descending and the first is kept,
    # so this keeps the *latest* recording of the day, not the longest — confirm intent.
    metrics_recordings = (metrics_recordings.groupby(['child_id', 'date_iso'], as_index=False)
                          # Keep only the first segment for each candidate speaker
                          .apply(lambda rows: (rows.sort_values(by='start_time', ascending=False) # take last instead
                                               .head(n=1))))
    return metrics_recordings, metrics_columns
def fill_template(template_key, messages, metrics_evolution, date, start_date, end_date):
    """
    given the full list of templates, a template key and a measure of the evolution of the metrics,
    returns the wanted template filled with the correct evolution indications

    :param template_key: positive or negative evolution of metrics, used to select a master template
    :type template_key: [bool, bool]
    :param messages: dictionary of templates (taken from yaml file)
    :type messages: dict
    :param metrics_evolution: list of tuples containing each metric and its evolution
    :type metrics_evolution: [(str, float, bool),(str, float, bool)]
    :param date: date for which to fill the template
    :type date: datetime.datetime
    :param start_date: date of the start of the experiment
    :type start_date: datetime.datetime
    :param end_date: date of the end of the experiment
    :type end_date: datetime.datetime
    :return: filled template
    :rtype: str
    :raises ValueError: if end_date is not strictly after start_date, or if a
        variable message does not define the computed version index
    """
    # Master template is selected by the (bool, bool) direction pair, e.g. '_True_False'
    template = messages['templates']['_{}_{}'.format(*template_key)]

    # Placeholders #1, #2, ... get the wording associated with each selected
    # metric's direction (True = increase, False = decrease).
    for positivity_item_index, (positivity_item, _, positivity_direction) in enumerate(metrics_evolution, 1):
        template = template.replace('#{}'.format(positivity_item_index),
                                    messages['metrics'][positivity_item][positivity_direction])

    # Fixed placeholders are substituted verbatim
    for msg_key in messages['others']['fixed']:
        template = template.replace('#{}'.format(msg_key), messages['others']['fixed'][msg_key])

    if end_date <= start_date:
        raise ValueError('start_date {} should be before end_date {}'.format(start_date, end_date))
    total_days = end_date - start_date
    time_elapsed = date.date() - start_date.date()

    # Variable placeholders: pick the version matching the fraction of the
    # experiment already elapsed (1-based index, clamped to the valid range).
    for msg_key in messages['others']['variable']:
        versions = messages['others']['variable'][msg_key]
        nb_keys = len(versions)
        if nb_keys == 0:
            raise ValueError('Message must have at least one version')
        msg_fraction = 1 / nb_keys
        time_fraction = time_elapsed.days / total_days.days
        index = math.ceil(time_fraction / msg_fraction)
        # Before the experiment -> first message; after its end -> last message
        index = min(max(index, 1), nb_keys)

        if index not in versions:
            raise ValueError('Could not find message : <others: variable : {} : {}> in yaml. '
                             'Messages listed in a variable sentence should be 1, 2, ...'.format(msg_key, index))

        template = template.replace('#{}'.format(msg_key), versions[index])

    return template
def fill_default(messages, date, start_date, end_date):
    """
    given the full list of templates, the date to consider and the dates of the experiment,
    takes the default messages template and fills it according to the keywords

    :param messages: dictionary of templates (taken from yaml file)
    :type messages: dict
    :param date: date for which to fill the template
    :type date: datetime.datetime
    :param start_date: date of the start of the experiment
    :type start_date: datetime.datetime
    :param end_date: date of the end of the experiment
    :type end_date: datetime.datetime
    :return: filled default message
    :rtype: str
    :raises ValueError: if end_date is not strictly after start_date, or if a
        variable message does not define the computed version index
    """
    template = messages['others']['fixed']['default']

    for msg_key in messages['others']['fixed']:
        # Skip the default message as it is our base template (substituting it
        # into itself could otherwise fill templates endlessly).
        if msg_key == 'default':
            continue
        template = template.replace('#{}'.format(msg_key), messages['others']['fixed'][msg_key])

    if end_date <= start_date:
        raise ValueError('start_date {} should be before end_date {}'.format(start_date, end_date))
    total_days = end_date - start_date
    time_elapsed = date.date() - start_date.date()

    # Variable placeholders: pick the version matching the fraction of the
    # experiment already elapsed (1-based index, clamped to the valid range).
    for msg_key in messages['others']['variable']:
        versions = messages['others']['variable'][msg_key]
        nb_keys = len(versions)
        if nb_keys == 0:
            raise ValueError('Message must have at least one version')
        msg_fraction = 1 / nb_keys
        time_fraction = time_elapsed.days / total_days.days
        index = math.ceil(time_fraction / msg_fraction)
        # Before the experiment -> first message; after its end -> last message
        index = min(max(index, 1), nb_keys)

        if index not in versions:
            raise ValueError('Could not find message : <others: variable : {} : {}> in yaml. '
                             'Messages listed in a variable sentence should be 1, 2, ...'.format(msg_key, index))

        template = template.replace('#{}'.format(msg_key), versions[index])

    return template
def build_messages(metrics_recordings, metrics_columns, message_file_path, date):
    """
    from a dataframe of computed metrics and a date, computes the evolution of all metrics
    between the considered date and the previous one,
    then builds the messages to output based on those evolutions

    :param metrics_recordings: Dataframe of the metrics computed for the dataset
    :type metrics_recordings: pandas.Dataframe
    :param metrics_columns: list of all metrics labels present in the file
    :type metrics_columns: list[str]
    :param message_file_path: path to the yaml where templates are stored
    :type message_file_path: str
    :param date: date (YYYYMMDD) for which to build the messages, rows belonging to another date will not generate messages
    :type date: str
    :return: dataframe containing messages associated with a recording filename
    :rtype: pandas.Dataframe
    :raises ValueError: if ``date`` is not a valid YYYYMMDD string
    """
    og_date = date
    try:
        date_time = datetime.strptime(date, "%Y%m%d")
        date = date_time.strftime("%Y-%m-%d")
    except (ValueError, TypeError):
        # Narrow catch: a bare `except:` would also swallow KeyboardInterrupt /
        # SystemExit and hide unrelated bugs. TypeError covers a non-string date.
        raise ValueError('--date format should be YYYYMMDD without any separators.') from None

    # Get metrics of interest and messages
    metric_messages = _read_yaml(message_file_path)
    metrics_of_interest = list(metric_messages['metrics'].keys())

    experiment_start_date = datetime.strptime(metric_messages['start_date'], "%Y-%m-%d")
    experiment_end_date = datetime.strptime(metric_messages['end_date'], "%Y-%m-%d")

    # Keep only rows for which the date is below or equal to the one we want
    metrics_recordings = metrics_recordings[metrics_recordings['date_iso'] <= date]

    # Generate messages, one per family (child_id)
    output_messages = []
    metrics_grouped = metrics_recordings.groupby('child_id', as_index=False)
    for _ignored_child_id, metrics_grouped_item in metrics_grouped:
        sorted_metrics_grouped_items = metrics_grouped_item.sort_values(by=['date_iso', 'imported_at'],
                                                                        ascending=False)
        # Temporary fix: the date_iso of a recording is not necessarily the date it
        # was submitted; we key on the reception date (first path component of
        # recording_filename) and not the recording date_iso. Skip families whose
        # most recent recording was not received on the requested date.
        if sorted_metrics_grouped_items.iloc[0]['recording_filename'].split('/')[0] != og_date:
            continue
        # Only one audio (first week): generate the default message
        if len(metrics_grouped_item) == 1:
            recording_filename = metrics_grouped_item.iloc[0]['recording_filename']
            message = fill_default(metric_messages, date_time, experiment_start_date, experiment_end_date)
        # More than one audio file: generate a message from the metrics evolution
        else:
            todays_row = sorted_metrics_grouped_items.iloc[0]
            previous_row = sorted_metrics_grouped_items.iloc[1]
            # Compute the difference between the two most recent sets of metrics
            diff_metrics = (todays_row[metrics_columns] - previous_row[metrics_columns])[metrics_of_interest]
            diff_metrics = diff_metrics.to_dict()
            metrics_evolution = [(metric, diff_metrics[metric], diff_metrics[metric] > 0)
                                 for metric in metrics_of_interest]
            # NOTE(review): ascending sort on |delta| then [:2] keeps the two metrics
            # with the *smallest* absolute change — confirm this is the intent.
            metrics_evolution = sorted(metrics_evolution, key=lambda tup: (abs(tup[1]), tup[2]))[:2]
            template_key = [tpl_key for (_, _, tpl_key) in metrics_evolution]
            recording_filename = todays_row['recording_filename']
            message = fill_template(template_key, metric_messages, metrics_evolution,
                                    date_time, experiment_start_date, experiment_end_date)
        output_messages.append({'recording_filename': recording_filename,
                                'message': message})
    return pd.DataFrame(output_messages)
def generate_messages(project_path, metrics_file, message_definition, date):
    """
    given a path to a dataset, to a metrics file and a message definition yaml file,
    creates all the messages for a given date, then stores them in the dataset
    (extra/messages/generated/messages_YYYYMMDD.csv)

    :param project_path: path to childproject dataset
    :type project_path: str
    :param metrics_file: path to the extracted metrics
    :type metrics_file: str
    :param message_definition: path to the file defining the messages templates
    :type message_definition: str
    :param date: date in format YYYYMMDD for which to generate messages
    :type date: str
    :raises FileExistsError: if the output file already exists (subclass of
        IOError/OSError, so existing callers catching IOError still work)
    """
    message_out_path = os.path.join(project_path, 'extra', 'messages', 'generated',
                                    'messages_{}.csv'.format(date))
    # Fail fast before doing any expensive metric computation
    if os.path.exists(message_out_path):
        raise FileExistsError('File {} already exists!'.format(message_out_path))
    # exist_ok avoids a race with a concurrent run creating the same directory
    os.makedirs(os.path.dirname(message_out_path), exist_ok=True)

    # Make sure we have all the data we need, then build the messages
    metrics_recordings, metrics_columns = get_metrics(project_path, metrics_file)
    messages = build_messages(metrics_recordings, metrics_columns, message_definition, date)
    if len(messages):
        messages.to_csv(message_out_path, index=False, quoting=csv.QUOTE_NONNUMERIC, sep=";")
        logger.info('{} messages generated.'.format(len(messages)))
    else:
        logger.warning('No message needs to be generated for date {}.'.format(date))
def main(project_path, **kwargs):
    """
    Entry point: locate the expected metrics and message-definition files inside
    the dataset and generate the feedback messages.

    :param project_path: path to the root of a childproject dataset
    :type project_path: str
    :param kwargs: forwarded to generate_messages (e.g. date)
    :raises ValueError: if the expected input files are not found
    """
    project_path = os.path.abspath(project_path)
    expected_metrics_file = os.path.join(project_path, 'extra', 'metrics', 'metrics.csv')
    expected_message_definition = os.path.join(project_path, 'extra', 'messages', 'definition',
                                               'metrics_messages.yaml')
    # Explicit validation instead of `assert`: asserts are stripped under `python -O`,
    # and the original passed a ValueError *instance* as the assert message.
    if not (os.path.exists(expected_metrics_file) and os.path.exists(expected_message_definition)):
        raise ValueError('Expected metrics ({}) and/or message definition file ({}) not found. '
                         'Are you sure to be running this command from the root of the data '
                         'set?'.format(expected_metrics_file, expected_message_definition))
    generate_messages(project_path=project_path, metrics_file=expected_metrics_file,
                      message_definition=expected_message_definition, **kwargs)
- def _parse_args(argv):
- import argparse
- parser = argparse.ArgumentParser(description='Generate feedback messages.')
- parser.add_argument('--project-path', required=False, type=str, default='',
- help="Path to a ChildProject/datalad project (useful for debugging purposes).")
- parser.add_argument('--date', type=str, default=datetime.now().strftime("%Y%m%d"),
- help='Date for which to generate messages.')
- args = parser.parse_args(argv)
- return vars(args)
if __name__ == '__main__':
    import sys
    # Split program name from the actual argument vector
    pgrm_name, argv = sys.argv[0], sys.argv[1:]
    args = _parse_args(argv)
    logging.basicConfig(level=logging.INFO)
    try:
        main(**args)
        sys.exit(0)
    except Exception as e:
        # Log the full traceback, then signal failure through the exit status
        logger.exception(e)
        sys.exit(1)
|