Scheduled service maintenance on November 22


On Friday, November 22, 2024, between 06:00 CET and 18:00 CET, GIN services will undergo planned maintenance. Extended service interruptions should be expected. We will try to keep downtimes to a minimum, but recommend that users avoid critical tasks, large data uploads, or DOI requests during this time.

We apologize for any inconvenience.

mnetonix.py 12 KB


  1. """
  2. mnetonix.py
  3. Usage:
  4. python mnetonix.py [--split-data] [--split-stimuli] <datafile> <montage>
  5. Arguments:
  6. datafile Either an EDF file or a BrainVision header file (vhdr).
  7. montage Any format montage file supported by MNE.
  8. Flags:
  9. --split-data If specified, each channel of raw data is stored in its own
  10. separate DataArray.
  11. --split-stimuli If specified, each stimulus type (identified by its label)
  12. is stored in a separate MultiTag (one MultiTag per
  13. stimulus type).
  14. (Requires Python 3)
  15. Command line script for reading EDF and BrainVision files using MNE
  16. (mne-python) and storing the data and metadata into a NIX file. Supports
  17. reading montage files for recording channel locations.
  18. To include in a script, call the 'write_raw_mne()' and provide a NIX filename
  19. and MNE Raw structure as arguments.
  20. NIX Format layout
  21. =================
  22. Data
  23. ----
  24. Raw Data are stored in either a single 2-dimensional DataArray or a collection
  25. of DataArrays (one per recording channel). The latter makes tagging easier
  26. since MultiTag positions and extents don't need to specify every channel they
  27. reference. However, creating multiple DataArrays makes file sizes much
  28. bigger.
  29. Stimuli
  30. -------
  31. MNE provides stimulus information through the Raw.annotations dictionary.
  32. Onsets correspond to the 'positions' array and durations correspond to the
  33. 'extents' array of the "Stimuli" MultiTag. If stimulus information is split
  34. by label, each MultiTag uses the label as its name.
  35. Metadata
  36. --------
  37. MNE collects metadata into a (nested) dictionary (Raw.info). All non-empty
  38. keys are converted into Properties in NIX. The nested structure of the
  39. dictionary is replicated in NIX by creating child Sections, starting with one
  40. root section with name "Info".
  41. Some extra metadata is kept in the '_raw_extras' private member when loading
  42. from EDF files. This seems to be missing from the 'Info' dictionary in order
  43. to keep it anonymous (missing keys are 'subject_info', 'meas_date', 'file_id',
  44. and 'meas_id'). The '_raw_extras' are also stored in the NIX file in a
  45. separate Section with name "Extras".
  46. """
  47. import sys
  48. import os
  49. from collections.abc import Iterable, Mapping
  50. import mne
  51. import matplotlib.pyplot as plt
  52. import numpy as np
  53. import nixio as nix
  54. DATA_BLOCK_NAME = "EEG Data Block"
  55. DATA_BLOCK_TYPE = "Recording"
  56. RAW_DATA_GROUP_NAME = "Raw Data Group"
  57. RAW_DATA_GROUP_TYPE = "EEG Channels"
  58. RAW_DATA_TYPE = "Raw Data"
  59. def plot_channel(data_array, index):
  60. signal = data_array[index]
  61. tdim = data_array.dimensions[1]
  62. datadim = data_array.dimensions[0]
  63. plt.plot(tdim.ticks, signal, label=datadim.labels[index])
  64. xlabel = f"({tdim.unit})"
  65. plt.xlabel(xlabel)
  66. ylabel = f"{datadim.labels[index]} ({data_array.unit})"
  67. plt.ylabel(ylabel)
  68. plt.legend()
  69. plt.show()
  70. def create_md_tree(section, values, block):
  71. if values is None:
  72. return
  73. for k, v in values.items():
  74. if v is None:
  75. continue
  76. if isinstance(v, Iterable):
  77. if not len(v):
  78. continue
  79. ndim = np.ndim(v)
  80. if ndim > 1:
  81. da = block.create_data_array(k, "Multidimensional Metadata",
  82. data=v)
  83. for _ in range(ndim):
  84. da.append_set_dimension()
  85. prop = section.create_property(k, da.id)
  86. prop.type = str(v.__class__)
  87. da.metadata = section
  88. continue
  89. # check element type
  90. if isinstance(v, Mapping):
  91. # Create a new Section to hold the metadata found in the
  92. # dictionary
  93. subsec = section.create_section(k, str(v.__class__))
  94. create_md_tree(subsec, v, block)
  95. continue
  96. if isinstance(v[0], Mapping):
  97. # Create a new subsection to hold each nested dictionary as
  98. # sub-subsections
  99. subsec = section.create_section(k, str(v.__class__))
  100. for idx, subd in enumerate(v):
  101. subsubsec = subsec.create_section(f"{k}-{idx}",
  102. str(subd.__class__))
  103. create_md_tree(subsubsec, subd, block)
  104. continue
  105. try:
  106. prop = section.create_property(k, v)
  107. except TypeError:
  108. # inconsistent iterable types: upgrade to floats
  109. prop = section.create_property(k, [float(vi) for vi in v])
  110. prop.type = str(v.__class__)
  111. def write_single_da(mneraw, block):
  112. # data and times
  113. data = mneraw.get_data()
  114. time = mneraw.times
  115. nchan = mneraw.info["nchan"]
  116. print(f"Found {nchan} channels with {mneraw.n_times} samples per channel")
  117. da = block.create_data_array("EEG Data", RAW_DATA_TYPE, data=data)
  118. block.groups[RAW_DATA_GROUP_NAME].data_arrays.append(da)
  119. da.unit = "V"
  120. for dimlen in data.shape:
  121. if dimlen == nchan:
  122. # channel labels: SetDimension
  123. da.append_set_dimension(labels=mneraw.ch_names)
  124. elif dimlen == mneraw.n_times:
  125. # times: RangeDimension
  126. # NOTE: EDF always uses seconds
  127. da.append_range_dimension(ticks=time, label="time", unit="s")
  128. def write_multi_da(mneraw, block):
  129. data = mneraw.get_data()
  130. time = mneraw.times
  131. nchan = mneraw.info["nchan"]
  132. channames = mneraw.ch_names
  133. print(f"Found {nchan} channels with {mneraw.n_times} samples per channel")
  134. # find the channel dimension to iterate over it
  135. for idx, dimlen in enumerate(data.shape):
  136. if dimlen == nchan:
  137. chanidx = idx
  138. break
  139. else:
  140. raise RuntimeError("Could not find data dimension that matches number "
  141. "of channels")
  142. for idx, chandata in enumerate(np.rollaxis(data, chanidx)):
  143. chname = channames[idx]
  144. da = block.create_data_array(chname, RAW_DATA_TYPE, data=chandata)
  145. block.groups[RAW_DATA_GROUP_NAME].data_arrays.append(da)
  146. da.unit = "V"
  147. # times: RangeDimension
  148. # NOTE: EDF always uses seconds
  149. da.append_range_dimension(ticks=time, label="time", unit="s")
  150. def separate_stimulus_types(stimuli):
  151. # separate stimuli based on label
  152. stimdict = dict()
  153. for label, onset, duration in zip(stimuli.description,
  154. stimuli.onset,
  155. stimuli.duration):
  156. if label not in stimdict:
  157. stimdict[label] = [(label, onset, duration)]
  158. else:
  159. stimdict[label].append((label, onset, duration))
  160. return stimdict
  161. def write_stim_tags(mneraw, block, split):
  162. stimuli = mneraw.annotations
  163. if split:
  164. stimtuples = separate_stimulus_types(stimuli)
  165. for label, st in stimtuples.items():
  166. label = label.replace("/", "|")
  167. create_stimulus_multi_tag(st, block, mneraw, mtagname=label)
  168. else:
  169. stimtuples = [(l, o, d) for l, o, d in zip(stimuli.description,
  170. stimuli.onset,
  171. stimuli.duration)]
  172. create_stimulus_multi_tag(stimtuples, block, mneraw)
  173. def create_stimulus_multi_tag(stimtuples, block, mneraw, mtagname="Stimuli"):
  174. # check dimensionality of data
  175. datashape = block.groups[RAW_DATA_GROUP_NAME].data_arrays[0].shape
  176. labels = [st[0] for st in stimtuples]
  177. onsets = [st[1] for st in stimtuples]
  178. durations = [st[2] for st in stimtuples]
  179. ndim = len(datashape)
  180. if ndim == 1:
  181. positions = onsets
  182. extents = durations
  183. else:
  184. channelextent = mneraw.info["nchan"] - 1
  185. positions = [(0, p) for p in onsets]
  186. extents = [(channelextent, e) for e in durations]
  187. posda = block.create_data_array(f"{mtagname} onset", "Stimuli Positions",
  188. data=positions)
  189. posda.append_set_dimension(labels=labels)
  190. extda = block.create_data_array(f"{mtagname} durations", "Stimuli Extents",
  191. data=extents)
  192. extda.append_set_dimension(labels=labels)
  193. for _ in range(ndim-1):
  194. # extra set dimensions for any extra data dimensions (beyond the first)
  195. posda.append_set_dimension()
  196. extda.append_set_dimension()
  197. stimmtag = block.create_multi_tag(mtagname, "EEG Stimuli",
  198. positions=posda)
  199. stimmtag.extents = extda
  200. block.groups[RAW_DATA_GROUP_NAME].multi_tags.append(stimmtag)
  201. for da in block.groups[RAW_DATA_GROUP_NAME].data_arrays:
  202. if da.type == RAW_DATA_TYPE:
  203. stimmtag.references.append(da)
  204. def write_raw_mne(nfname, mneraw,
  205. split_data_channels=False, split_stimuli=False):
  206. """
  207. Writes the provided Raw MNE structure to a NIX file with the given name.
  208. :param nfname: Name for the NIX file to write to. Existing file will be
  209. overwritten.
  210. :param mneraw: An MNE Raw structure (any mne.io.BaseRaw subclass).
  211. :param split_data_channels: If True, each raw data channel will be stored
  212. in a separate DataArray.
  213. :param split_stimuli: If True, stimuli will be split into separate
  214. MultiTags based on the stimulus type (label).
  215. :rtype: None
  216. """
  217. mneinfo = mneraw.info
  218. extrainfo = mneraw._raw_extras
  219. # Create NIX file
  220. nf = nix.File(nfname, nix.FileMode.Overwrite)
  221. # Write Data to NIX
  222. block = nf.create_block(DATA_BLOCK_NAME, DATA_BLOCK_TYPE,
  223. compression=nix.Compression.DeflateNormal)
  224. block.create_group(RAW_DATA_GROUP_NAME, RAW_DATA_GROUP_TYPE)
  225. if split_data_channels:
  226. write_multi_da(mneraw, block)
  227. else:
  228. write_single_da(mneraw, block)
  229. if mneraw.annotations:
  230. write_stim_tags(mneraw, block, split_stimuli)
  231. # Write metadata to NIX
  232. # info dictionary
  233. infomd = nf.create_section("Info", "File metadata")
  234. create_md_tree(infomd, mneinfo, block)
  235. # extras
  236. if len(extrainfo) > 1:
  237. for idx, emd_i in enumerate(extrainfo):
  238. extrasmd = nf.create_section(f"Extras-{idx}",
  239. "Raw Extras metadata")
  240. create_md_tree(extrasmd, emd_i, block)
  241. elif extrainfo:
  242. extrasmd = nf.create_section("Extras", "Raw Extras metadata")
  243. create_md_tree(extrasmd, extrainfo[0], block)
  244. # all done
  245. nf.close()
  246. print(f"Created NIX file at '{nfname}'")
  247. print("Done")
  248. def main():
  249. args = sys.argv
  250. if len(args) < 2:
  251. print("Please provide either a BrainVision vhdr or "
  252. "an EDF filename as the first argument")
  253. sys.exit(1)
  254. splitdata = False
  255. if "--split-data" in args:
  256. splitdata = True
  257. args.remove("--split-data")
  258. splitstim = False
  259. if "--split-stimuli" in args:
  260. splitstim = True
  261. args.remove("--split-stimuli")
  262. datafilename = args[1]
  263. montage = None
  264. if len(args) > 2:
  265. montage = args[2]
  266. montage = os.path.abspath(montage)
  267. root, ext = os.path.splitext(datafilename)
  268. nfname = root + os.path.extsep + "nix"
  269. if ext.casefold() == ".edf".casefold():
  270. mneraw = mne.io.read_raw_edf(datafilename, montage=montage,
  271. preload=True, stim_channel=False)
  272. elif ext.casefold() == ".vhdr".casefold():
  273. mneraw = mne.io.read_raw_brainvision(datafilename, montage=montage,
  274. preload=True, stim_channel=False)
  275. else:
  276. raise RuntimeError(f"Unknown extension '{ext}'")
  277. print(f"Converting '{datafilename}' to NIX")
  278. if splitdata:
  279. print(" Creating one DataArray per channel")
  280. if splitstim:
  281. print(" Creating one MultiTag for each stimulus type")
  282. write_raw_mne(nfname, mneraw, splitdata, splitstim)
  283. mneraw.close()
  284. if __name__ == "__main__":
  285. main()