nixio.py 50 KB


  1. # -*- coding: utf-8 -*-
  2. # Copyright (c) 2016, German Neuroinformatics Node (G-Node)
  3. # Achilleas Koutsou <achilleas.k@gmail.com>
  4. #
  5. # All rights reserved.
  6. #
  7. # Redistribution and use in source and binary forms, with or without
  8. # modification, are permitted under the terms of the BSD License. See
  9. # LICENSE file in the root of the Project.
  10. """
  11. Module for reading data from files in the NIX format.
  12. Author: Achilleas Koutsou
  13. This IO supports both writing and reading of NIX files. Reading is supported
  14. only if the NIX file was created using this IO.
  15. Details on how the Neo object tree is mapped to NIX, as well as details on
  16. behaviours specific to this IO, can be found on the wiki of the G-Node fork of
  17. Neo: https://github.com/G-Node/python-neo/wiki
  18. """
  19. from __future__ import absolute_import
  20. import time
  21. from datetime import datetime
  22. from collections import Iterable, OrderedDict
  23. import itertools
  24. from uuid import uuid4
  25. import quantities as pq
  26. import numpy as np
  27. from .baseio import BaseIO
  28. from ..core import (Block, Segment, ChannelIndex, AnalogSignal,
  29. IrregularlySampledSignal, Epoch, Event, SpikeTrain, Unit)
  30. from ..version import version as neover
  31. try:
  32. import nixio as nix
  33. HAVE_NIX = True
  34. except ImportError:
  35. HAVE_NIX = False
  36. try:
  37. string_types = basestring
  38. except NameError:
  39. string_types = str
  40. EMPTYANNOTATION = "EMPTYLIST"
  41. def stringify(value):
  42. if value is None:
  43. return value
  44. if isinstance(value, bytes):
  45. value = value.decode()
  46. return str(value)
  47. def create_quantity(values, unitstr):
  48. if "*" in unitstr:
  49. unit = pq.CompoundUnit(stringify(unitstr))
  50. else:
  51. unit = unitstr
  52. return pq.Quantity(values, unit)
  53. def units_to_string(pqunit):
  54. dim = str(pqunit.dimensionality)
  55. if dim.startswith("(") and dim.endswith(")"):
  56. return dim.strip("()")
  57. return dim
  58. def calculate_timestamp(dt):
  59. if isinstance(dt, datetime):
  60. return int(time.mktime(dt.timetuple()))
  61. return int(dt)
  62. class NixIO(BaseIO):
  63. """
  64. Class for reading and writing NIX files.
  65. """
  66. is_readable = True
  67. is_writable = True
  68. supported_objects = [Block, Segment, ChannelIndex,
  69. AnalogSignal, IrregularlySampledSignal,
  70. Epoch, Event, SpikeTrain, Unit]
  71. readable_objects = [Block]
  72. writeable_objects = [Block]
  73. name = "NIX"
  74. extensions = ["h5", "nix"]
  75. mode = "file"
  76. nix_version = nix.__version__ if HAVE_NIX else "NIX NOT FOUND"
  77. def __init__(self, filename, mode="rw"):
  78. """
  79. Initialise IO instance and NIX file.
  80. :param filename: Full path to the file
  81. """
  82. if not HAVE_NIX:
  83. raise Exception("Failed to import NIX. "
  84. "The NixIO requires the Python bindings for NIX "
  85. "(nixio on PyPi). Try `pip install nixio`.")
  86. BaseIO.__init__(self, filename)
  87. self.filename = filename
  88. if mode == "ro":
  89. filemode = nix.FileMode.ReadOnly
  90. elif mode == "rw":
  91. filemode = nix.FileMode.ReadWrite
  92. elif mode == "ow":
  93. filemode = nix.FileMode.Overwrite
  94. else:
  95. raise ValueError("Invalid mode specified '{}'. "
  96. "Valid modes: 'ro' (ReadOnly)', 'rw' (ReadWrite),"
  97. " 'ow' (Overwrite).".format(mode))
  98. self.nix_file = nix.File.open(self.filename, filemode)
  99. if self.nix_file.mode == nix.FileMode.ReadOnly:
  100. self._file_version = '0.5.2'
  101. if "neo" in self.nix_file.sections:
  102. self._file_version = self.nix_file.sections["neo"]["version"]
  103. elif self.nix_file.mode == nix.FileMode.ReadWrite:
  104. if "neo" in self.nix_file.sections:
  105. self._file_version = self.nix_file.sections["neo"]["version"]
  106. else:
  107. self._file_version = '0.5.2'
  108. filemd = self.nix_file.create_section("neo", "neo.metadata")
  109. filemd["version"] = self._file_version
  110. else:
  111. # new file
  112. filemd = self.nix_file.create_section("neo", "neo.metadata")
  113. filemd["version"] = neover
  114. self._file_version = neover
  115. self._block_read_counter = 0
  116. # helper maps
  117. self._neo_map = dict()
  118. self._ref_map = dict()
  119. self._signal_map = dict()
  120. # _names_ok is used to guard against name check duplication
  121. self._names_ok = False
  122. def __enter__(self):
  123. return self
  124. def __exit__(self, *args):
  125. self.close()
  126. def read_all_blocks(self, lazy=False):
  127. if lazy:
  128. raise Exception("Lazy loading is not supported for NixIO")
  129. return list(self._nix_to_neo_block(blk)
  130. for blk in self.nix_file.blocks)
  131. def read_block(self, index=None, nixname=None, neoname=None, lazy=False):
  132. """
  133. Loads a Block from the NIX file along with all contained child objects
  134. and returns the equivalent Neo Block.
  135. The Block to read can be specified in one of three ways:
  136. - Index (position) in the file
  137. - Name of the NIX Block (see [...] for details on the naming)
  138. - Name of the original Neo Block
  139. If no arguments are specified, the first Block is returned and
  140. consecutive calls to the function return the next Block in the file.
  141. After all Blocks have been loaded this way, the function returns None.
  142. If more than one argument is specified, the precedence order is:
  143. index, nixname, neoname
  144. Note that Neo objects can be anonymous or have non-unique names,
  145. so specifying a Neo name may be ambiguous.
  146. See also :meth:`NixIO.iter_blocks`.
  147. :param index: The position of the Block to be loaded (creation order)
  148. :param nixname: The name of the Block in NIX
  149. :param neoname: The name of the original Neo Block
  150. """
  151. if lazy:
  152. raise Exception("Lazy loading is not supported for NixIO")
  153. nix_block = None
  154. if index is not None:
  155. nix_block = self.nix_file.blocks[index]
  156. elif nixname is not None:
  157. nix_block = self.nix_file.blocks[nixname]
  158. elif neoname is not None:
  159. for blk in self.nix_file.blocks:
  160. if ("neo_name" in blk.metadata
  161. and blk.metadata["neo_name"] == neoname):
  162. nix_block = blk
  163. break
  164. else:
  165. raise KeyError(
  166. "Block with Neo name '{}' does not exist".format(neoname)
  167. )
  168. else:
  169. index = self._block_read_counter
  170. if index >= len(self.nix_file.blocks):
  171. return None
  172. nix_block = self.nix_file.blocks[index]
  173. self._block_read_counter += 1
  174. return self._nix_to_neo_block(nix_block)
  175. def iter_blocks(self):
  176. """
  177. Returns an iterator which can be used to consecutively load and convert
  178. all Blocks from the NIX File.
  179. """
  180. for blk in self.nix_file.blocks:
  181. yield self._nix_to_neo_block(blk)
  182. def _nix_to_neo_block(self, nix_block):
  183. neo_attrs = self._nix_attr_to_neo(nix_block)
  184. neo_block = Block(**neo_attrs)
  185. neo_block.rec_datetime = datetime.fromtimestamp(
  186. nix_block.created_at
  187. )
  188. # descend into Groups
  189. for grp in nix_block.groups:
  190. newseg = self._nix_to_neo_segment(grp)
  191. neo_block.segments.append(newseg)
  192. # parent reference
  193. newseg.block = neo_block
  194. # find free floating (Groupless) signals and spiketrains
  195. blockdas = self._group_signals(nix_block.data_arrays)
  196. for name, das in blockdas.items():
  197. if name not in self._neo_map:
  198. if das[0].type == "neo.analogsignal":
  199. self._nix_to_neo_analogsignal(das)
  200. elif das[0].type == "neo.irregularlysampledsignal":
  201. self._nix_to_neo_irregularlysampledsignal(das)
  202. for mt in nix_block.multi_tags:
  203. if mt.type == "neo.spiketrain" and mt.name not in self._neo_map:
  204. self._nix_to_neo_spiketrain(mt)
  205. # descend into Sources
  206. for src in nix_block.sources:
  207. newchx = self._nix_to_neo_channelindex(src)
  208. neo_block.channel_indexes.append(newchx)
  209. # parent reference
  210. newchx.block = neo_block
  211. # reset maps
  212. self._neo_map = dict()
  213. self._ref_map = dict()
  214. self._signal_map = dict()
  215. return neo_block
  216. def _nix_to_neo_segment(self, nix_group):
  217. neo_attrs = self._nix_attr_to_neo(nix_group)
  218. neo_segment = Segment(**neo_attrs)
  219. neo_segment.rec_datetime = datetime.fromtimestamp(
  220. nix_group.created_at
  221. )
  222. self._neo_map[nix_group.name] = neo_segment
  223. # this will probably get all the DAs anyway, but if we change any part
  224. # of the mapping to add other kinds of DataArrays to a group, such as
  225. # MultiTag positions and extents, this filter will be necessary
  226. dataarrays = list(filter(
  227. lambda da: da.type in ("neo.analogsignal",
  228. "neo.irregularlysampledsignal"),
  229. nix_group.data_arrays))
  230. dataarrays = self._group_signals(dataarrays)
  231. # descend into DataArrays
  232. for name, das in dataarrays.items():
  233. if das[0].type == "neo.analogsignal":
  234. newasig = self._nix_to_neo_analogsignal(das)
  235. neo_segment.analogsignals.append(newasig)
  236. # parent reference
  237. newasig.segment = neo_segment
  238. elif das[0].type == "neo.irregularlysampledsignal":
  239. newisig = self._nix_to_neo_irregularlysampledsignal(das)
  240. neo_segment.irregularlysampledsignals.append(newisig)
  241. # parent reference
  242. newisig.segment = neo_segment
  243. # descend into MultiTags
  244. for mtag in nix_group.multi_tags:
  245. if mtag.type == "neo.event":
  246. newevent = self._nix_to_neo_event(mtag)
  247. neo_segment.events.append(newevent)
  248. # parent reference
  249. newevent.segment = neo_segment
  250. elif mtag.type == "neo.epoch":
  251. newepoch = self._nix_to_neo_epoch(mtag)
  252. neo_segment.epochs.append(newepoch)
  253. # parent reference
  254. newepoch.segment = neo_segment
  255. elif mtag.type == "neo.spiketrain":
  256. newst = self._nix_to_neo_spiketrain(mtag)
  257. neo_segment.spiketrains.append(newst)
  258. # parent reference
  259. newst.segment = neo_segment
  260. return neo_segment
  261. def _nix_to_neo_channelindex(self, nix_source):
  262. neo_attrs = self._nix_attr_to_neo(nix_source)
  263. channels = list(self._nix_attr_to_neo(c)
  264. for c in nix_source.sources
  265. if c.type == "neo.channelindex")
  266. neo_attrs["index"] = np.array([c["index"]
  267. for c in channels])
  268. if len(channels):
  269. chan_names = list(c["neo_name"]
  270. for c in channels if "neo_name" in c)
  271. chan_ids = list(c["channel_id"]
  272. for c in channels if "channel_id" in c)
  273. if chan_names:
  274. neo_attrs["channel_names"] = chan_names
  275. if chan_ids:
  276. neo_attrs["channel_ids"] = chan_ids
  277. if "coordinates" in channels[0]:
  278. neo_attrs["coordinates"] = list(c["coordinates"]
  279. for c in channels)
  280. neo_chx = ChannelIndex(**neo_attrs)
  281. self._neo_map[nix_source.name] = neo_chx
  282. # create references to Signals
  283. signals = self._ref_map.get(nix_source.name, list())
  284. for sig in signals:
  285. if isinstance(sig, AnalogSignal):
  286. neo_chx.analogsignals.append(sig)
  287. elif isinstance(sig, IrregularlySampledSignal):
  288. neo_chx.irregularlysampledsignals.append(sig)
  289. # else error?
  290. # descend into Sources
  291. for src in nix_source.sources:
  292. if src.type == "neo.unit":
  293. newunit = self._nix_to_neo_unit(src)
  294. neo_chx.units.append(newunit)
  295. # parent reference
  296. newunit.channel_index = neo_chx
  297. return neo_chx
  298. def _nix_to_neo_unit(self, nix_source):
  299. neo_attrs = self._nix_attr_to_neo(nix_source)
  300. neo_unit = Unit(**neo_attrs)
  301. self._neo_map[nix_source.name] = neo_unit
  302. # create references to SpikeTrains
  303. neo_unit.spiketrains.extend(self._ref_map.get(nix_source.name, list()))
  304. return neo_unit
  305. def _nix_to_neo_analogsignal(self, nix_da_group):
  306. """
  307. Convert a group of NIX DataArrays to a Neo AnalogSignal. This method
  308. expects a list of data arrays that all represent the same,
  309. multidimensional Neo AnalogSignal object.
  310. :param nix_da_group: a list of NIX DataArray objects
  311. :return: a Neo AnalogSignal object
  312. """
  313. neo_attrs = self._nix_attr_to_neo(nix_da_group[0])
  314. metadata = nix_da_group[0].metadata
  315. neo_attrs["nix_name"] = metadata.name # use the common base name
  316. unit = nix_da_group[0].unit
  317. signaldata = np.array([d[:] for d in nix_da_group]).transpose()
  318. signaldata = create_quantity(signaldata, unit)
  319. timedim = self._get_time_dimension(nix_da_group[0])
  320. sampling_period = create_quantity(timedim.sampling_interval,
  321. timedim.unit)
  322. # t_start should have been added to neo_attrs via the NIX
  323. # object's metadata. This may not be present since in older
  324. # versions, we didn't store t_start in the metadata when it
  325. # wasn't necessary, such as when the timedim.offset and unit
  326. # did not require rescaling.
  327. if "t_start" in neo_attrs:
  328. t_start = neo_attrs["t_start"]
  329. del neo_attrs["t_start"]
  330. else:
  331. t_start = create_quantity(timedim.offset, timedim.unit)
  332. neo_signal = AnalogSignal(
  333. signal=signaldata, sampling_period=sampling_period,
  334. t_start=t_start, **neo_attrs
  335. )
  336. self._neo_map[neo_attrs["nix_name"]] = neo_signal
  337. # all DAs reference the same sources
  338. srcnames = list(src.name for src in nix_da_group[0].sources)
  339. for n in srcnames:
  340. if n not in self._ref_map:
  341. self._ref_map[n] = list()
  342. self._ref_map[n].append(neo_signal)
  343. return neo_signal
  344. def _nix_to_neo_irregularlysampledsignal(self, nix_da_group):
  345. """
  346. Convert a group of NIX DataArrays to a Neo IrregularlySampledSignal.
  347. This method expects a list of data arrays that all represent the same,
  348. multidimensional Neo IrregularlySampledSignal object.
  349. :param nix_da_group: a list of NIX DataArray objects
  350. :return: a Neo IrregularlySampledSignal object
  351. """
  352. neo_attrs = self._nix_attr_to_neo(nix_da_group[0])
  353. metadata = nix_da_group[0].metadata
  354. neo_attrs["nix_name"] = metadata.name # use the common base name
  355. unit = nix_da_group[0].unit
  356. signaldata = np.array([d[:] for d in nix_da_group]).transpose()
  357. signaldata = create_quantity(signaldata, unit)
  358. timedim = self._get_time_dimension(nix_da_group[0])
  359. times = create_quantity(timedim.ticks, timedim.unit)
  360. neo_signal = IrregularlySampledSignal(
  361. signal=signaldata, times=times, **neo_attrs
  362. )
  363. self._neo_map[neo_attrs["nix_name"]] = neo_signal
  364. # all DAs reference the same sources
  365. srcnames = list(src.name for src in nix_da_group[0].sources)
  366. for n in srcnames:
  367. if n not in self._ref_map:
  368. self._ref_map[n] = list()
  369. self._ref_map[n].append(neo_signal)
  370. return neo_signal
  371. def _nix_to_neo_event(self, nix_mtag):
  372. neo_attrs = self._nix_attr_to_neo(nix_mtag)
  373. time_unit = nix_mtag.positions.unit
  374. times = create_quantity(nix_mtag.positions, time_unit)
  375. labels = np.array(nix_mtag.positions.dimensions[0].labels,
  376. dtype="S")
  377. neo_event = Event(times=times, labels=labels, **neo_attrs)
  378. self._neo_map[nix_mtag.name] = neo_event
  379. return neo_event
  380. def _nix_to_neo_epoch(self, nix_mtag):
  381. neo_attrs = self._nix_attr_to_neo(nix_mtag)
  382. time_unit = nix_mtag.positions.unit
  383. times = create_quantity(nix_mtag.positions, time_unit)
  384. durations = create_quantity(nix_mtag.extents,
  385. nix_mtag.extents.unit)
  386. if len(nix_mtag.positions.dimensions[0].labels) > 0:
  387. labels = np.array(nix_mtag.positions.dimensions[0].labels,
  388. dtype="S")
  389. else:
  390. labels = None
  391. neo_epoch = Epoch(times=times, durations=durations, labels=labels,
  392. **neo_attrs)
  393. self._neo_map[nix_mtag.name] = neo_epoch
  394. return neo_epoch
  395. def _nix_to_neo_spiketrain(self, nix_mtag):
  396. neo_attrs = self._nix_attr_to_neo(nix_mtag)
  397. time_unit = nix_mtag.positions.unit
  398. times = create_quantity(nix_mtag.positions, time_unit)
  399. neo_spiketrain = SpikeTrain(times=times, **neo_attrs)
  400. if nix_mtag.features:
  401. wfda = nix_mtag.features[0].data
  402. wftime = self._get_time_dimension(wfda)
  403. neo_spiketrain.waveforms = create_quantity(wfda, wfda.unit)
  404. interval_units = wftime.unit
  405. neo_spiketrain.sampling_period = create_quantity(
  406. wftime.sampling_interval, interval_units
  407. )
  408. left_sweep_units = wftime.unit
  409. if "left_sweep" in wfda.metadata:
  410. neo_spiketrain.left_sweep = create_quantity(
  411. wfda.metadata["left_sweep"], left_sweep_units
  412. )
  413. self._neo_map[nix_mtag.name] = neo_spiketrain
  414. srcnames = list(src.name for src in nix_mtag.sources)
  415. for n in srcnames:
  416. if n not in self._ref_map:
  417. self._ref_map[n] = list()
  418. self._ref_map[n].append(neo_spiketrain)
  419. return neo_spiketrain
  420. def write_all_blocks(self, neo_blocks, use_obj_names=False):
  421. """
  422. Convert all ``neo_blocks`` to the NIX equivalent and write them to the
  423. file.
  424. :param neo_blocks: List (or iterable) containing Neo blocks
  425. :param use_obj_names: If True, will not generate unique object names
  426. but will instead try to use the name of each Neo object. If these are
  427. not unique, an exception will be raised.
  428. """
  429. if use_obj_names:
  430. self._use_obj_names(neo_blocks)
  431. self._names_ok = True
  432. for bl in neo_blocks:
  433. self.write_block(bl, use_obj_names)
  434. def write_block(self, block, use_obj_names=False):
  435. """
  436. Convert the provided Neo Block to a NIX Block and write it to
  437. the NIX file.
  438. :param block: Neo Block to be written
  439. :param use_obj_names: If True, will not generate unique object names
  440. but will instead try to use the name of each Neo object. If these are
  441. not unique, an exception will be raised.
  442. """
  443. if use_obj_names:
  444. if not self._names_ok:
  445. # _names_ok guards against check duplication
  446. # If it's False, it means write_block() was called directly
  447. self._use_obj_names([block])
  448. if "nix_name" in block.annotations:
  449. nix_name = block.annotations["nix_name"]
  450. else:
  451. nix_name = "neo.block.{}".format(self._generate_nix_name())
  452. block.annotate(nix_name=nix_name)
  453. if nix_name in self.nix_file.blocks:
  454. nixblock = self.nix_file.blocks[nix_name]
  455. del self.nix_file.blocks[nix_name]
  456. del self.nix_file.sections[nix_name]
  457. nixblock = self.nix_file.create_block(nix_name, "neo.block")
  458. nixblock.metadata = self.nix_file.create_section(
  459. nix_name, "neo.block.metadata"
  460. )
  461. metadata = nixblock.metadata
  462. neoname = block.name if block.name is not None else ""
  463. metadata["neo_name"] = neoname
  464. nixblock.definition = block.description
  465. if block.rec_datetime:
  466. nixblock.force_created_at(
  467. calculate_timestamp(block.rec_datetime)
  468. )
  469. if block.file_datetime:
  470. fdt = calculate_timestamp(block.file_datetime)
  471. metadata["file_datetime"] = fdt
  472. if block.annotations:
  473. for k, v in block.annotations.items():
  474. self._write_property(metadata, k, v)
  475. # descend into Segments
  476. for seg in block.segments:
  477. self._write_segment(seg, nixblock)
  478. # descend into ChannelIndexes
  479. for chx in block.channel_indexes:
  480. self._write_channelindex(chx, nixblock)
  481. self._create_source_links(block, nixblock)
  482. def _write_channelindex(self, chx, nixblock):
  483. """
  484. Convert the provided Neo ChannelIndex to a NIX Source and write it to
  485. the NIX file. For each index in the ChannelIndex object, a child
  486. NIX Source is also created.
  487. :param chx: The Neo ChannelIndex to be written
  488. :param nixblock: NIX Block where the Source will be created
  489. """
  490. if "nix_name" in chx.annotations:
  491. nix_name = chx.annotations["nix_name"]
  492. else:
  493. nix_name = "neo.channelindex.{}".format(self._generate_nix_name())
  494. chx.annotate(nix_name=nix_name)
  495. nixsource = nixblock.create_source(nix_name, "neo.channelindex")
  496. nixsource.metadata = nixblock.metadata.create_section(
  497. nix_name, "neo.channelindex.metadata"
  498. )
  499. metadata = nixsource.metadata
  500. neoname = chx.name if chx.name is not None else ""
  501. metadata["neo_name"] = neoname
  502. nixsource.definition = chx.description
  503. if chx.annotations:
  504. for k, v in chx.annotations.items():
  505. self._write_property(metadata, k, v)
  506. for idx, channel in enumerate(chx.index):
  507. channame = "{}.ChannelIndex{}".format(nix_name, idx)
  508. nixchan = nixsource.create_source(channame, "neo.channelindex")
  509. nixchan.metadata = nixsource.metadata.create_section(
  510. nixchan.name, "neo.channelindex.metadata"
  511. )
  512. nixchan.definition = nixsource.definition
  513. chanmd = nixchan.metadata
  514. chanmd["index"] = int(channel)
  515. if len(chx.channel_names):
  516. neochanname = stringify(chx.channel_names[idx])
  517. chanmd["neo_name"] = neochanname
  518. if len(chx.channel_ids):
  519. chanid = chx.channel_ids[idx]
  520. chanmd["channel_id"] = chanid
  521. if chx.coordinates is not None:
  522. coords = chx.coordinates[idx]
  523. coordunits = stringify(coords[0].dimensionality)
  524. nixcoords = tuple(c.magnitude.item() for c in coords)
  525. chanprop = chanmd.create_property("coordinates", nixcoords)
  526. chanprop.unit = coordunits
  527. # Descend into Units
  528. for unit in chx.units:
  529. self._write_unit(unit, nixsource)
  530. def _write_segment(self, segment, nixblock):
  531. """
  532. Convert the provided Neo Segment to a NIX Group and write it to the
  533. NIX file.
  534. :param segment: Neo Segment to be written
  535. :param nixblock: NIX Block where the Group will be created
  536. """
  537. if "nix_name" in segment.annotations:
  538. nix_name = segment.annotations["nix_name"]
  539. else:
  540. nix_name = "neo.segment.{}".format(self._generate_nix_name())
  541. segment.annotate(nix_name=nix_name)
  542. nixgroup = nixblock.create_group(nix_name, "neo.segment")
  543. nixgroup.metadata = nixblock.metadata.create_section(
  544. nix_name, "neo.segment.metadata"
  545. )
  546. metadata = nixgroup.metadata
  547. neoname = segment.name if segment.name is not None else ""
  548. metadata["neo_name"] = neoname
  549. nixgroup.definition = segment.description
  550. if segment.rec_datetime:
  551. nixgroup.force_created_at(
  552. calculate_timestamp(segment.rec_datetime)
  553. )
  554. if segment.file_datetime:
  555. fdt = calculate_timestamp(segment.file_datetime)
  556. metadata["file_datetime"] = fdt
  557. if segment.annotations:
  558. for k, v in segment.annotations.items():
  559. self._write_property(metadata, k, v)
  560. # write signals, events, epochs, and spiketrains
  561. for asig in segment.analogsignals:
  562. self._write_analogsignal(asig, nixblock, nixgroup)
  563. for isig in segment.irregularlysampledsignals:
  564. self._write_irregularlysampledsignal(isig, nixblock, nixgroup)
  565. for event in segment.events:
  566. self._write_event(event, nixblock, nixgroup)
  567. for epoch in segment.epochs:
  568. self._write_epoch(epoch, nixblock, nixgroup)
  569. for spiketrain in segment.spiketrains:
  570. self._write_spiketrain(spiketrain, nixblock, nixgroup)
  571. def _write_analogsignal(self, anasig, nixblock, nixgroup):
  572. """
  573. Convert the provided ``anasig`` (AnalogSignal) to a list of NIX
  574. DataArray objects and write them to the NIX file. All DataArray objects
  575. created from the same AnalogSignal have their metadata section point to
  576. the same object.
  577. :param anasig: The Neo AnalogSignal to be written
  578. :param nixblock: NIX Block where the DataArrays will be created
  579. :param nixgroup: NIX Group where the DataArrays will be attached
  580. """
  581. if "nix_name" in anasig.annotations:
  582. nix_name = anasig.annotations["nix_name"]
  583. else:
  584. nix_name = "neo.analogsignal.{}".format(self._generate_nix_name())
  585. anasig.annotate(nix_name=nix_name)
  586. if "{}.0".format(nix_name) in nixblock.data_arrays and nixgroup:
  587. # AnalogSignal is in multiple Segments.
  588. # Append DataArrays to Group and return.
  589. dalist = list()
  590. for idx in itertools.count():
  591. daname = "{}.{}".format(nix_name, idx)
  592. if daname in nixblock.data_arrays:
  593. dalist.append(nixblock.data_arrays[daname])
  594. else:
  595. break
  596. nixgroup.data_arrays.extend(dalist)
  597. return
  598. data = np.transpose(anasig[:].magnitude)
  599. parentmd = nixgroup.metadata if nixgroup else nixblock.metadata
  600. metadata = parentmd.create_section(nix_name,
  601. "neo.analogsignal.metadata")
  602. nixdas = list()
  603. for idx, row in enumerate(data):
  604. daname = "{}.{}".format(nix_name, idx)
  605. da = nixblock.create_data_array(daname, "neo.analogsignal",
  606. data=row)
  607. da.metadata = metadata
  608. da.definition = anasig.description
  609. da.unit = units_to_string(anasig.units)
  610. timedim = da.append_sampled_dimension(
  611. anasig.sampling_period.magnitude.item()
  612. )
  613. timedim.unit = units_to_string(anasig.sampling_period.units)
  614. tstart = anasig.t_start
  615. metadata["t_start"] = tstart.magnitude.item()
  616. metadata.props["t_start"].unit = units_to_string(tstart.units)
  617. timedim.offset = tstart.rescale(timedim.unit).magnitude.item()
  618. timedim.label = "time"
  619. nixdas.append(da)
  620. if nixgroup:
  621. nixgroup.data_arrays.append(da)
  622. neoname = anasig.name if anasig.name is not None else ""
  623. metadata["neo_name"] = neoname
  624. if anasig.annotations:
  625. for k, v in anasig.annotations.items():
  626. self._write_property(metadata, k, v)
  627. self._signal_map[nix_name] = nixdas
  628. def _write_irregularlysampledsignal(self, irsig, nixblock, nixgroup):
  629. """
  630. Convert the provided ``irsig`` (IrregularlySampledSignal) to a list of
  631. NIX DataArray objects and write them to the NIX file at the location.
  632. All DataArray objects created from the same IrregularlySampledSignal
  633. have their metadata section point to the same object.
  634. :param irsig: The Neo IrregularlySampledSignal to be written
  635. :param nixblock: NIX Block where the DataArrays will be created
  636. :param nixgroup: NIX Group where the DataArrays will be attached
  637. """
  638. if "nix_name" in irsig.annotations:
  639. nix_name = irsig.annotations["nix_name"]
  640. else:
  641. nix_name = "neo.irregularlysampledsignal.{}".format(
  642. self._generate_nix_name()
  643. )
  644. irsig.annotate(nix_name=nix_name)
  645. if "{}.0".format(nix_name) in nixblock.data_arrays and nixgroup:
  646. # IrregularlySampledSignal is in multiple Segments.
  647. # Append DataArrays to Group and return.
  648. dalist = list()
  649. for idx in itertools.count():
  650. daname = "{}.{}".format(nix_name, idx)
  651. if daname in nixblock.data_arrays:
  652. dalist.append(nixblock.data_arrays[daname])
  653. else:
  654. break
  655. nixgroup.data_arrays.extend(dalist)
  656. return
  657. data = np.transpose(irsig[:].magnitude)
  658. parentmd = nixgroup.metadata if nixgroup else nixblock.metadata
  659. metadata = parentmd.create_section(
  660. nix_name, "neo.irregularlysampledsignal.metadata"
  661. )
  662. nixdas = list()
  663. for idx, row in enumerate(data):
  664. daname = "{}.{}".format(nix_name, idx)
  665. da = nixblock.create_data_array(
  666. daname, "neo.irregularlysampledsignal", data=row
  667. )
  668. da.metadata = metadata
  669. da.definition = irsig.description
  670. da.unit = units_to_string(irsig.units)
  671. timedim = da.append_range_dimension(irsig.times.magnitude)
  672. timedim.unit = units_to_string(irsig.times.units)
  673. timedim.label = "time"
  674. nixdas.append(da)
  675. if nixgroup:
  676. nixgroup.data_arrays.append(da)
  677. neoname = irsig.name if irsig.name is not None else ""
  678. metadata["neo_name"] = neoname
  679. if irsig.annotations:
  680. for k, v in irsig.annotations.items():
  681. self._write_property(metadata, k, v)
  682. self._signal_map[nix_name] = nixdas
  683. def _write_event(self, event, nixblock, nixgroup):
  684. """
  685. Convert the provided Neo Event to a NIX MultiTag and write it to the
  686. NIX file.
  687. :param event: The Neo Event to be written
  688. :param nixblock: NIX Block where the MultiTag will be created
  689. :param nixgroup: NIX Group where the MultiTag will be attached
  690. """
  691. if "nix_name" in event.annotations:
  692. nix_name = event.annotations["nix_name"]
  693. else:
  694. nix_name = "neo.event.{}".format(self._generate_nix_name())
  695. event.annotate(nix_name=nix_name)
  696. if nix_name in nixblock.multi_tags:
  697. # Event is in multiple Segments. Append to Group and return.
  698. mt = nixblock.multi_tags[nix_name]
  699. nixgroup.multi_tags.append(mt)
  700. return
  701. times = event.times.magnitude
  702. units = units_to_string(event.times.units)
  703. timesda = nixblock.create_data_array(
  704. "{}.times".format(nix_name), "neo.event.times", data=times
  705. )
  706. timesda.unit = units
  707. nixmt = nixblock.create_multi_tag(nix_name, "neo.event",
  708. positions=timesda)
  709. nixmt.metadata = nixgroup.metadata.create_section(
  710. nix_name, "neo.event.metadata"
  711. )
  712. metadata = nixmt.metadata
  713. labeldim = timesda.append_set_dimension()
  714. labeldim.labels = event.labels
  715. neoname = event.name if event.name is not None else ""
  716. metadata["neo_name"] = neoname
  717. nixmt.definition = event.description
  718. if event.annotations:
  719. for k, v in event.annotations.items():
  720. self._write_property(metadata, k, v)
  721. nixgroup.multi_tags.append(nixmt)
  722. # reference all AnalogSignals and IrregularlySampledSignals in Group
  723. for da in nixgroup.data_arrays:
  724. if da.type in ("neo.analogsignal", "neo.irregularlysampledsignal"):
  725. nixmt.references.append(da)
  726. def _write_epoch(self, epoch, nixblock, nixgroup):
  727. """
  728. Convert the provided Neo Epoch to a NIX MultiTag and write it to the
  729. NIX file.
  730. :param epoch: The Neo Epoch to be written
  731. :param nixblock: NIX Block where the MultiTag will be created
  732. :param nixgroup: NIX Group where the MultiTag will be attached
  733. """
  734. if "nix_name" in epoch.annotations:
  735. nix_name = epoch.annotations["nix_name"]
  736. else:
  737. nix_name = "neo.epoch.{}".format(self._generate_nix_name())
  738. epoch.annotate(nix_name=nix_name)
  739. if nix_name in nixblock.multi_tags:
  740. # Epoch is in multiple Segments. Append to Group and return.
  741. mt = nixblock.multi_tags[nix_name]
  742. nixgroup.multi_tags.append(mt)
  743. return
  744. times = epoch.times.magnitude
  745. tunits = units_to_string(epoch.times.units)
  746. durations = epoch.durations.magnitude
  747. dunits = units_to_string(epoch.durations.units)
  748. timesda = nixblock.create_data_array(
  749. "{}.times".format(nix_name), "neo.epoch.times", data=times
  750. )
  751. timesda.unit = tunits
  752. nixmt = nixblock.create_multi_tag(nix_name, "neo.epoch",
  753. positions=timesda)
  754. durada = nixblock.create_data_array(
  755. "{}.durations".format(nix_name), "neo.epoch.durations",
  756. data=durations
  757. )
  758. durada.unit = dunits
  759. nixmt.extents = durada
  760. nixmt.metadata = nixgroup.metadata.create_section(
  761. nix_name, "neo.epoch.metadata"
  762. )
  763. metadata = nixmt.metadata
  764. labeldim = timesda.append_set_dimension()
  765. labeldim.labels = epoch.labels
  766. neoname = epoch.name if epoch.name is not None else ""
  767. metadata["neo_name"] = neoname
  768. nixmt.definition = epoch.description
  769. if epoch.annotations:
  770. for k, v in epoch.annotations.items():
  771. self._write_property(metadata, k, v)
  772. nixgroup.multi_tags.append(nixmt)
  773. # reference all AnalogSignals and IrregularlySampledSignals in Group
  774. for da in nixgroup.data_arrays:
  775. if da.type in ("neo.analogsignal", "neo.irregularlysampledsignal"):
  776. nixmt.references.append(da)
  777. def _write_spiketrain(self, spiketrain, nixblock, nixgroup):
  778. """
  779. Convert the provided Neo SpikeTrain to a NIX MultiTag and write it to
  780. the NIX file.
  781. :param spiketrain: The Neo SpikeTrain to be written
  782. :param nixblock: NIX Block where the MultiTag will be created
  783. :param nixgroup: NIX Group where the MultiTag will be attached
  784. """
  785. if "nix_name" in spiketrain.annotations:
  786. nix_name = spiketrain.annotations["nix_name"]
  787. else:
  788. nix_name = "neo.spiketrain.{}".format(self._generate_nix_name())
  789. spiketrain.annotate(nix_name=nix_name)
  790. if nix_name in nixblock.multi_tags and nixgroup:
  791. # SpikeTrain is in multiple Segments. Append to Group and return.
  792. mt = nixblock.multi_tags[nix_name]
  793. nixgroup.multi_tags.append(mt)
  794. return
  795. times = spiketrain.times.magnitude
  796. tunits = units_to_string(spiketrain.times.units)
  797. timesda = nixblock.create_data_array(
  798. "{}.times".format(nix_name), "neo.spiketrain.times", data=times
  799. )
  800. timesda.unit = tunits
  801. nixmt = nixblock.create_multi_tag(nix_name, "neo.spiketrain",
  802. positions=timesda)
  803. parentmd = nixgroup.metadata if nixgroup else nixblock.metadata
  804. nixmt.metadata = parentmd.create_section(nix_name,
  805. "neo.spiketrain.metadata")
  806. metadata = nixmt.metadata
  807. neoname = spiketrain.name if spiketrain.name is not None else ""
  808. metadata["neo_name"] = neoname
  809. nixmt.definition = spiketrain.description
  810. self._write_property(metadata, "t_start", spiketrain.t_start)
  811. self._write_property(metadata, "t_stop", spiketrain.t_stop)
  812. if spiketrain.annotations:
  813. for k, v in spiketrain.annotations.items():
  814. self._write_property(metadata, k, v)
  815. if nixgroup:
  816. nixgroup.multi_tags.append(nixmt)
  817. if spiketrain.waveforms is not None:
  818. wfdata = list(wf.magnitude for wf in
  819. list(wfgroup for wfgroup in
  820. spiketrain.waveforms))
  821. wfunits = units_to_string(spiketrain.waveforms.units)
  822. wfda = nixblock.create_data_array(
  823. "{}.waveforms".format(nix_name), "neo.waveforms",
  824. data=wfdata
  825. )
  826. wfda.unit = wfunits
  827. wfda.metadata = nixmt.metadata.create_section(
  828. wfda.name, "neo.waveforms.metadata"
  829. )
  830. nixmt.create_feature(wfda, nix.LinkType.Indexed)
  831. # TODO: Move time dimension first for PR #457
  832. # https://github.com/NeuralEnsemble/python-neo/pull/457
  833. wfda.append_set_dimension()
  834. wfda.append_set_dimension()
  835. wftime = wfda.append_sampled_dimension(
  836. spiketrain.sampling_period.magnitude.item()
  837. )
  838. wftime.unit = units_to_string(spiketrain.sampling_period.units)
  839. wftime.label = "time"
  840. if spiketrain.left_sweep is not None:
  841. self._write_property(wfda.metadata, "left_sweep",
  842. spiketrain.left_sweep)
  843. def _write_unit(self, neounit, nixchxsource):
  844. """
  845. Convert the provided Neo Unit to a NIX Source and write it to the
  846. NIX file.
  847. :param neounit: The Neo Unit to be written
  848. :param nixchxsource: NIX Source (ChannelIndex) where the new Source
  849. (Unit) will be created
  850. """
  851. if "nix_name" in neounit.annotations:
  852. nix_name = neounit.annotations["nix_name"]
  853. else:
  854. nix_name = "neo.unit.{}".format(self._generate_nix_name())
  855. neounit.annotate(nix_name=nix_name)
  856. nixunitsource = nixchxsource.create_source(nix_name,
  857. "neo.unit")
  858. nixunitsource.metadata = nixchxsource.metadata.create_section(
  859. nix_name, "neo.unit.metadata"
  860. )
  861. metadata = nixunitsource.metadata
  862. neoname = neounit.name if neounit.name is not None else ""
  863. metadata["neo_name"] = neoname
  864. nixunitsource.definition = neounit.description
  865. if neounit.annotations:
  866. for k, v in neounit.annotations.items():
  867. self._write_property(metadata, k, v)
  868. def _create_source_links(self, neoblock, nixblock):
  869. """
  870. Creates references between objects in a NIX Block to store the
  871. references in the Neo ChannelIndex and Unit objects.
  872. Specifically:
  873. - If a Neo ChannelIndex references a Neo AnalogSignal or
  874. IrregularlySampledSignal, the corresponding signal DataArray will
  875. reference the corresponding NIX Source object which represents the
  876. Neo ChannelIndex.
  877. - If a Neo Unit references a Neo SpikeTrain, the corresponding
  878. MultiTag will reference the NIX Source objects which represent the
  879. Neo Unit and its parent ChannelIndex.
  880. The two arguments must represent the same Block in each corresponding
  881. format.
  882. Neo objects that have not been converted yet (i.e., AnalogSignal,
  883. IrregularlySampledSignal, or SpikeTrain objects that are not attached
  884. to a Segment) are created on the nixblock.
  885. :param neoblock: A Neo Block object
  886. :param nixblock: The corresponding NIX Block
  887. """
  888. for chx in neoblock.channel_indexes:
  889. signames = []
  890. for asig in chx.analogsignals:
  891. if not ("nix_name" in asig.annotations and
  892. asig.annotations["nix_name"] in self._signal_map):
  893. self._write_analogsignal(asig, nixblock, None)
  894. signames.append(asig.annotations["nix_name"])
  895. for isig in chx.irregularlysampledsignals:
  896. if not ("nix_name" in isig.annotations and
  897. isig.annotations["nix_name"] in self._signal_map):
  898. self._write_irregularlysampledsignal(isig, nixblock, None)
  899. signames.append(isig.annotations["nix_name"])
  900. chxsource = nixblock.sources[chx.annotations["nix_name"]]
  901. for name in signames:
  902. for da in self._signal_map[name]:
  903. da.sources.append(chxsource)
  904. for unit in chx.units:
  905. unitsource = chxsource.sources[unit.annotations["nix_name"]]
  906. for st in unit.spiketrains:
  907. if not ("nix_name" in st.annotations and
  908. st.annotations["nix_name"] in nixblock.multi_tags):
  909. self._write_spiketrain(st, nixblock, None)
  910. stmt = nixblock.multi_tags[st.annotations["nix_name"]]
  911. stmt.sources.append(chxsource)
  912. stmt.sources.append(unitsource)
  913. @staticmethod
  914. def _generate_nix_name():
  915. return uuid4().hex
  916. def _write_property(self, section, name, v):
  917. """
  918. Create a metadata property with a given name and value on the provided
  919. metadata section.
  920. :param section: The metadata section to hold the new property
  921. :param name: The name of the property
  922. :param v: The value to write
  923. :return: The newly created property
  924. """
  925. if isinstance(v, pq.Quantity):
  926. if len(v.shape):
  927. section.create_property(name, tuple(v.magnitude))
  928. else:
  929. section.create_property(name, v.magnitude.item())
  930. section.props[name].unit = str(v.dimensionality)
  931. elif isinstance(v, datetime):
  932. section.create_property(name, calculate_timestamp(v))
  933. elif isinstance(v, string_types):
  934. if len(v):
  935. section.create_property(name, v)
  936. else:
  937. section.create_property(name, nix.DataType.String)
  938. elif isinstance(v, bytes):
  939. section.create_property(name, v.decode())
  940. elif isinstance(v, Iterable):
  941. values = []
  942. unit = None
  943. definition = None
  944. if len(v) == 0:
  945. # NIX supports empty properties but dtype must be specified
  946. # Defaulting to String and using definition to signify empty
  947. # iterable as opposed to empty string
  948. values = nix.DataType.String
  949. definition = EMPTYANNOTATION
  950. elif hasattr(v, "ndim") and v.ndim == 0:
  951. values = v.item()
  952. if isinstance(v, pq.Quantity):
  953. unit = str(v.dimensionality)
  954. else:
  955. for item in v:
  956. if isinstance(item, string_types):
  957. item = item
  958. elif isinstance(item, pq.Quantity):
  959. unit = str(item.dimensionality)
  960. item = item.magnitude.item()
  961. elif isinstance(item, Iterable):
  962. self.logger.warn("Multidimensional arrays and nested "
  963. "containers are not currently "
  964. "supported when writing to NIX.")
  965. return None
  966. else:
  967. item = item
  968. values.append(item)
  969. section.create_property(name, values)
  970. section.props[name].unit = unit
  971. if definition:
  972. section.props[name].definition = definition
  973. elif type(v).__module__ == "numpy":
  974. section.create_property(name, v.item())
  975. else:
  976. section.create_property(name, v)
  977. return section.props[name]
  978. @staticmethod
  979. def _nix_attr_to_neo(nix_obj):
  980. """
  981. Reads common attributes and metadata from a NIX object and populates a
  982. dictionary with Neo-compatible attributes and annotations.
  983. Common attributes: neo_name, nix_name, description,
  984. file_datetime (if applicable).
  985. Metadata: For properties that specify a 'unit', a Quantity object is
  986. created.
  987. """
  988. neo_attrs = dict()
  989. neo_attrs["nix_name"] = nix_obj.name
  990. neo_attrs["description"] = stringify(nix_obj.definition)
  991. if nix_obj.metadata:
  992. for prop in nix_obj.metadata.inherited_properties():
  993. values = prop.values
  994. if prop.unit:
  995. units = prop.unit
  996. values = create_quantity(values, units)
  997. if not len(values):
  998. if prop.definition == EMPTYANNOTATION:
  999. values = list()
  1000. elif prop.data_type == nix.DataType.String:
  1001. values = ""
  1002. elif len(values) == 1:
  1003. values = values[0]
  1004. else:
  1005. values = list(values)
  1006. neo_attrs[prop.name] = values
  1007. neo_attrs["name"] = stringify(neo_attrs.get("neo_name"))
  1008. if "file_datetime" in neo_attrs:
  1009. neo_attrs["file_datetime"] = datetime.fromtimestamp(
  1010. neo_attrs["file_datetime"]
  1011. )
  1012. return neo_attrs
  1013. @staticmethod
  1014. def _group_signals(dataarrays):
  1015. """
  1016. Groups data arrays that were generated by the same Neo Signal object.
  1017. The collection can contain both AnalogSignals and
  1018. IrregularlySampledSignals.
  1019. :param dataarrays: A collection of DataArray objects to group
  1020. :return: A dictionary mapping a base name to a list of DataArrays which
  1021. belong to the same Signal
  1022. """
  1023. # now start grouping
  1024. groups = OrderedDict()
  1025. for da in dataarrays:
  1026. basename = ".".join(da.name.split(".")[:-1])
  1027. if basename not in groups:
  1028. groups[basename] = list()
  1029. groups[basename].append(da)
  1030. return groups
  1031. @staticmethod
  1032. def _get_time_dimension(obj):
  1033. for dim in obj.dimensions:
  1034. if hasattr(dim, "label") and dim.label == "time":
  1035. return dim
  1036. return None
  1037. def _use_obj_names(self, blocks):
  1038. errmsg = "use_obj_names enabled: found conflict or anonymous object"
  1039. allobjs = []
  1040. def check_unique(objs):
  1041. names = list(o.name for o in objs)
  1042. if None in names or "" in names:
  1043. raise ValueError(names)
  1044. if len(names) != len(set(names)):
  1045. self._names_ok = False
  1046. raise ValueError(names)
  1047. # collect objs if ok
  1048. allobjs.extend(objs)
  1049. try:
  1050. check_unique(blocks)
  1051. except ValueError as ve:
  1052. raise ValueError("{} in Blocks {}".format(errmsg, ve))
  1053. for blk in blocks:
  1054. try:
  1055. # Segments
  1056. check_unique(blk.segments)
  1057. except ValueError as ve:
  1058. raise ValueError("{} at Block '{}' > segments > "
  1059. "{}".format(errmsg, blk.name, ve))
  1060. # collect all signals in all segments
  1061. signals = []
  1062. # collect all events, epochs, and spiketrains in all segments
  1063. eests = []
  1064. for seg in blk.segments:
  1065. signals.extend(seg.analogsignals)
  1066. signals.extend(seg.irregularlysampledsignals)
  1067. eests.extend(seg.events)
  1068. eests.extend(seg.epochs)
  1069. eests.extend(seg.spiketrains)
  1070. try:
  1071. # AnalogSignals and IrregularlySampledSignals
  1072. check_unique(signals)
  1073. except ValueError as ve:
  1074. raise ValueError(
  1075. "{} in Signal names "
  1076. "of Block '{}' {}".format(errmsg, blk.name, ve)
  1077. )
  1078. try:
  1079. # Events, Epochs, and SpikeTrains
  1080. check_unique(eests)
  1081. except ValueError as ve:
  1082. raise ValueError(
  1083. "{} in Event, Epoch, and Spiketrain names "
  1084. "of Block '{}' {}".format(errmsg, blk.name, ve)
  1085. )
  1086. try:
  1087. # ChannelIndexes
  1088. check_unique(blk.channel_indexes)
  1089. except ValueError as ve:
  1090. raise ValueError(
  1091. "{} in ChannelIndex names "
  1092. "of Block '{}' {}".format(errmsg, blk.name, ve)
  1093. )
  1094. for chx in blk.channel_indexes:
  1095. try:
  1096. check_unique(chx.units)
  1097. except ValueError as ve:
  1098. raise ValueError(
  1099. "{} in Unit names of Block "
  1100. "'{}' > ChannelIndex '{}' {}".format(errmsg, blk.name,
  1101. chx.name, ve)
  1102. )
  1103. # names are OK: assign annotations
  1104. for o in allobjs:
  1105. o.annotations["nix_name"] = o.name
  1106. def close(self):
  1107. """
  1108. Closes the open nix file and resets maps.
  1109. """
  1110. if (hasattr(self, "nix_file") and
  1111. self.nix_file and self.nix_file.is_open()):
  1112. self.nix_file.close()
  1113. self.nix_file = None
  1114. self._neo_map = None
  1115. self._ref_map = None
  1116. self._signal_map = None
  1117. self._block_read_counter = None
  1118. def __del__(self):
  1119. self.close()