#!/usr/bin/python
#
# This source code is (C) by Michael Hanke <michael.hanke@gmail.com> and
# made available under the terms of the Creative Common Attribution-ShareAlike
# 4.0 International (CC BY-SA 4.0) license.
#
import numpy as np

#
# Load data
#
  11. def get_nsecond_segments(n=1):
  12. onsets = np.recfromcsv(
  13. opj('src', 'locations', 'data', 'structure.csv'),
  14. names=('start', 'title', 'major', 'setting', 'locale', 'intext', 'temp', 'tod'))['start']
  15. max = float(onsets[-1])
  16. return np.array((np.arange(0, max - n, n), np.arange(n, max, n))).T
  17. def get_av_ratings():
  18. import glob
  19. return [np.recfromcsv(f) for f in glob.glob(
  20. opj('src', 'emotions', 'data', 'raw', 'av*.csv'))]
  21. def get_ao_ratings():
  22. import glob
  23. return [np.recfromcsv(f) for f in glob.glob(
  24. opj('src', 'emotions', 'data', 'raw', 'ao*.csv'))]
#
# Segmentation
#
def mk_thresh_emotion_episodes(rat, thresh, segments):
    # yield per character list of emotion episodes with a minimum inter-observer
    # agreement wrt any emotion attribute
    #
    # `rat`: list of per-observer annotation tables
    # `thresh`: minimum absolute agreement for a segment to count as "active"
    # `segments`: (N, 2) array of (start, stop) times, one row per segment
    #
    # Returns (episodes, labels) where `episodes` maps character name ->
    # list of episode dicts, and `labels` is the list of attribute names
    # used for the agreement time series.
    chars = get_unique_characters(rat)
    episodes = {}

    def _postprocess(e):
        # collapse each per-segment value list to its median ('start'/'end'
        # are scalars and pass through np.median() unchanged)
        return {k: np.median(v) for k, v in e.items()}

    for char in chars:
        ep = episodes.get(char, [])
        # one agreement time series per attribute; arousal is signed
        # (HIGH-arousal agreement minus LOW-arousal agreement)
        ind = [get_arousal_modulation(rat, segments, char=char)]
        labels = ['arousal']
        for l, d in (('v_pos', dict(valence='POS')),
                     ('v_neg', dict(valence='NEG')),
                     ('d_self', dict(direction='SELF')),
                     ('d_other', dict(direction='OTHER')),
                     ('e_admiration', dict(emotion='ADMIRATION')),
                     ('e_anger/rage', dict(emotion='ANGER/RAGE')),
                     ('e_contempt', dict(emotion='CONTEMPT')),
                     ('e_disappointment', dict(emotion='DISAPPOINTMENT')),
                     ('e_fear', dict(emotion='FEAR')),
                     ('e_fears_confirmed', dict(emotion='FEARS_CONFIRMED')),
                     ('e_gloating', dict(emotion='GLOATING')),
                     ('e_gratification', dict(emotion='GRATIFICATION')),
                     ('e_gratitude', dict(emotion='GRATITUDE')),
                     ('e_happiness', dict(emotion='HAPPINESS')),
                     ('e_happy-for', dict(emotion='HAPPY-FOR')),
                     ('e_hate', dict(emotion='HATE')),
                     ('e_hope', dict(emotion='HOPE')),
                     ('e_love', dict(emotion='LOVE')),
                     ('e_pity/compassion', dict(emotion='PITY/COMPASSION')),
                     ('e_pride', dict(emotion='PRIDE')),
                     ('e_relief', dict(emotion='RELIEF')),
                     ('e_remorse', dict(emotion='REMORSE')),
                     ('e_resent', dict(emotion='RESENTMENT')),
                     ('e_sadness', dict(emotion='SADNESS')),
                     ('e_satisfaction', dict(emotion='SATISFACTION')),
                     ('e_shame', dict(emotion='SHAME')),
                     ('c_audio', dict(oncue='AUDIO')),
                     ('c_context', dict(oncue='CONTEXT')),
                     ('c_face', dict(oncue='FACE')),
                     ('c_gesture', dict(oncue='GESTURE')),
                     ('c_narrator', dict(oncue='NARRATOR')),
                     ('c_verbal', dict(oncue='VERBAL')),
                     ):
            ind.append(_get_modulation(rat, segments, character=char, **d))
            labels.append(l)
        ind = np.array(ind)
        # where is any above threshold agreement
        flags = np.abs(ind) >= thresh
        staging = None
        last_ind = np.array([False] * len(ind))
        # for each segment
        for i, f in enumerate(flags.T):
            if not np.sum(f):
                # nothing above threshold: close and commit any open episode
                if staging:
                    ep.append(_postprocess(staging))
                    staging = None
                last_ind = f
                continue
            # continuing episode? (same set of active attributes as before)
            if np.all(f == last_ind):
                # end of annotation is end of current segment
                staging['end'] = segments[i, 1]
                for nl, l in enumerate(labels):
                    staging[l].append(ind[nl, i])
            else:
                # new episode; commit the previous one first, if any
                if staging:
                    ep.append(_postprocess(staging))
                staging = dict(start=segments[i, 0],
                               end=segments[i, 1])
                last_ind = f
                for nl, l in enumerate(labels):
                    staging[l] = [ind[nl, i]]
        # NOTE(review): an episode still open after the final segment is
        # never committed -- confirm this is intended
        episodes[char] = ep
    return episodes, labels
  110. def emo2eventstsv(data, labels):
  111. # format output of `mk_thresh_emotion_episodes()` into a format that is
  112. # importable by Advene, while merging all episodes of all characters
  113. # into a single file
  114. episodes = []
  115. s = 'onset\tduration\tcharacter\tarousal\tvalence_positive\tvalence_negative\t'
  116. s += '\t'.join(l for l in sorted(labels) if not l in ('arousal', 'v_pos', 'v_neg'))
  117. s += '\n'
  118. for char, ep in data.items():
  119. for e in ep:
  120. e['character'] = char
  121. episodes.append(e)
  122. episodes = sorted(episodes, key=lambda x: x['start'])
  123. fmt = '{onset}\t{duration}\t{character}\t{arousal}\t{valence_positive}\t{valence_negative}\t'
  124. fmt += '\t'.join('{%s}' % l for l in sorted(labels) if not l in ('arousal', 'v_pos', 'v_neg'))
  125. fmt += '\n'
  126. for e in episodes:
  127. s += fmt.format(
  128. onset=e['start'],
  129. duration=e['end'] - e['start'],
  130. valence_positive=e['v_pos'],
  131. valence_negative=e['v_neg'],
  132. **e)
  133. return s
