#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Convert per-session call-parameter ``.mat`` files into CSV tables.

Two data sets are processed, each producing a ``*_sessions.csv`` and a
``*_calls.csv`` file one directory above the working directory:

* pup sessions, read from ``PUP_RESULTS_ROOT_DIR``
* adult sessions, read from ``ADULT_RESULTS_ROOT_DIR``
"""

import os
import re

import numpy as np
import pandas as pd
import scipy.io as sio

try:
    # Not referenced anywhere in this file; imported on a best-effort basis
    # so that the conversion itself does not depend on matplotlib.
    import matplotlib as mpl
    import matplotlib.pyplot as plt
except ImportError:
    mpl = plt = None

# Divisor that turns a sample index into seconds (samples per second).
FS = 192e3
# Animal ids that are flagged in the ``calling_bat_deaf`` column.
DEAF_BATS = {"b3", "b5", "b8"}


def calculate_f0_slope(f0s):
    """Return the f0 slope between the first and last unmasked frame.

    Parameters
    ----------
    f0s : numpy.ma.MaskedArray
        One f0 value per analysis frame; invalid frames are masked.

    Returns
    -------
    float
        ``(last_f0 - first_f0) / (last_index - first_index) * 6000``, or NaN
        when fewer than two frames are unmasked.
    """
    # getmaskarray always yields a full boolean array, even when the input
    # carries ``nomask`` (a scalar), which ``~f0s.mask`` would mishandle.
    unmasked_indices, = np.where(~np.ma.getmaskarray(f0s))
    if len(unmasked_indices) < 2:
        return np.nan
    first_index, last_index = unmasked_indices[0], unmasked_indices[-1]
    first_f0, last_f0 = f0s[first_index], f0s[last_index]
    # factor 6000: converts to Hz/seconds
    return (last_f0 - first_f0) / (last_index - first_index) * 6000


def process_file(session_id, animals, session_start_time, filename):
    """Load one call-parameter ``.mat`` file and return one row per call.

    Only calls with a non-empty ``call_levels`` entry are kept.

    Parameters
    ----------
    session_id : int
        Identifier stored in the ``session_id`` column of every row.
    animals : sequence of two str, or None
        Ids of the two animals in the session.  When falsy, no caller
        attribution is done: ``calling_bat``/``other_bat`` are ``""`` and
        ``level_difference`` is NaN.
    session_start_time : pandas.Timestamp
        Start of the recording; call start times are offsets from it.
    filename : str
        Path of the ``.mat`` file; it must contain a ``result`` variable.

    Returns
    -------
    pandas.DataFrame
        One row per kept call.  Completely empty (no columns) when
        ``result`` is empty or when no call has level information.
    """
    data = sio.loadmat(filename)
    if data["result"].size == 0:
        return pd.DataFrame()
    all_calls = data["result"][0, 0]["call"][0, :]

    has_levels = [levels.shape[1] != 0 for levels in all_calls["call_levels"]]
    valid_indices = np.where(has_levels)[0]
    if valid_indices.size == 0:
        # Nothing usable in this file; same outcome as an empty result.
        return pd.DataFrame()
    calls = all_calls[valid_indices]

    start_samples = np.concatenate(calls["call_start_sample"], axis=1)[0]
    # Derived from the *filtered* samples so that every column has the same
    # length even when some calls were dropped above.
    start_times = session_start_time + pd.to_timedelta(start_samples / FS, unit="s")
    call_durations = np.concatenate(calls["call_dur"], axis=1)[0]
    f_mins = np.concatenate(calls["Fmin"], axis=1)[0]
    f_maxs = np.concatenate(calls["Fmax"], axis=1)[0]
    spectral_centroid = np.concatenate(calls["SCF"], axis=1)[0]

    # Column 1 of each "f0" matrix holds f0, column 2 the aperiodicity.
    f0s = [np.ma.masked_invalid(track[:, 1]) for track in calls["f0"]]
    f0s_compressed = [track.compressed() for track in f0s]
    f0s_start = [track[0] if len(track) > 0 else np.nan for track in f0s_compressed]
    f0s_end = [track[-1] if len(track) > 0 else np.nan for track in f0s_compressed]
    f0s_slope = [calculate_f0_slope(track) for track in f0s]
    aperiodicities = [np.ma.masked_invalid(track[:, 2]).compressed()
                      for track in calls["f0"]]

    if animals:
        call_levels = np.concatenate(calls["call_levels"])
        # Mean level of channels 1, 4, 5 minus mean level of channels 0, 2, 3.
        # NOTE(review): presumably each channel group sits closer to one of
        # the two animals -- confirm against the recording setup.
        level_differences = (np.mean(call_levels[:, [1, 4, 5]], axis=1)
                             - np.mean(call_levels[:, [0, 2, 3]], axis=1))
        calling_bat = np.where(level_differences > 0, animals[1], animals[0])
        other_bat = np.where(level_differences > 0, animals[0], animals[1])
    else:
        level_differences = np.nan
        calling_bat = other_bat = ""

    call_rms = [10 * np.log10(np.mean(samples ** 2))
                for samples in calls["loudest_call"]]

    return pd.DataFrame(dict(
        session_id=session_id,
        call_id=np.arange(len(start_samples)),
        start_sample=start_samples,
        start_time=start_times,
        calling_bat=calling_bat,
        other_bat=other_bat,
        level_difference=level_differences,
        call_rms=call_rms,
        call_duration=call_durations,
        f_min=f_mins,
        f_max=f_maxs,
        mean_aperiodicity=[np.mean(it) for it in aperiodicities],
        f0_mean=[np.mean(it) for it in f0s],
        f0_min=[np.min(it) for it in f0s],
        f0_max=[np.max(it) for it in f0s],
        f0_start=f0s_start,
        f0_end=f0s_end,
        f0_slope=f0s_slope,
        spectral_centroid=spectral_centroid,
    ))


###
# Pup sessions
###

PUP_RESULTS_ROOT_DIR = "../raw_data/pups/results"
# Directory name: "vpl_" + first animal id ("b" + digit) + two-character second id.
PUP_DIR_RE = re.compile(r"^vpl_(b\d)(..)$")
# File name groups: day, month (three characters), year, hour, minute, second.
PUP_FILE_RE = re.compile(r"^vpl_...._(\d\d)-(...)-(\d\d\d\d)_(\d\d)x(\d\d)x(\d\d)_m1.mat$")


def _convert_pup_sessions():
    """Write ``../pup_sessions.csv`` and ``../pup_calls.csv``."""
    sessions = []
    calls = []
    session_id = 0
    dlist = sorted(os.listdir(PUP_RESULTS_ROOT_DIR))
    for i, dn in enumerate(dlist):
        dn_mt = PUP_DIR_RE.match(dn)
        if not dn_mt:
            continue
        dpath = os.path.join(PUP_RESULTS_ROOT_DIR, dn)
        animals = [dn_mt.group(1), dn_mt.group(2)]
        for fn in sorted(os.listdir(dpath)):
            fn_mt = PUP_FILE_RE.match(fn)
            if not fn_mt:
                continue
            fpath = os.path.join(dpath, fn)
            print("\r{}/{}: {}... ".format(i + 1, len(dlist), fpath), end="")
            start_time = pd.to_datetime("{} {} {} {}:{}:{}".format(*fn_mt.groups()))
            calls.append(process_file(session_id, animals, start_time, fpath))
            sessions.append((session_id, animals[0], animals[1], start_time))
            session_id += 1

    calls = pd.concat(calls)
    sessions = pd.DataFrame(sessions, columns=["session_id", "animal1", "animal2", "start_time"])
    sessions.set_index("session_id", inplace=True)
    calls.set_index(["session_id", "call_id"], inplace=True)

    # The three flag columns are inserted right before "level_difference".
    calls.insert(calls.columns.get_loc("level_difference"), "calling_bat_deaf",
                 calls["calling_bat"].isin(DEAF_BATS))

    # For every pup, the earliest session whose second animal id starts with
    # "m" is flagged as "before_deafening".
    sorted_sessions = sessions[sessions["animal2"].str.startswith("m")].sort_values("start_time")
    sessions["before_deafening"] = False
    calls.insert(calls.columns.get_loc("level_difference"), "before_deafening", False)
    for pup, sessions_per_pup in sorted_sessions.groupby("animal1"):
        first_id = sessions_per_pup.index[0]
        sessions.loc[first_id, "before_deafening"] = True
        calls.loc[calls.index.get_level_values(0) == first_id, "before_deafening"] = True

    calls.insert(calls.columns.get_loc("level_difference"), "calling_bat_mother",
                 calls["calling_bat"].str.startswith("m"))

    # Store flags as 0/1.  The builtin ``int`` replaces the ``np.int`` alias,
    # which no longer exists in current NumPy releases.
    for bool_column in ["before_deafening"]:
        sessions[bool_column] = sessions[bool_column].astype(int)
    for bool_column in ["calling_bat_deaf", "before_deafening", "calling_bat_mother"]:
        calls[bool_column] = calls[bool_column].astype(int)

    sessions.to_csv("../pup_sessions.csv")
    calls.to_csv("../pup_calls.csv")


###
# Adults
###

ADULT_RESULTS_ROOT_DIR = "../raw_data/adults/"
# File name groups: year, month, day, hour, minute, second.
ADULT_FILE_RE = re.compile(r"^(\d\d\d\d)_(\d\d)_(\d\d)_(\d\d)_(\d\d)_(\d\d)_call_parameters.mat$")


def _convert_adult_sessions():
    """Write ``../adult_sessions.csv`` and ``../adult_calls.csv``."""
    sessions = []
    calls = []
    session_id = 0
    for dn in ["deaf", "hearing"]:
        dpath = os.path.join(ADULT_RESULTS_ROOT_DIR, dn)
        for fn in sorted(os.listdir(dpath)):
            fn_mt = ADULT_FILE_RE.match(fn)
            if not fn_mt:
                continue
            print("\r{} {}... ".format(session_id, fn), end="")
            fpath = os.path.join(dpath, fn)
            groups = list(map(int, fn_mt.groups()))
            start_time = pd.Timestamp(year=groups[0], month=groups[1], day=groups[2],
                                      hour=groups[3], minute=groups[4], second=groups[5])
            df = process_file(session_id, None, start_time, fpath)
            if len(df) == 0:
                # Files without calls do not get a session id.
                continue
            # The sub-directory name decides the deaf flag for all calls.
            df.insert(df.columns.get_loc("level_difference"), "calling_bat_deaf",
                      int(dn == "deaf"))
            calls.append(df)
            sessions.append((session_id, dn, start_time))
            session_id += 1

    calls = pd.concat(calls)
    calls.set_index(["session_id", "call_id"], inplace=True)
    # Constant columns so that the adult table has the same flag columns as
    # the pup table.
    calls.insert(calls.columns.get_loc("level_difference"), "before_deafening", 0)
    calls.insert(calls.columns.get_loc("level_difference"), "calling_bat_mother", 0)

    sessions = pd.DataFrame(sessions, columns=["session_id", "group", "start_time"])
    sessions.set_index(["session_id"], inplace=True)

    sessions.to_csv("../adult_sessions.csv")
    calls.to_csv("../adult_calls.csv")


def main():
    """Convert the pup data set, then the adult data set."""
    _convert_pup_sessions()
    print("\npups done")
    _convert_adult_sessions()
    print("\nadults done")


if __name__ == "__main__":
    main()

# vim:sw=4:sts=4:et: