123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- import pandas as pd
- import numpy as np
- import scipy.io as sio
- import matplotlib as mpl
- import matplotlib.pyplot as plt
- import os
- import re
# Sampling rate of the recordings in samples per second; sample offsets are
# divided by this value to obtain times in seconds.
FS = 192e3

# IDs of the bats flagged as deaf in the "calling_bat_deaf" column.
DEAF_BATS = {
    "b3",
    "b5",
    "b8",
}
def calculate_f0_slope(f0s, *, frame_rate=6000):
    """Return the overall slope of an f0 track in Hz per second.

    The slope is taken along the straight line that connects the first and
    the last unmasked sample of ``f0s``; samples in between are ignored.

    Parameters
    ----------
    f0s : numpy.ma.MaskedArray or numpy.ndarray
        One-dimensional f0 track. Masked entries are skipped. An array that
        carries no mask at all is treated as fully valid.
    frame_rate : float, optional
        Number of f0 samples per second. It converts the per-sample slope
        into a per-second slope. Defaults to 6000.

    Returns
    -------
    float
        ``(last_f0 - first_f0) / (last_index - first_index) * frame_rate``,
        or ``numpy.nan`` when fewer than two unmasked samples are available.
    """
    # getmaskarray always returns a full boolean array. Using ``f0s.mask``
    # directly breaks when the mask is the scalar ``nomask``: ``np.where``
    # then sees a 0-d input and returns a wrong index (or raises, depending
    # on the NumPy version).
    (unmasked_indices,) = np.where(~np.ma.getmaskarray(f0s))
    if len(unmasked_indices) < 2:
        return np.nan

    first_index, last_index = unmasked_indices[0], unmasked_indices[-1]
    first_f0, last_f0 = f0s[first_index], f0s[last_index]
    return (last_f0 - first_f0) / (last_index - first_index) * frame_rate
def _concat_field(call_records, field):
    """Join the per-call 2-D arrays stored under ``field`` along axis 1 and return row 0."""
    return np.concatenate(call_records[field], axis=1)[0]