#
# Helpers
#
  137. def get_unique_characters(rat):
  138. return np.unique(
  139. np.concatenate(
  140. [np.unique([a['character'] for a in an])
  141. for an in rat]))
  142. def get_unique_emotions(rat):
  143. return [e for e in np.unique(
  144. np.concatenate(
  145. [np.unique(
  146. np.concatenate([a['emotion'].split() for a in an]))
  147. for an in rat])) if not '?' in e]
  148. def get_unique_oncues(rat):
  149. return [e for e in np.unique(
  150. np.concatenate(
  151. [np.unique(
  152. np.concatenate([a['oncue'].split() for a in an]))
  153. for an in rat])) if not '?' in e]
  154. def slice2segments(ratings, cond, segments):
  155. # compute a time series of inter-observer agreement wrt a particular
  156. # emotion property (or combinations thereof)
  157. # annotations given with start and stop time, are converted into a
  158. # timeseries with data point locations given by the sequence of
  159. # `segments`. Segments intersecting with a given annotation from an
  160. # individual observer are set to one, the rest to zero. The mean
  161. # across observers for any segment is returned
  162. slicer = np.zeros(len(segments))
  163. for rat in ratings:
  164. rslicer = np.zeros(len(segments))
  165. for e in rat:
  166. use = True
  167. for k, v in cond.items():
  168. if v == '*':
  169. continue
  170. if k in ('oncue', 'offcue', 'emotion'):
  171. if not v in e[k].split():
  172. use = False
  173. else:
  174. if not v == e[k]:
  175. use = False
  176. if not use:
  177. continue
  178. select = np.logical_and(segments.T[1] > e['start'],
  179. segments.T[0] < e['end'])
  180. rslicer[select] += 1
  181. slicer += rslicer > 0
  182. slicer = slicer.astype(float) / len(ratings)
  183. return slicer
  184. def get_timeseries(rat, urat, segments, char='*'):
  185. # yield time series representations of all relevant emotion attributes
  186. # from raw annotations
  187. vars = [get_arousal_modulation(rat, segments, char=char),
  188. get_valence_modulation(rat, segments, char=char),
  189. get_direction_modulation(rat, segments, char=char)]
  190. labels = ['arousal', 'valence', 'direction']
  191. for emo in get_unique_emotions(urat):
  192. vars.append(_get_modulation(rat, segments, emotion=emo, character=char))
  193. labels.append(emo.lower())
  194. for oc in get_unique_oncues(urat):
  195. vars.append(_get_modulation(rat, segments, oncue=oc, character=char))
  196. labels.append(oc.lower())
  197. return np.array(vars).T, labels
  198. def _get_modulation(ratings, segments, **kwargs):
  199. return slice2segments(ratings, kwargs, segments)
  200. def get_arousal_modulation(ratings, segments, char='*'):
  201. ts = _get_modulation(ratings, segments, character=char, arousal='HIGH') \
  202. - _get_modulation(ratings, segments, character=char, arousal='LOW')
  203. return ts
  204. def get_valence_modulation(ratings, segments, char='*'):
  205. ts = _get_modulation(ratings, segments, character=char, valence='POS') \
  206. - _get_modulation(ratings, segments, character=char, valence='NEG')
  207. return ts
  208. def get_direction_modulation(ratings, segments, char='*'):
  209. ts = _get_modulation(ratings, segments, character=char, direction='SELF') \
  210. - _get_modulation(ratings, segments, character=char, direction='OTHER')
  211. return ts
  212. if __name__ == '__main__':
  213. # main function: compute stats, generate derived data, make figures
  214. import os
  215. from os.path import join as opj
  216. outpath = 'researchcut'
  217. if not os.path.exists(outpath):
  218. os.makedirs(outpath)
  219. second_segments = get_nsecond_segments()
  220. avr = get_av_ratings()
  221. aor = get_ao_ratings()
  222. open(opj(outpath, 'emotions_av_1s_events.tsv'), 'w').write(
  223. emo2eventstsv(
  224. *mk_thresh_emotion_episodes(avr, .5, get_nsecond_segments(1))))
  225. open(opj(outpath, 'emotions_ao_1s_events.tsv'), 'w').write(
  226. emo2eventstsv(
  227. *mk_thresh_emotion_episodes(aor, .5, get_nsecond_segments(1))))