#!/usr/bin/env python # coding: utf-8 import sys import os import io import subprocess import numpy as np import pandas as pd import matplotlib as mpl import matplotlib.pyplot as plt import scipy.stats as sps import scipy.io.wavfile as siowav import scipy.signal as sps os.makedirs("../output", exist_ok=True) ### # Global plot settings mpl.rcParams["font.family"] = "TeX Gyre Pagella" mpl.rcParams["font.size"] = 8 rst_column_width = 3.3 rst_total_width = 7 ### # Animals ordered_deaf_pups = ["b3", "b5", "b8"] deaf_pups = set(ordered_deaf_pups) ordered_hearing_pups = ["b2", "b4", "b7"] hearing_pups = set(ordered_hearing_pups) all_pups = deaf_pups.union(hearing_pups) bat_birthdates = pd.DataFrame([("b2", "2017/01/26"), ("b4", "2017/01/30"), ("b7", "2017/02/08"), ("b3", "2017/01/29"), ("b5", "2017/02/02"), ("b8", "2017/02/08")], columns=["bat", "birthdate"]) bat_birthdates["birthdate"] = pd.to_datetime(bat_birthdates["birthdate"]) bat_birthdates.set_index("bat", inplace=True) ### # Group colors and identity markers group_colors = dict(hearing="#e1be6a", deaf="#40b0a6") bat_colors = {**{bat: group_colors["deaf"] for bat in deaf_pups}, **{bat: group_colors["hearing"] for bat in hearing_pups}} bat_colors["b4"] = "#9b803e" bat_markers = dict(zip(sorted(all_pups), ["P", "o", "*", "s", "X", "D"])) ### # Load data for juveniles pup_sessions = pd.read_csv("../pup_sessions.csv", index_col=0, parse_dates=["start_time"]) pup_calls = pd.read_csv("../pup_calls.csv", index_col=[0, 1], parse_dates=["start_time"], na_values=["--"]) # filter out mothers pup_calls = pup_calls[pup_calls["calling_bat"].isin(all_pups)].reset_index() # determine ages (in full days) of the calling bats birth_dates = bat_birthdates.loc[pup_calls["calling_bat"]]["birthdate"].reset_index(drop=True) ages = pup_calls["start_time"] - birth_dates pup_calls["calling_bat_age_in_days"] = ages.dt.days pup_calls["calling_bat_age_in_weeks"] = ages.dt.days // 7 pup_calls["call_rms"] = pup_calls["call_rms"] + 55 pup_calls["calling_bat_deaf"] = pup_calls["calling_bat_deaf"] != 0 ### # Load data for adults def single_value(values): value_set = set(values) if len(value_set) != 1: raise ValueError("non-unity set") return next(iter(value_set)) adult_calls = pd.read_csv("../adult_calls.csv", parse_dates=["start_time"], na_values=["--"], dtype=dict(group="category", session_id="int"), index_col=["session_id", "call_id"]) adult_calls["recording_day"] = adult_calls["start_time"].dt.date adult_calls["call_rms"] -= adult_calls["call_rms"].min() adult_sessions = pd.read_csv("../adult_sessions.csv", parse_dates=["start_time"], dtype=dict(group="category"), index_col=0) adult_recording_days = adult_sessions[["group"]] \ .groupby(adult_sessions["start_time"].dt.date) \ .agg(single_value) adult_recording_days.index.name = "recording_day" ### # Data that is excluded from the analysis # 1. Possible echolocation calls (3 ms or shorter) pup_calls = pup_calls[pup_calls["call_duration"] > 3e-3] adult_calls = adult_calls[adult_calls["call_duration"] > 3e-3] # 2. Sessions that are shorter than 20 minutes session_gaps = pup_sessions.sort_values("start_time")["start_time"].diff() new_index = session_gaps.index.insert(0, -1) new_index = new_index[:-1] session_gaps.index = new_index session_gaps.drop(-1, inplace=True) ids_to_drop = session_gaps[session_gaps < pd.Timedelta(20, "m")].index pup_calls = pup_calls[~pup_calls["session_id"].isin(ids_to_drop)] pup_sessions = pup_sessions.drop(ids_to_drop) ### # Last two recordings with mother for pups that have more than others # (to ensure the same total recording time for each individual) sorted_pup_mother_sessions = pup_sessions[pup_sessions["animal2"].str.startswith("m")] \ .sort_values("start_time") session_counts_with_mothers = sorted_pup_mother_sessions["animal1"].value_counts() num_sessions_to_drop = session_counts_with_mothers - session_counts_with_mothers.min() ids_to_drop = set() for bat, drop_count in num_sessions_to_drop.iteritems(): if drop_count == 0: continue this_bat_sessions = sorted_pup_mother_sessions[sorted_pup_mother_sessions["animal1"] == bat] ids_to_drop = ids_to_drop.union(this_bat_sessions.tail(drop_count).index) pup_calls = pup_calls[~pup_calls["session_id"].isin(ids_to_drop)] pup_sessions = pup_sessions.drop(index=ids_to_drop) ### # Mark pre-deafening calls and sessions sorted_pup_mother_sessions = pup_sessions[pup_sessions["animal2"].str.startswith("m")] \ .sort_values("start_time") pup_calls["before_deafening"] = False pup_sessions["before_deafening"] = False for pup, sessions in sorted_pup_mother_sessions.groupby("animal1"): first_id = sessions.index[0] pup_sessions.loc[first_id, "before_deafening"] = True pup_calls.loc[pup_calls["session_id"] == first_id, "before_deafening"] = True ### # Compute activity statistics (separately for pups before and after deafening) calls_per_pup_and_session = pup_calls.reset_index() \ .groupby(["calling_bat", "session_id"])[["calling_bat"]].count() \ / (20*6) # per 10 seconds calls_per_pup_and_session.columns = ["calls_per_10s"] calls_per_pup_and_session.reset_index(inplace=True) session_dates = pup_sessions.loc[calls_per_pup_and_session["session_id"]]["start_time"] birth_dates = bat_birthdates.loc[calls_per_pup_and_session["calling_bat"]]["birthdate"] calls_per_pup_and_session["calling_bat_age_in_days"] = \ (session_dates.reset_index(drop=True) - birth_dates.reset_index(drop=True)).dt.days calls_per_pup_and_session["calling_bat_age_in_weeks"] = \ calls_per_pup_and_session["calling_bat_age_in_days"] // 7 calls_per_pup_and_session = \ pd.merge(calls_per_pup_and_session, pup_sessions[["before_deafening"]], left_on="session_id", right_index=True,) calls_per_pup_and_session.loc[:, "calling_bat_deaf"] = \ calls_per_pup_and_session["calling_bat"].isin(deaf_pups) dfs = [] for before_deafening in [False, True]: mask = pup_calls["before_deafening"] if not before_deafening: mask = ~mask gr = pup_calls[mask].groupby(["calling_bat", "calling_bat_age_in_weeks"]) gr = gr[["call_duration", "mean_aperiodicity", "f0_mean", "call_rms"]] params_per_pup_and_week = gr.median() params_per_pup_and_week["calls_this_week"] = gr.size() mask = calls_per_pup_and_session["before_deafening"] if not before_deafening: mask = ~mask gr = calls_per_pup_and_session[mask].groupby(["calling_bat", "calling_bat_age_in_weeks"]) params_per_pup_and_week["calls_per_10s"] = gr["calls_per_10s"].mean() params_per_pup_and_week.reset_index(inplace=True) params_per_pup_and_week["before_deafening"] = before_deafening dfs.append(params_per_pup_and_week) params_per_pup_and_week = pd.concat(dfs) gr = adult_calls.groupby(["group", "session_id"], observed=True) calls_per_adult_group_and_session = gr.size().to_frame(name="calls_per_10s").reset_index() ### # Quantities of extracted data print("Total calls by pups: {}\nCalls by deaf pups: {} ({:.1f} %)\nCalls by hearing pups: {} ({:.1f} %)".format( len(pup_calls), np.sum(pup_calls["calling_bat_deaf"]), np.sum(pup_calls["calling_bat_deaf"]) / len(pup_calls) * 100, np.sum(~pup_calls["calling_bat_deaf"]), np.sum(~pup_calls["calling_bat_deaf"] / len(pup_calls) * 100))) print("Total calls by adults: {}\nCalls by deaf adults: {} ({:.1f} %)\nCalls by hearing adults: {} ({:.1f} %)".format( len(adult_calls), np.sum(adult_calls["group"] == "deaf"), np.sum(adult_calls["group"] == "deaf") / len(adult_calls) * 100, np.sum(adult_calls["group"] == "hearing"), np.sum(adult_calls["group"] == "hearing") / len(adult_calls) * 100)) ### # Run the statistical evaluation pup_calls.to_csv("../output/pup_calls_for_model.csv", index=False) calls_per_pup_and_session.to_csv("../output/pup_activity_for_model.csv", index=False) subprocess.run(["Rscript", "analysis.R"]) ### # Load the p-value table generated by R pup_ps = pd.read_csv("../output/pup_model_results.csv", index_col="parameter") ### # Development trajectory of call parameters def plot_trajectories(ax, values, parameter, scale): line_artists = {} for bat, df in values.groupby("calling_bat"): bat_color, bat_marker = bat_colors[bat], bat_markers[bat] line_artist, = ax.plot(df["calling_bat_age_in_weeks"] + 1, df[parameter] * scale, c=bat_color, linewidth=0.8) ax.plot(df["calling_bat_age_in_weeks"] + 1, df[parameter] * scale, c=bat_color, marker=bat_marker, markersize=5, clip_on=False, markerfacecolor="none", linestyle="") line_artists[bat] = line_artist return line_artists def plot_histograms(ax, values, parameter, scale, y_range, p_values=None): data = {} if "group" in values: for group, hearing in [("deaf", False), ("hearing", True)]: data[hearing] = (values.loc[values["group"] == group, parameter] * scale).values else: for hearing, df in values.groupby(values["calling_bat"].isin(hearing_pups)): data[hearing] = (df[parameter] * scale).values ax.hist([data[True], data[False]], density=True, range=y_range, bins=15, color=[group_colors["hearing"], group_colors["deaf"]], orientation="horizontal", rwidth=0.85) if p_values is not None: p = p_values.loc[parameter, "p_group_adjusted"] if p < 0.05: ax.annotate("*", (0.5, 0.6), xycoords="axes fraction", ha="center") def plot_individual_boxplots(ax, values, parameter, scale, annotate=False, p_values=None): data = [] bat_groups = [] bats = [] for group, individuals in [("deaf", ordered_deaf_pups), ("hearing", ordered_hearing_pups)]: for bat in individuals: data.append(scale * values.loc[values["calling_bat"] == bat, parameter].dropna().values) bats.append(bat) bat_groups.append(group) artists = ax.boxplot(data, vert=True, manage_ticks=False, showfliers=False, widths=0.2, positions=np.arange(len(data)) * 0.3, patch_artist=True, boxprops=dict(linewidth=0.5), whiskerprops=dict(linewidth=0.5), medianprops=dict(linewidth=1, color="#8c1010"), capprops=dict(linewidth=0.5)) for artist, bat in zip(artists["boxes"], bats): artist.set_facecolor(bat_colors[bat]) if annotate: for i, (bat_group, bat) in enumerate(zip(bat_groups, bats)): xy_top = artists["whiskers"][2 * i + 1].get_xydata()[1] ax.plot(xy_top[0], xy_top[1] + 10, marker=bat_markers[bat], c=bat_colors[bat], markerfacecolor="none", markersize=3, markeredgewidth=0.5) if p_values is not None: p = p_values.loc[parameter, "p_group_adjusted"] if p < 0.05: ax.annotate("*", (0.5, 0.9), xycoords="axes fraction", ha="center") # In[35]: def plot_pooled_boxplots(ax, values, parameter, scale): data = [] for group in ["deaf", "hearing"]: data.append(scale * values.loc[values["group"] == group, parameter].dropna().values) artists = ax.boxplot(data, vert=True, manage_ticks=False, showfliers=False, widths=0.75, positions=np.arange(len(data)), patch_artist=True, boxprops=dict(linewidth=0.5), whiskerprops=dict(linewidth=0.5), medianprops=dict(linewidth=1, color="#8c1010"), capprops=dict(linewidth=0.5)) for artist, group in zip(artists["boxes"], ["deaf", "hearing"]): artist.set_facecolor(group_colors[group]) call_parameters = [("calls_per_10s", 1, "Vocal activity\n[calls/10 s]", (0, 100), 20), ("call_rms", 1, "Amplitude\n[dB]", (0, 54), 12), ("call_duration", 1e3, "Duration\n[ms]", (0, 80), 20), ("f0_mean", 1e-3, "Fundamental\nfrequency [kHz]", (0, 40), 10), ("mean_aperiodicity", 1, "Aperiodicity\n[1]", (0, 1), 0.25)] fig_width = rst_total_width fig_height = 0.7 * fig_width left_margin, right_margin = 0.75, 0.1 bottom_margin, top_margin = 0.45, 0.45 spacing = 0.2 row_height = (fig_height - bottom_margin - top_margin - (len(call_parameters) - 1) * spacing) / len(call_parameters) pup_boxplot_width = 0.5 extra_adult_spacing = 0.1 adult_boxplot_width = 0.2 trajectory_width = (fig_width - left_margin - right_margin - 2 * pup_boxplot_width - adult_boxplot_width - 3 * spacing - 2 * extra_adult_spacing) fig = plt.figure(figsize=(fig_width, fig_height)) roman_numerals = ["I", "II", "III", "IV", "V"] for i, (parameter, scale, y_label, y_range, y_tick_interval) in enumerate(reversed(call_parameters)): early_ax = fig.add_axes([left_margin / fig_width, (bottom_margin + i * (row_height + spacing)) / fig_height, pup_boxplot_width / fig_width, row_height / fig_height]) late_ax = fig.add_axes([(left_margin + pup_boxplot_width + spacing) / fig_width, (bottom_margin + i * (row_height + spacing)) / fig_height, pup_boxplot_width / fig_width, row_height / fig_height]) adult_ax = fig.add_axes([(left_margin + 2 * pup_boxplot_width + 2 * spacing + extra_adult_spacing) / fig_width, (bottom_margin + i * (row_height + spacing)) / fig_height, adult_boxplot_width / fig_width, row_height / fig_height]) trajectory_ax = fig.add_axes([(left_margin + 2 * pup_boxplot_width + adult_boxplot_width + 3 * spacing + 2 * extra_adult_spacing) / fig_width, (bottom_margin + i * (row_height + spacing)) / fig_height, trajectory_width / fig_width, row_height / fig_height]) overall_ax = fig.add_axes([left_margin / fig_width, (bottom_margin + i * (row_height + spacing)) / fig_height, (fig_width - left_margin - right_margin) / fig_width, row_height / fig_height], frameon=False) overall_ax.set_xticks([]) overall_ax.set_yticks([]) if parameter == "calls_per_10s": df = calls_per_pup_and_session else: df = pup_calls plot_individual_boxplots(early_ax, df.query("before_deafening"), parameter, scale, annotate=(i == len(call_parameters) - 1), p_values=None) plot_individual_boxplots(late_ax, df.query("not before_deafening"), parameter, scale, annotate=(i == len(call_parameters) - 1), p_values=pup_ps) if parameter == "calls_per_10s": df = calls_per_adult_group_and_session else: df = adult_calls plot_pooled_boxplots(adult_ax, df, parameter, scale) adult_ax.set_xlim(-0.75, 1.75) plot_trajectories(trajectory_ax, params_per_pup_and_week.query("not before_deafening " "and calling_bat_age_in_weeks <= 24"), parameter, scale) trajectory_ax.xaxis.set_major_locator(mpl.ticker.MultipleLocator(4)) trajectory_ax.set_xlim(2, 25) trajectory_ax.spines["left"].set_position(("outward", 3)) if i == 0: trajectory_ax.set_xlabel("Week of age") trajectory_ax.xaxis.set_major_formatter( mpl.ticker.FuncFormatter(lambda x, pos: "{}th".format(int(x)))) else: for ax in [early_ax, late_ax, trajectory_ax]: ax.set_xticklabels([]) if i == len(call_parameters) - 1: early_ax.set_title("a)\n< 2 weeks\n", fontsize=8) late_ax.set_title("b)\n2–25 weeks\n", fontsize=8) adult_ax.set_title("c)\n2 years\n", fontsize=8) trajectory_ax.set_title("d)\nDevelopment trajectory of median parameter values" "\nat 2–25 weeks of age", fontsize=8) early_ax.annotate("hearing", xy=(0, 1), xycoords="axes fraction", xytext=(5, -10), textcoords="offset points", verticalalignment="top", horizontalalignment="left", color=group_colors["hearing"]) early_ax.annotate("deafened", xy=(0, 1), xycoords="axes fraction", xytext=(5, 0), textcoords="offset points", verticalalignment="top", horizontalalignment="left", color=group_colors["deaf"]) for ax in [early_ax, late_ax, adult_ax, trajectory_ax]: if ax is adult_ax and parameter == "calls_per_10s": ax.set_ylim(0, 400) ax.yaxis.set_major_locator(mpl.ticker.MultipleLocator(100)) else: ax.set_ylim(y_range[0], y_range[1]) ax.yaxis.set_major_locator(mpl.ticker.MultipleLocator(y_tick_interval)) for spine in ["right", "top"]: ax.spines[spine].set_visible(False) ax.spines["bottom"].set_position(("outward", 3)) for ax in [late_ax, adult_ax, trajectory_ax]: if (ax is not adult_ax and ax is not trajectory_ax) or parameter != "calls_per_10s": ax.set_yticklabels([]) for ax in [early_ax, late_ax, adult_ax]: ax.set_xticks([]) ax.spines["bottom"].set_visible(False) early_ax.set_ylabel(y_label) early_ax.yaxis.set_label_coords(-0.32 / pup_boxplot_width, 0.5) overall_ax.set_ylabel(roman_numerals[-i - 1], rotation="0", verticalalignment="center", labelpad=49) fig.savefig("../output/main_figure.pdf") ### # Parameter tables per_session_parameters = [("call_rms", 1, "Amplitude [dB]"), ("call_duration", 1e3, "Duration [ms]"), ("f0_min", 1e-3, "Minimum fundamental frequency [kHz]"), ("f0_mean", 1e-3, "Mean fundamental frequency [kHz]"), ("f0_max", 1e-3, "Maximum fundamental frequency [kHz]"), ("f0_start", 1e-3, "Fundamental frequency at call onset [kHz]"), ("f0_end", 1e-3, "Fundamental frequency at end of call [kHz]"), ("f_min", 1e-3, "Minimum frequency [kHz]"), ("f_max", 1e-3, "Maximum frequency [kHz]"), ("bandwidth", 1e-3, "Bandwidth [kHz]"), ("spectral_centroid", 1e-3, "Spectral centroid frequency [kHz]"), ("mean_aperiodicity", 1, "Aperiodicity [1]")] adult_calls.loc[:, "bandwidth"] = adult_calls["f_max"] - adult_calls["f_min"] pup_calls.loc[:, "bandwidth"] = pup_calls["f_max"] - pup_calls["f_min"] def q25(series): return series.quantile(0.25) def q75(series): return series.quantile(0.75) adult_groups = adult_calls[[p[0] for p in per_session_parameters] + ["group"]].groupby("group") adult_summary = adult_groups.agg([q25, "median", q75, "mean", "std"]) pup_groups = pup_calls.loc[~pup_calls["before_deafening"], [p[0] for p in per_session_parameters] + ["calling_bat_deaf"]].groupby("calling_bat_deaf") pup_summary = pup_groups.agg([q25, "median", q75, "mean", "std"]) pre_deafening_pup_groups = pup_calls.loc[pup_calls["before_deafening"], [p[0] for p in per_session_parameters] + ["calling_bat_deaf"]].groupby("calling_bat_deaf") pre_deafening_pup_summary = pre_deafening_pup_groups.agg([q25, "median", q75, "mean", "std"]) per_session_parameters.insert(0, ("calls_per_10s", 1, "Vocal activity [calls/10 s]")) adult_activity_summary = calls_per_adult_group_and_session.groupby("group")[["calls_per_10s"]] \ .agg([q25, "median", q75, "mean", "std"]) adult_summary = pd.concat([adult_activity_summary, adult_summary], axis=1) pup_activity_summary = calls_per_pup_and_session \ .query("not before_deafening") \ .groupby("calling_bat_deaf")[["calls_per_10s"]] \ .agg([q25, "median", q75, "mean", "std"]) pup_summary = pd.concat([pup_activity_summary, pup_summary], axis=1) pre_deafening_pup_activity_summary = calls_per_pup_and_session \ .query("before_deafening") \ .groupby("calling_bat_deaf")[["calls_per_10s"]] \ .agg([q25, "median", q75, "mean", "std"]) pre_deafening_pup_summary = \ pd.concat([pre_deafening_pup_activity_summary, pre_deafening_pup_summary], axis=1) pup_summary.rename({False: "hearing", True: "deaf"}, inplace=True) pre_deafening_pup_summary.rename({False: "hearing", True: "deaf"}, inplace=True) for group in ["hearing", "deaf"]: adult_summary.loc[group, ("count", "total")] = \ (adult_calls["group"] == group).sum() pup_summary.loc[group, ("count", "total")] = \ ((pup_calls["calling_bat_deaf"] == (group == "deaf")) & (~pup_calls["before_deafening"])).sum() pre_deafening_pup_summary.loc[group, ("count", "total")] = \ ((pup_calls["calling_bat_deaf"] == (group == "deaf")) & (pup_calls["before_deafening"])).sum() def write_parameters_table(summary, p_values): fout = io.StringIO() print("", file=fout) print("".format( int(summary.loc["hearing", ("count", "total")]), int(summary.loc["deaf", ("count", "total")])), file=fout, sep="") if p_values is not None: print("", file=fout, sep="") print("", file=fout) print("{}".format("" * 2), file=fout, end="") if p_values is not None: print("", file=fout, end="") print("", file=fout) for parameter_name, scale, parameter_description in per_session_parameters: print("".format(parameter_description), file=fout, end="") test_data = [] for group in ["hearing", "deaf"]: print("".format( summary.loc[group, (parameter_name, "median")] * scale, summary.loc[group, (parameter_name, "q25")] * scale, summary.loc[group, (parameter_name, "q75")] * scale), file=fout, end="") print("".format( summary.loc[group, (parameter_name, "mean")] * scale, summary.loc[group, (parameter_name, "std")] * scale), file=fout, end="") if p_values is not None: for p_name in ["p_group_adjusted", "p_age_adjusted", "p_interaction_adjusted"]: p = p_values.loc[parameter_name, p_name] if p < 0.05: print("", file=fout, end="") else: print("".format(p), file=fout, end="") print("", file=fout) print("
Hearing ({} vocalisations)Deaf ({} vocalisations)p < 0.05 (ANOVA)
Median (1st & 3rd quartile)Mean ± std. dev.Deafened/hearingAgeInteraction
{}{:.2f} ({:.2f}–{:.2f}){:.2f} ± {:.2f}*{:.2f}
", file=fout) return fout.getvalue() parameters_table = write_parameters_table(adult_summary, None) with open("../output/adult_parameters_table.html", "wt") as fout: print(parameters_table, file=fout, end="") parameters_table = write_parameters_table(pup_summary, pup_ps) with open("../output/pup_parameters_table.html", "wt") as fout: print(parameters_table, file=fout, end="") parameters_table = write_parameters_table(pre_deafening_pup_summary, None) with open("../output/pre_deafening_pup_parameters_table.html", "wt") as fout: print(parameters_table, file=fout, end="") print("Done, output files written to ../output/.")