def process_file(session_id, animals, session_start_time, filename):
    """Load one call-analysis ``.mat`` file and return one row per usable call.

    Parameters
    ----------
    session_id
        Value written to the ``session_id`` column of every row.
    animals : sequence of two str, or None
        IDs of the two animals in the session. When given, each call is
        attributed to one of them from its level difference; when falsy,
        ``calling_bat`` / ``other_bat`` are empty strings and
        ``level_difference`` is NaN.
    session_start_time
        Timestamp to which each call's start offset (start sample / ``FS``
        seconds) is added to obtain ``start_time``.
    filename : str
        Path handed to ``scipy.io.loadmat``.

    Returns
    -------
    pandas.DataFrame
        One row per call that has call levels, with the columns
        ``session_id``, ``call_id``, ``start_sample``, ``start_time``,
        ``calling_bat``, ``other_bat``, ``level_difference``, ``call_rms``,
        ``call_duration``, ``f_min``, ``f_max``, ``mean_aperiodicity``,
        ``f0_mean``, ``f0_min``, ``f0_max``, ``f0_start``, ``f0_end``,
        ``f0_slope`` and ``spectral_centroid``. An empty frame is returned
        when the file's ``result`` is empty or no call has call levels.
    """
    data = sio.loadmat(filename)
    if data["result"].size == 0:
        return pd.DataFrame()
    data = data["result"][0, 0]

    # Only calls with a non-empty "call_levels" entry are used.
    has_levels = [levels.shape[1] != 0 for levels in data["call"][0, :]["call_levels"]]
    valid_indices = np.where(has_levels)[0]
    if len(valid_indices) == 0:
        # Nothing usable; np.concatenate would fail on an empty selection.
        return pd.DataFrame()
    valid_calls = data["call"][0, valid_indices]

    start_samples = _concat_field(valid_calls, "call_start_sample")
    # Derive the start times from the *valid* calls so that every column has
    # the same length; mixing in skipped calls makes DataFrame creation fail.
    start_times = session_start_time + pd.to_timedelta(start_samples / FS, unit="s")
    call_durations = _concat_field(valid_calls, "call_dur")
    f_mins = _concat_field(valid_calls, "Fmin")
    f_maxs = _concat_field(valid_calls, "Fmax")
    spectral_centroid = _concat_field(valid_calls, "SCF")

    # Column 1 of each per-call "f0" matrix is used as the f0 track and
    # column 2 as the aperiodicity; non-finite entries are masked out.
    f0s = [np.ma.masked_invalid(track[:, 1]) for track in valid_calls["f0"]]
    f0s_compressed = [track.compressed() for track in f0s]
    # The compressed tracks hold finite values only, so no extra filtering
    # is needed to pick the first/last valid sample.
    f0s_start = [track[0] if len(track) > 0 else np.nan for track in f0s_compressed]
    f0s_end = [track[-1] if len(track) > 0 else np.nan for track in f0s_compressed]
    f0s_slope = [calculate_f0_slope(track) for track in f0s]
    aperiodicities = [np.ma.masked_invalid(track[:, 2]).compressed()
                      for track in valid_calls["f0"]]

    if animals:
        call_levels = np.concatenate(valid_calls["call_levels"])
        # Mean level of columns 1, 4, 5 minus mean level of columns 0, 2, 3.
        # NOTE(review): presumably the two column groups are the channels
        # nearer to animals[1] and animals[0] respectively -- confirm.
        level_differences = (np.mean(call_levels[:, [1, 4, 5]], axis=1)
                             - np.mean(call_levels[:, [0, 2, 3]], axis=1))
        second_is_caller = level_differences > 0
        calling_bat = np.where(second_is_caller, animals[1], animals[0])
        other_bat = np.where(second_is_caller, animals[0], animals[1])
    else:
        level_differences = np.nan
        calling_bat = other_bat = ""

    # Mean power of the "loudest_call" samples expressed in dB.
    call_rms = [10*np.log10(np.mean(samples**2)) for samples in valid_calls["loudest_call"]]

    return pd.DataFrame(dict(session_id=session_id,
                             # Consecutive ids over the calls that are kept.
                             call_id=np.arange(len(start_samples)),
                             start_sample=start_samples,
                             start_time=start_times,
                             calling_bat=calling_bat,
                             other_bat=other_bat,
                             level_difference=level_differences,
                             call_rms=call_rms,
                             call_duration=call_durations,
                             f_min=f_mins,
                             f_max=f_maxs,
                             mean_aperiodicity=[np.mean(it) for it in aperiodicities],
                             f0_mean=[np.mean(it) for it in f0s],
                             f0_min=[np.min(it) for it in f0s],
                             f0_max=[np.max(it) for it in f0s],
                             f0_start=f0s_start,
                             f0_end=f0s_end,
                             f0_slope=f0s_slope,
                             spectral_centroid=spectral_centroid))
###
# Pup sessions
###
# Walks PUP_RESULTS_ROOT_DIR, runs process_file() on every matching result
# file and writes ../pup_sessions.csv and ../pup_calls.csv.
PUP_RESULTS_ROOT_DIR = "../raw_data/pups/results"
sessions = []
calls = []
# Directory name: "vpl_" + first animal id ("b" + digit) + two-character id
# of the second animal.
dn_re = re.compile(r"^vpl_(b\d)(..)$")
# File name: the six groups are the fields of the session start time, in the
# order they are formatted for pd.to_datetime below. The dot before "mat" is
# escaped so that it matches a literal "." only.
fn_re = re.compile(r"^vpl_...._(\d\d)-(...)-(\d\d\d\d)_(\d\d)x(\d\d)x(\d\d)_m1\.mat$")
session_id = 0
dlist = sorted(os.listdir(PUP_RESULTS_ROOT_DIR))
for i, dn in enumerate(dlist):
    dpath = os.path.join(PUP_RESULTS_ROOT_DIR, dn)
    dn_mt = dn_re.match(dn)
    if not dn_mt:
        continue
    animals = [dn_mt.group(1), dn_mt.group(2)]
    for fn in sorted(os.listdir(dpath)):
        fn_mt = fn_re.match(fn)
        if not fn_mt:
            continue
        fpath = os.path.join(dpath, fn)
        print("\r{}/{}: {}... ".format(i + 1, len(dlist), fpath), end="")
        start_time = pd.to_datetime("{} {} {} {}:{}:{}".format(*fn_mt.groups()))
        df = process_file(session_id, animals, start_time, fpath)
        calls.append(df)
        sessions.append((session_id, animals[0], animals[1], start_time))
        session_id += 1

calls = pd.concat(calls)
sessions = pd.DataFrame(sessions, columns=["session_id", "animal1", "animal2", "start_time"])
sessions.set_index("session_id", inplace=True)
calls.set_index(["session_id", "call_id"], inplace=True)
calls.insert(calls.columns.get_loc("level_difference"),
             "calling_bat_deaf",
             calls["calling_bat"].isin(DEAF_BATS))

# For every first animal, the earliest session whose second animal id starts
# with "m" is flagged as "before_deafening", both in the session table and
# in all calls of that session.
sorted_sessions = sessions[sessions["animal2"].str.startswith("m")].sort_values("start_time")
sessions["before_deafening"] = False
calls.insert(calls.columns.get_loc("level_difference"),
             "before_deafening",
             False)
for pup, sessions_per_pup in sorted_sessions.groupby("animal1"):
    first_id = sessions_per_pup.index[0]
    sessions.loc[first_id, "before_deafening"] = True
    calls.loc[calls.index.get_level_values(0) == first_id, "before_deafening"] = True

calls.insert(calls.columns.get_loc("level_difference"),
             "calling_bat_mother",
             calls["calling_bat"].str.startswith("m"))

# Store the boolean flags as 0/1 integers. The builtin ``int`` is used
# because the ``np.int`` alias no longer exists in current NumPy releases.
for bool_column in ["before_deafening"]:
    sessions[bool_column] = sessions[bool_column].astype(int)
for bool_column in ["calling_bat_deaf", "before_deafening", "calling_bat_mother"]:
    calls[bool_column] = calls[bool_column].astype(int)

sessions.to_csv("../pup_sessions.csv")
calls.to_csv("../pup_calls.csv")
print("\npups done")
###
# Adults
###
# Walks the "deaf" and "hearing" sub-directories of ADULT_RESULTS_ROOT_DIR,
# runs process_file() on every matching result file and writes
# ../adult_sessions.csv and ../adult_calls.csv.
ADULT_RESULTS_ROOT_DIR = "../raw_data/adults/"
sessions = []
calls = []
# File name: year_month_day_hour_minute_second + "_call_parameters.mat".
# The dot before "mat" is escaped so that it matches a literal "." only.
fn_re = re.compile(r"^(\d\d\d\d)_(\d\d)_(\d\d)_(\d\d)_(\d\d)_(\d\d)_call_parameters\.mat$")
session_id = 0
for dn in ["deaf", "hearing"]:
    dpath = os.path.join(ADULT_RESULTS_ROOT_DIR, dn)
    for fn in sorted(os.listdir(dpath)):
        fn_mt = fn_re.match(fn)
        if not fn_mt:
            continue

        print("\r{} {}... ".format(session_id, fn), end="")
        fpath = os.path.join(dpath, fn)
        groups = list(map(int, fn_mt.groups()))
        start_time = pd.Timestamp(year=groups[0], month=groups[1], day=groups[2],
                                  hour=groups[3], minute=groups[4], second=groups[5])
        # No animal pair is passed, so process_file leaves the caller
        # columns empty.
        df = process_file(session_id, None, start_time, fpath)
        if len(df) == 0:
            # Files without calls do not become a session and do not
            # consume a session id.
            continue
        # The deaf/hearing flag comes from the directory the file is in.
        df.insert(df.columns.get_loc("level_difference"),
                  "calling_bat_deaf",
                  int(dn == "deaf"))
        calls.append(df)
        sessions.append((session_id, dn, start_time))
        session_id += 1

calls = pd.concat(calls)
calls.set_index(["session_id", "call_id"], inplace=True)
# Constant-zero columns, inserted at the same position (just before
# "level_difference") as in the pup table.
calls.insert(calls.columns.get_loc("level_difference"),
             "before_deafening", 0)
calls.insert(calls.columns.get_loc("level_difference"),
             "calling_bat_mother", 0)
sessions = pd.DataFrame(sessions, columns=["session_id", "group", "start_time"])
sessions.set_index(["session_id"], inplace=True)
sessions.to_csv("../adult_sessions.csv")
calls.to_csv("../adult_calls.csv")
print("\nadults done")
# vim:sw=4:sts=4:et:
- # vim:sw=4:sts=4:et:
|