1
1

nixio.py 65 KB


  1. # Copyright (c) 2016, German Neuroinformatics Node (G-Node)
  2. # Achilleas Koutsou <achilleas.k@gmail.com>
  3. #
  4. # All rights reserved.
  5. #
  6. # Redistribution and use in source and binary forms, with or without
  7. # modification, are permitted under the terms of the BSD License. See
  8. # LICENSE file in the root of the Project.
  9. """
  10. Module for reading data from files in the NIX format.
  11. Author: Achilleas Koutsou
  12. This IO supports both writing and reading of NIX files. Reading is supported
  13. only if the NIX file was created using this IO.
  14. Details on how the Neo object tree is mapped to NIX, as well as details on
  15. behaviours specific to this IO, can be found on the wiki of the G-Node fork of
  16. Neo: https://github.com/G-Node/python-neo/wiki
  17. """
  18. from datetime import date, time, datetime
  19. from collections.abc import Iterable
  20. from collections import OrderedDict
  21. import itertools
  22. from uuid import uuid4
  23. import warnings
  24. from distutils.version import LooseVersion as Version
  25. from itertools import chain
  26. import quantities as pq
  27. import numpy as np
  28. from .baseio import BaseIO
  29. from ..core import (Block, Segment, ChannelIndex, AnalogSignal,
  30. IrregularlySampledSignal, Epoch, Event, SpikeTrain,
  31. ImageSequence, Unit, ChannelView, Group)
  32. from ..io.proxyobjects import BaseProxy
  33. from ..version import version as neover
  34. try:
  35. import nixio as nix
  36. HAVE_NIX = True
  37. except ImportError:
  38. HAVE_NIX = False
  39. datetime_types = (date, time, datetime)
  40. EMPTYANNOTATION = "EMPTYLIST"
  41. ARRAYANNOTATION = "ARRAYANNOTATION"
  42. DATETIMEANNOTATION = "DATETIME"
  43. DATEANNOTATION = "DATE"
  44. TIMEANNOTATION = "TIME"
  45. MIN_NIX_VER = Version("1.5.0")
  46. datefmt = "%Y-%m-%d"
  47. timefmt = "%H:%M:%S.%f"
  48. datetimefmt = datefmt + "T" + timefmt
  49. def stringify(value):
  50. if value is None:
  51. return value
  52. if isinstance(value, bytes):
  53. value = value.decode()
  54. return str(value)
  55. def create_quantity(values, unitstr):
  56. if "*" in unitstr:
  57. unit = pq.CompoundUnit(stringify(unitstr))
  58. else:
  59. unit = unitstr
  60. return pq.Quantity(values, unit)
  61. def units_to_string(pqunit):
  62. dim = str(pqunit.dimensionality)
  63. if dim.startswith("(") and dim.endswith(")"):
  64. return dim.strip("()")
  65. return dim
  66. def dt_to_nix(dt):
  67. """
  68. Converts date, time, and datetime objects to an ISO string representation
  69. appropriate for storing in NIX. Returns the converted value and the
  70. annotation type definition for converting back to the original value
  71. type.
  72. """
  73. if isinstance(dt, datetime):
  74. return dt.strftime(datetimefmt), DATETIMEANNOTATION
  75. if isinstance(dt, date):
  76. return dt.strftime(datefmt), DATEANNOTATION
  77. if isinstance(dt, time):
  78. return dt.strftime(timefmt), TIMEANNOTATION
  79. # Unknown: returning as is
  80. return dt
  81. def dt_from_nix(nixdt, annotype):
  82. """
  83. Inverse function of 'dt_to_nix()'. Requires the stored annotation type to
  84. distinguish between the three source types (date, time, and datetime).
  85. """
  86. if annotype == DATEANNOTATION:
  87. dt = datetime.strptime(nixdt, datefmt)
  88. return dt.date()
  89. if annotype == TIMEANNOTATION:
  90. dt = datetime.strptime(nixdt, timefmt)
  91. return dt.time()
  92. if annotype == DATETIMEANNOTATION:
  93. dt = datetime.strptime(nixdt, datetimefmt)
  94. return dt
  95. # Unknown type: older (or newer) IO version?
  96. # Returning as is to avoid data loss.
  97. return nixdt
  98. def check_nix_version():
  99. if not HAVE_NIX:
  100. raise Exception(
  101. "Failed to import NIX. "
  102. "The NixIO requires the Python package for NIX "
  103. "(nixio on PyPi). Try `pip install nixio`."
  104. )
  105. # nixio version numbers have a 'v' prefix which breaks the comparison
  106. nixverstr = nix.__version__.lstrip("v")
  107. try:
  108. nixver = Version(nixverstr)
  109. except ValueError:
  110. warnings.warn(
  111. f"Could not understand NIX Python version {nixverstr}. "
  112. f"The NixIO requires version {MIN_NIX_VER} of the Python package for NIX. "
  113. "The IO may not work correctly."
  114. )
  115. return
  116. if nixver < MIN_NIX_VER:
  117. raise Exception(
  118. "NIX version not supported. "
  119. f"The NixIO requires version {MIN_NIX_VER} or higher of the Python package "
  120. f"for NIX. Found version {nixverstr}"
  121. )
  122. class NixIO(BaseIO):
  123. """
  124. Class for reading and writing NIX files.
  125. """
  126. is_readable = True
  127. is_writable = True
  128. supported_objects = [Block, Segment, ChannelIndex, Group, ChannelView,
  129. AnalogSignal, IrregularlySampledSignal,
  130. Epoch, Event, SpikeTrain, Unit]
  131. readable_objects = [Block]
  132. writeable_objects = [Block]
  133. name = "NIX"
  134. extensions = ["h5", "nix"]
  135. mode = "file"
  136. def __init__(self, filename, mode="rw"):
  137. """
  138. Initialise IO instance and NIX file.
  139. :param filename: Full path to the file
  140. """
  141. check_nix_version()
  142. BaseIO.__init__(self, filename)
  143. self.filename = filename
  144. if mode == "ro":
  145. filemode = nix.FileMode.ReadOnly
  146. elif mode == "rw":
  147. filemode = nix.FileMode.ReadWrite
  148. elif mode == "ow":
  149. filemode = nix.FileMode.Overwrite
  150. else:
  151. raise ValueError(f"Invalid mode specified '{mode}'. "
  152. "Valid modes: 'ro' (ReadOnly)', 'rw' (ReadWrite),"
  153. " 'ow' (Overwrite).")
  154. self.nix_file = nix.File.open(self.filename, filemode)
  155. if self.nix_file.mode == nix.FileMode.ReadOnly:
  156. self._file_version = '0.5.2'
  157. if "neo" in self.nix_file.sections:
  158. self._file_version = self.nix_file.sections["neo"]["version"]
  159. elif self.nix_file.mode == nix.FileMode.ReadWrite:
  160. if "neo" in self.nix_file.sections:
  161. self._file_version = self.nix_file.sections["neo"]["version"]
  162. else:
  163. self._file_version = '0.5.2'
  164. filemd = self.nix_file.create_section("neo", "neo.metadata")
  165. filemd["version"] = self._file_version
  166. else:
  167. # new file
  168. filemd = self.nix_file.create_section("neo", "neo.metadata")
  169. filemd["version"] = neover
  170. self._file_version = neover
  171. self._block_read_counter = 0
  172. # helper maps
  173. self._neo_map = dict()
  174. self._ref_map = dict()
  175. self._signal_map = dict()
  176. self._view_map = dict()
  177. # _names_ok is used to guard against name check duplication
  178. self._names_ok = False
  179. def __enter__(self):
  180. return self
  181. def __exit__(self, *args):
  182. self.close()
  183. def read_all_blocks(self, lazy=False):
  184. if lazy:
  185. raise Exception("Lazy loading is not supported for NixIO")
  186. return list(self._nix_to_neo_block(blk)
  187. for blk in self.nix_file.blocks)
  188. def read_block(self, index=None, nixname=None, neoname=None, lazy=False):
  189. """
  190. Loads a Block from the NIX file along with all contained child objects
  191. and returns the equivalent Neo Block.
  192. The Block to read can be specified in one of three ways:
  193. - Index (position) in the file
  194. - Name of the NIX Block (see [...] for details on the naming)
  195. - Name of the original Neo Block
  196. If no arguments are specified, the first Block is returned and
  197. consecutive calls to the function return the next Block in the file.
  198. After all Blocks have been loaded this way, the function returns None.
  199. If more than one argument is specified, the precedence order is:
  200. index, nixname, neoname
  201. Note that Neo objects can be anonymous or have non-unique names,
  202. so specifying a Neo name may be ambiguous.
  203. See also :meth:`NixIO.iter_blocks`.
  204. :param index: The position of the Block to be loaded (creation order)
  205. :param nixname: The name of the Block in NIX
  206. :param neoname: The name of the original Neo Block
  207. """
  208. if lazy:
  209. raise Exception("Lazy loading is not supported for NixIO")
  210. nix_block = None
  211. if index is not None:
  212. nix_block = self.nix_file.blocks[index]
  213. elif nixname is not None:
  214. nix_block = self.nix_file.blocks[nixname]
  215. elif neoname is not None:
  216. for blk in self.nix_file.blocks:
  217. if ("neo_name" in blk.metadata
  218. and blk.metadata["neo_name"] == neoname):
  219. nix_block = blk
  220. break
  221. else:
  222. raise KeyError(f"Block with Neo name '{neoname}' does not exist")
  223. else:
  224. index = self._block_read_counter
  225. if index >= len(self.nix_file.blocks):
  226. return None
  227. nix_block = self.nix_file.blocks[index]
  228. self._block_read_counter += 1
  229. return self._nix_to_neo_block(nix_block)
  230. def iter_blocks(self):
  231. """
  232. Returns an iterator which can be used to consecutively load and convert
  233. all Blocks from the NIX File.
  234. """
  235. for blk in self.nix_file.blocks:
  236. yield self._nix_to_neo_block(blk)
  237. def _nix_to_neo_block(self, nix_block):
  238. neo_attrs = self._nix_attr_to_neo(nix_block)
  239. neo_block = Block(**neo_attrs)
  240. neo_block.rec_datetime = datetime.fromtimestamp(nix_block.created_at)
  241. # descend into Groups
  242. groups_to_resolve = []
  243. for grp in nix_block.groups:
  244. if grp.type == "neo.segment":
  245. newseg = self._nix_to_neo_segment(grp)
  246. neo_block.segments.append(newseg)
  247. # parent reference
  248. newseg.block = neo_block
  249. elif grp.type == "neo.group":
  250. newgrp, parent_name = self._nix_to_neo_group(grp)
  251. assert parent_name is None
  252. neo_block.groups.append(newgrp)
  253. # parent reference
  254. newgrp.block = neo_block
  255. elif grp.type == "neo.subgroup":
  256. newgrp, parent_name = self._nix_to_neo_group(grp)
  257. groups_to_resolve.append((newgrp, parent_name))
  258. else:
  259. raise Exception("Unexpected group type")
  260. # link subgroups to parents
  261. for newgrp, parent_name in groups_to_resolve:
  262. parent = self._neo_map[parent_name]
  263. parent.groups.append(newgrp)
  264. # find free floating (Groupless) signals and spiketrains
  265. blockdas = self._group_signals(nix_block.data_arrays)
  266. for name, das in blockdas.items():
  267. if name not in self._neo_map:
  268. if das[0].type == "neo.analogsignal":
  269. self._nix_to_neo_analogsignal(das)
  270. elif das[0].type == "neo.irregularlysampledsignal":
  271. self._nix_to_neo_irregularlysampledsignal(das)
  272. elif das[0].type == "neo.imagesequence":
  273. self._nix_to_neo_imagesequence(das)
  274. for mt in nix_block.multi_tags:
  275. if mt.type == "neo.spiketrain" and mt.name not in self._neo_map:
  276. self._nix_to_neo_spiketrain(mt)
  277. # descend into Sources
  278. for src in nix_block.sources:
  279. newchx = self._nix_to_neo_channelindex(src)
  280. neo_block.channel_indexes.append(newchx)
  281. # parent reference
  282. newchx.block = neo_block
  283. # create object links
  284. neo_block.create_relationship()
  285. # reset maps
  286. self._neo_map = dict()
  287. self._ref_map = dict()
  288. self._signal_map = dict()
  289. self._view_map = dict()
  290. return neo_block
  291. def _nix_to_neo_segment(self, nix_group):
  292. neo_attrs = self._nix_attr_to_neo(nix_group)
  293. neo_segment = Segment(**neo_attrs)
  294. neo_segment.rec_datetime = datetime.fromtimestamp(nix_group.created_at)
  295. self._neo_map[nix_group.name] = neo_segment
  296. # this will probably get all the DAs anyway, but if we change any part
  297. # of the mapping to add other kinds of DataArrays to a group, such as
  298. # MultiTag positions and extents, this filter will be necessary
  299. dataarrays = list(filter(
  300. lambda da: da.type in ("neo.analogsignal",
  301. "neo.irregularlysampledsignal",
  302. "neo.imagesequence",),
  303. nix_group.data_arrays))
  304. dataarrays = self._group_signals(dataarrays)
  305. # descend into DataArrays
  306. for name, das in dataarrays.items():
  307. if das[0].type == "neo.analogsignal":
  308. newasig = self._nix_to_neo_analogsignal(das)
  309. neo_segment.analogsignals.append(newasig)
  310. # parent reference
  311. newasig.segment = neo_segment
  312. elif das[0].type == "neo.irregularlysampledsignal":
  313. newisig = self._nix_to_neo_irregularlysampledsignal(das)
  314. neo_segment.irregularlysampledsignals.append(newisig)
  315. # parent reference
  316. newisig.segment = neo_segment
  317. elif das[0].type == "neo.imagesequence":
  318. new_imgseq = self._nix_to_neo_imagesequence(das)
  319. neo_segment.imagesequences.append(new_imgseq)
  320. # parent reference
  321. new_imgseq.segment = neo_segment
  322. # descend into MultiTags
  323. for mtag in nix_group.multi_tags:
  324. if mtag.type == "neo.event":
  325. newevent = self._nix_to_neo_event(mtag)
  326. neo_segment.events.append(newevent)
  327. # parent reference
  328. newevent.segment = neo_segment
  329. elif mtag.type == "neo.epoch":
  330. newepoch = self._nix_to_neo_epoch(mtag)
  331. neo_segment.epochs.append(newepoch)
  332. # parent reference
  333. newepoch.segment = neo_segment
  334. elif mtag.type == "neo.spiketrain":
  335. newst = self._nix_to_neo_spiketrain(mtag)
  336. neo_segment.spiketrains.append(newst)
  337. # parent reference
  338. newst.segment = neo_segment
  339. return neo_segment
  340. def _nix_to_neo_group(self, nix_group):
  341. neo_attrs = self._nix_attr_to_neo(nix_group)
  342. parent_name = neo_attrs.pop("neo_parent", None)
  343. neo_group = Group(**neo_attrs)
  344. self._neo_map[nix_group.name] = neo_group
  345. dataarrays = list(filter(
  346. lambda da: da.type in ("neo.analogsignal",
  347. "neo.irregularlysampledsignal",
  348. "neo.imagesequence",),
  349. nix_group.data_arrays))
  350. dataarrays = self._group_signals(dataarrays)
  351. # descend into DataArrays
  352. for name in dataarrays:
  353. obj = self._neo_map[name]
  354. neo_group.add(obj)
  355. # descend into MultiTags
  356. for mtag in nix_group.multi_tags:
  357. if mtag.type == "neo.channelview" and mtag.name not in self._neo_map:
  358. self._nix_to_neo_channelview(mtag)
  359. obj = self._neo_map[mtag.name]
  360. neo_group.add(obj)
  361. return neo_group, parent_name
  362. def _nix_to_neo_channelindex(self, nix_source):
  363. neo_attrs = self._nix_attr_to_neo(nix_source)
  364. channels = list(self._nix_attr_to_neo(c) for c in nix_source.sources
  365. if c.type == "neo.channelindex")
  366. neo_attrs["index"] = np.array([c["index"] for c in channels])
  367. if len(channels):
  368. chan_names = list(c["name"] for c in channels
  369. if "name" in c and c["name"] is not None)
  370. chan_ids = list(c["channel_id"] for c in channels if "channel_id" in c)
  371. if chan_names:
  372. neo_attrs["channel_names"] = chan_names
  373. if chan_ids:
  374. neo_attrs["channel_ids"] = chan_ids
  375. if "coordinates" in channels[0]:
  376. neo_attrs["coordinates"] = list(c["coordinates"] for c in channels)
  377. neo_chx = ChannelIndex(**neo_attrs)
  378. self._neo_map[nix_source.name] = neo_chx
  379. # create references to Signals
  380. signals = self._ref_map.get(nix_source.name, list())
  381. for sig in signals:
  382. if isinstance(sig, AnalogSignal):
  383. neo_chx.analogsignals.append(sig)
  384. elif isinstance(sig, IrregularlySampledSignal):
  385. neo_chx.irregularlysampledsignals.append(sig)
  386. # else error?
  387. # descend into Sources
  388. for src in nix_source.sources:
  389. if src.type == "neo.unit":
  390. newunit = self._nix_to_neo_unit(src)
  391. neo_chx.units.append(newunit)
  392. # parent reference
  393. newunit.channel_index = neo_chx
  394. return neo_chx
  395. def _nix_to_neo_channelview(self, nix_mtag):
  396. neo_attrs = self._nix_attr_to_neo(nix_mtag)
  397. index = nix_mtag.positions
  398. nix_name, = self._group_signals(nix_mtag.references).keys()
  399. obj = self._neo_map[nix_name]
  400. neo_chview = ChannelView(obj, index, **neo_attrs)
  401. self._neo_map[nix_mtag.name] = neo_chview
  402. return neo_chview
  403. def _nix_to_neo_unit(self, nix_source):
  404. neo_attrs = self._nix_attr_to_neo(nix_source)
  405. neo_unit = Unit(**neo_attrs)
  406. self._neo_map[nix_source.name] = neo_unit
  407. # create references to SpikeTrains
  408. neo_unit.spiketrains.extend(self._ref_map.get(nix_source.name, list()))
  409. return neo_unit
  410. def _nix_to_neo_analogsignal(self, nix_da_group):
  411. """
  412. Convert a group of NIX DataArrays to a Neo AnalogSignal. This method
  413. expects a list of data arrays that all represent the same,
  414. multidimensional Neo AnalogSignal object.
  415. :param nix_da_group: a list of NIX DataArray objects
  416. :return: a Neo AnalogSignal object
  417. """
  418. neo_attrs = self._nix_attr_to_neo(nix_da_group[0])
  419. metadata = nix_da_group[0].metadata
  420. neo_attrs["nix_name"] = metadata.name # use the common base name
  421. unit = nix_da_group[0].unit
  422. signaldata = np.array([d[:] for d in nix_da_group]).transpose()
  423. signaldata = create_quantity(signaldata, unit)
  424. timedim = self._get_time_dimension(nix_da_group[0])
  425. sampling_period = create_quantity(timedim.sampling_interval, timedim.unit)
  426. # t_start should have been added to neo_attrs via the NIX
  427. # object's metadata. This may not be present since in older
  428. # versions, we didn't store t_start in the metadata when it
  429. # wasn't necessary, such as when the timedim.offset and unit
  430. # did not require rescaling.
  431. if "t_start" in neo_attrs:
  432. t_start = neo_attrs["t_start"]
  433. del neo_attrs["t_start"]
  434. else:
  435. t_start = create_quantity(timedim.offset, timedim.unit)
  436. neo_signal = AnalogSignal(signal=signaldata, sampling_period=sampling_period,
  437. t_start=t_start, **neo_attrs)
  438. self._neo_map[neo_attrs["nix_name"]] = neo_signal
  439. # all DAs reference the same sources
  440. srcnames = list(src.name for src in nix_da_group[0].sources)
  441. for n in srcnames:
  442. if n not in self._ref_map:
  443. self._ref_map[n] = list()
  444. self._ref_map[n].append(neo_signal)
  445. return neo_signal
  446. def _nix_to_neo_imagesequence(self, nix_da_group):
  447. """
  448. Convert a group of NIX DataArrays to a Neo ImageSequence. This method
  449. expects a list of data arrays that all represent the same,
  450. multidimensional Neo ImageSequence object.
  451. :param nix_da_group: a list of NIX DataArray objects
  452. :return: a Neo ImageSequence object
  453. """
  454. neo_attrs = self._nix_attr_to_neo(nix_da_group[0])
  455. metadata = nix_da_group[0].metadata
  456. neo_attrs["nix_name"] = metadata.name # use the common base name
  457. unit = nix_da_group[0].unit
  458. imgseq = np.array([d[:] for d in nix_da_group]).transpose()
  459. sampling_rate = neo_attrs["sampling_rate"]
  460. del neo_attrs["sampling_rate"]
  461. spatial_scale = neo_attrs["spatial_scale"]
  462. del neo_attrs["spatial_scale"]
  463. if "t_start" in neo_attrs:
  464. t_start = neo_attrs["t_start"]
  465. del neo_attrs["t_start"]
  466. else:
  467. t_start = 0.0 * pq.ms
  468. neo_seq = ImageSequence(image_data=imgseq, sampling_rate=sampling_rate,
  469. spatial_scale=spatial_scale, units=unit,
  470. t_start=t_start, **neo_attrs)
  471. self._neo_map[neo_attrs["nix_name"]] = neo_seq
  472. # all DAs reference the same sources
  473. srcnames = list(src.name for src in nix_da_group[0].sources)
  474. for n in srcnames:
  475. if n not in self._ref_map:
  476. self._ref_map[n] = list()
  477. self._ref_map[n].append(neo_seq)
  478. return neo_seq
  479. def _nix_to_neo_irregularlysampledsignal(self, nix_da_group):
  480. """
  481. Convert a group of NIX DataArrays to a Neo IrregularlySampledSignal.
  482. This method expects a list of data arrays that all represent the same,
  483. multidimensional Neo IrregularlySampledSignal object.
  484. :param nix_da_group: a list of NIX DataArray objects
  485. :return: a Neo IrregularlySampledSignal object
  486. """
  487. neo_attrs = self._nix_attr_to_neo(nix_da_group[0])
  488. metadata = nix_da_group[0].metadata
  489. neo_attrs["nix_name"] = metadata.name # use the common base name
  490. unit = nix_da_group[0].unit
  491. signaldata = np.array([d[:] for d in nix_da_group]).transpose()
  492. signaldata = create_quantity(signaldata, unit)
  493. timedim = self._get_time_dimension(nix_da_group[0])
  494. times = create_quantity(timedim.ticks, timedim.unit)
  495. neo_signal = IrregularlySampledSignal(signal=signaldata, times=times, **neo_attrs)
  496. self._neo_map[neo_attrs["nix_name"]] = neo_signal
  497. # all DAs reference the same sources
  498. srcnames = list(src.name for src in nix_da_group[0].sources)
  499. for n in srcnames:
  500. if n not in self._ref_map:
  501. self._ref_map[n] = list()
  502. self._ref_map[n].append(neo_signal)
  503. return neo_signal
  504. def _nix_to_neo_event(self, nix_mtag):
  505. neo_attrs = self._nix_attr_to_neo(nix_mtag)
  506. time_unit = nix_mtag.positions.unit
  507. times = create_quantity(nix_mtag.positions, time_unit)
  508. labels = np.array(nix_mtag.positions.dimensions[0].labels, dtype="U")
  509. neo_event = Event(times=times, labels=labels, **neo_attrs)
  510. self._neo_map[nix_mtag.name] = neo_event
  511. return neo_event
  512. def _nix_to_neo_epoch(self, nix_mtag):
  513. neo_attrs = self._nix_attr_to_neo(nix_mtag)
  514. time_unit = nix_mtag.positions.unit
  515. times = create_quantity(nix_mtag.positions, time_unit)
  516. durations = create_quantity(nix_mtag.extents, nix_mtag.extents.unit)
  517. if len(nix_mtag.positions.dimensions[0].labels) > 0:
  518. labels = np.array(nix_mtag.positions.dimensions[0].labels, dtype="U")
  519. else:
  520. labels = None
  521. neo_epoch = Epoch(times=times, durations=durations, labels=labels, **neo_attrs)
  522. self._neo_map[nix_mtag.name] = neo_epoch
  523. return neo_epoch
  524. def _nix_to_neo_spiketrain(self, nix_mtag):
  525. neo_attrs = self._nix_attr_to_neo(nix_mtag)
  526. time_unit = nix_mtag.positions.unit
  527. times = create_quantity(nix_mtag.positions, time_unit)
  528. neo_spiketrain = SpikeTrain(times=times, **neo_attrs)
  529. if nix_mtag.features:
  530. wfda = nix_mtag.features[0].data
  531. wftime = self._get_time_dimension(wfda)
  532. neo_spiketrain.waveforms = create_quantity(wfda, wfda.unit)
  533. interval_units = wftime.unit
  534. neo_spiketrain.sampling_period = create_quantity(wftime.sampling_interval,
  535. interval_units)
  536. left_sweep_units = wftime.unit
  537. if "left_sweep" in wfda.metadata:
  538. neo_spiketrain.left_sweep = create_quantity(wfda.metadata["left_sweep"],
  539. left_sweep_units)
  540. self._neo_map[nix_mtag.name] = neo_spiketrain
  541. srcnames = list(src.name for src in nix_mtag.sources)
  542. for n in srcnames:
  543. if n not in self._ref_map:
  544. self._ref_map[n] = list()
  545. self._ref_map[n].append(neo_spiketrain)
  546. return neo_spiketrain
  547. def write_all_blocks(self, neo_blocks, use_obj_names=False):
  548. """
  549. Convert all ``neo_blocks`` to the NIX equivalent and write them to the
  550. file.
  551. :param neo_blocks: List (or iterable) containing Neo blocks
  552. :param use_obj_names: If True, will not generate unique object names
  553. but will instead try to use the name of each Neo object. If these are
  554. not unique, an exception will be raised.
  555. """
  556. if use_obj_names:
  557. self._use_obj_names(neo_blocks)
  558. self._names_ok = True
  559. for bl in neo_blocks:
  560. self.write_block(bl, use_obj_names)
  561. def write_block(self, block, use_obj_names=False):
  562. """
  563. Convert the provided Neo Block to a NIX Block and write it to
  564. the NIX file.
  565. :param block: Neo Block to be written
  566. :param use_obj_names: If True, will not generate unique object names
  567. but will instead try to use the name of each Neo object. If these are
  568. not unique, an exception will be raised.
  569. """
  570. if use_obj_names:
  571. if not self._names_ok:
  572. # _names_ok guards against check duplication
  573. # If it's False, it means write_block() was called directly
  574. self._use_obj_names([block])
  575. if "nix_name" in block.annotations:
  576. nix_name = block.annotations["nix_name"]
  577. else:
  578. nix_name = f"neo.block.{self._generate_nix_name()}"
  579. block.annotate(nix_name=nix_name)
  580. if nix_name in self.nix_file.blocks:
  581. nixblock = self.nix_file.blocks[nix_name]
  582. del self.nix_file.blocks[nix_name]
  583. del self.nix_file.sections[nix_name]
  584. nixblock = self.nix_file.create_block(nix_name, "neo.block")
  585. nixblock.metadata = self.nix_file.create_section(nix_name, "neo.block.metadata")
  586. metadata = nixblock.metadata
  587. neoname = block.name if block.name is not None else ""
  588. metadata["neo_name"] = neoname
  589. nixblock.definition = block.description
  590. if block.rec_datetime:
  591. nix_rec_dt = int(block.rec_datetime.strftime("%s"))
  592. nixblock.force_created_at(nix_rec_dt)
  593. if block.file_datetime:
  594. fdt, annotype = dt_to_nix(block.file_datetime)
  595. fdtprop = metadata.create_property("file_datetime", fdt)
  596. fdtprop.definition = annotype
  597. if block.annotations:
  598. for k, v in block.annotations.items():
  599. self._write_property(metadata, k, v)
  600. # descend into Segments
  601. for seg in block.segments:
  602. self._write_segment(seg, nixblock)
  603. # descend into ChannelIndexes
  604. for chx in block.channel_indexes:
  605. self._write_channelindex(chx, nixblock)
  606. # descend into Neo Groups
  607. for group in block.groups:
  608. self._write_group(group, nixblock)
  609. self._create_source_links(block, nixblock)
  610. def _write_channelindex(self, chx, nixblock):
  611. """
  612. Convert the provided Neo ChannelIndex to a NIX Source and write it to
  613. the NIX file. For each index in the ChannelIndex object, a child
  614. NIX Source is also created.
  615. :param chx: The Neo ChannelIndex to be written
  616. :param nixblock: NIX Block where the Source will be created
  617. """
  618. if "nix_name" in chx.annotations:
  619. nix_name = chx.annotations["nix_name"]
  620. else:
  621. nix_name = f"neo.channelindex.{self._generate_nix_name()}"
  622. chx.annotate(nix_name=nix_name)
  623. nixsource = nixblock.create_source(nix_name, "neo.channelindex")
  624. nixsource.metadata = nixblock.metadata.create_section(nix_name,
  625. "neo.channelindex.metadata")
  626. metadata = nixsource.metadata
  627. neoname = chx.name if chx.name is not None else ""
  628. metadata["neo_name"] = neoname
  629. nixsource.definition = chx.description
  630. if chx.annotations:
  631. for k, v in chx.annotations.items():
  632. self._write_property(metadata, k, v)
  633. coordinates = chx.coordinates
  634. if coordinates is not None and np.ndim(coordinates) == 1:
  635. # support 1D coordinates for single ChannelIndex
  636. coordinates = [coordinates]
  637. for idx, channel in enumerate(chx.index):
  638. channame = f"{nix_name}.ChannelIndex{idx}"
  639. nixchan = nixsource.create_source(channame, "neo.channelindex")
  640. nixchan.metadata = nixsource.metadata.create_section(nixchan.name,
  641. "neo.channelindex.metadata")
  642. nixchan.definition = nixsource.definition
  643. chanmd = nixchan.metadata
  644. chanmd["index"] = int(channel)
  645. if len(chx.channel_names):
  646. neochanname = stringify(chx.channel_names[idx])
  647. chanmd["neo_name"] = neochanname
  648. if len(chx.channel_ids):
  649. chanid = chx.channel_ids[idx]
  650. chanmd["channel_id"] = chanid
  651. if coordinates is not None:
  652. coords = coordinates[idx]
  653. coordunits = stringify(coords[0].dimensionality)
  654. nixcoords = tuple(c.magnitude.item() for c in coords)
  655. chanprop = chanmd.create_property("coordinates", nixcoords)
  656. chanprop.unit = coordunits
  657. # Descend into Units
  658. for unit in chx.units:
  659. self._write_unit(unit, nixsource)
  660. def _write_channelview(self, chview, nixblock, nixgroup):
  661. """
  662. Convert the provided Neo ChannelView to a NIX MultiTag and write it to
  663. the NIX file.
  664. :param chx: The Neo ChannelView to be written
  665. :param nixblock: NIX Block where the MultiTag will be created
  666. """
  667. if "nix_name" in chview.annotations:
  668. nix_name = chview.annotations["nix_name"]
  669. else:
  670. nix_name = "neo.channelview.{}".format(self._generate_nix_name())
  671. chview.annotate(nix_name=nix_name)
  672. # create a new data array if this channelview was not saved yet
  673. if not nix_name in self._view_map:
  674. channels = nixblock.create_data_array(
  675. "{}.index".format(nix_name), "neo.channelview.index", data=chview.index
  676. )
  677. nixmt = nixblock.create_multi_tag(nix_name, "neo.channelview",
  678. positions=channels)
  679. nixmt.metadata = nixgroup.metadata.create_section(
  680. nix_name, "neo.channelview.metadata"
  681. )
  682. metadata = nixmt.metadata
  683. neoname = chview.name if chview.name is not None else ""
  684. metadata["neo_name"] = neoname
  685. nixmt.definition = chview.description
  686. if chview.annotations:
  687. for k, v in chview.annotations.items():
  688. self._write_property(metadata, k, v)
  689. self._view_map[nix_name] = nixmt
  690. # link tag to the data array for the ChannelView's signal
  691. if not ("nix_name" in chview.obj.annotations
  692. and chview.obj.annotations["nix_name"] in self._signal_map):
  693. # the following restriction could be relaxed later
  694. # but for a first pass this simplifies my mental model
  695. raise Exception("Need to save signals before saving views")
  696. nix_name = chview.obj.annotations["nix_name"]
  697. nixmt.references.extend(self._signal_map[nix_name])
  698. else:
  699. nixmt = self._view_map[nix_name]
  700. nixgroup.multi_tags.append(nixmt)
  701. def _write_segment(self, segment, nixblock):
  702. """
  703. Convert the provided Neo Segment to a NIX Group and write it to the
  704. NIX file.
  705. :param segment: Neo Segment to be written
  706. :param nixblock: NIX Block where the Group will be created
  707. """
  708. if "nix_name" in segment.annotations:
  709. nix_name = segment.annotations["nix_name"]
  710. else:
  711. nix_name = f"neo.segment.{self._generate_nix_name()}"
  712. segment.annotate(nix_name=nix_name)
  713. nixgroup = nixblock.create_group(nix_name, "neo.segment")
  714. nixgroup.metadata = nixblock.metadata.create_section(nix_name,
  715. "neo.segment.metadata")
  716. metadata = nixgroup.metadata
  717. neoname = segment.name if segment.name is not None else ""
  718. metadata["neo_name"] = neoname
  719. nixgroup.definition = segment.description
  720. if segment.rec_datetime:
  721. nix_rec_dt = int(segment.rec_datetime.strftime("%s"))
  722. nixgroup.force_created_at(nix_rec_dt)
  723. if segment.file_datetime:
  724. fdt, annotype = dt_to_nix(segment.file_datetime)
  725. fdtprop = metadata.create_property("file_datetime", fdt)
  726. fdtprop.definition = annotype
  727. if segment.annotations:
  728. for k, v in segment.annotations.items():
  729. self._write_property(metadata, k, v)
  730. # write signals, events, epochs, and spiketrains
  731. for asig in segment.analogsignals:
  732. self._write_analogsignal(asig, nixblock, nixgroup)
  733. for isig in segment.irregularlysampledsignals:
  734. self._write_irregularlysampledsignal(isig, nixblock, nixgroup)
  735. for event in segment.events:
  736. self._write_event(event, nixblock, nixgroup)
  737. for epoch in segment.epochs:
  738. self._write_epoch(epoch, nixblock, nixgroup)
  739. for spiketrain in segment.spiketrains:
  740. self._write_spiketrain(spiketrain, nixblock, nixgroup)
  741. for imagesequence in segment.imagesequences:
  742. self._write_imagesequence(imagesequence, nixblock, nixgroup)
  743. def _write_group(self, neo_group, nixblock, parent=None):
  744. """
  745. Convert the provided Neo Group to a NIX Group and write it to the
  746. NIX file.
  747. :param neo_group: Neo Group to be written
  748. :param nixblock: NIX Block where the NIX Group will be created
  749. :param parent: for sub-groups, the parent Neo Group
  750. """
  751. if parent:
  752. label = "neo.subgroup"
  753. # note that the use of a different label for top-level groups and sub-groups is not
  754. # strictly necessary, the presence of the "neo_parent" annotation is sufficient.
  755. # However, I think it adds clarity and helps in debugging and testing.
  756. else:
  757. label = "neo.group"
  758. if "nix_name" in neo_group.annotations:
  759. nix_name = neo_group.annotations["nix_name"]
  760. else:
  761. nix_name = "{}.{}".format(label, self._generate_nix_name())
  762. neo_group.annotate(nix_name=nix_name)
  763. nixgroup = nixblock.create_group(nix_name, label)
  764. nixgroup.metadata = nixblock.metadata.create_section(
  765. nix_name, f"{label}.metadata"
  766. )
  767. metadata = nixgroup.metadata
  768. neoname = neo_group.name if neo_group.name is not None else ""
  769. metadata["neo_name"] = neoname
  770. if parent:
  771. metadata["neo_parent"] = parent.annotations["nix_name"]
  772. nixgroup.definition = neo_group.description
  773. if neo_group.annotations:
  774. for k, v in neo_group.annotations.items():
  775. self._write_property(metadata, k, v)
  776. # link signals and image sequences
  777. objnames = []
  778. for obj in chain(
  779. neo_group.analogsignals,
  780. neo_group.irregularlysampledsignals,
  781. neo_group.imagesequences,
  782. ):
  783. if not ("nix_name" in obj.annotations
  784. and obj.annotations["nix_name"] in self._signal_map):
  785. # the following restriction could be relaxed later
  786. # but for a first pass this simplifies my mental model
  787. raise Exception("Orphan signals/image sequences cannot be stored, needs to belong to a Segment")
  788. objnames.append(obj.annotations["nix_name"])
  789. for name in objnames:
  790. for da in self._signal_map[name]:
  791. nixgroup.data_arrays.append(da)
  792. # link events, epochs and spiketrains
  793. objnames = []
  794. for obj in chain(
  795. neo_group.events,
  796. neo_group.epochs,
  797. neo_group.spiketrains,
  798. ):
  799. if not ("nix_name" in obj.annotations
  800. and obj.annotations["nix_name"] in nixblock.multi_tags):
  801. # the following restriction could be relaxed later
  802. # but for a first pass this simplifies my mental model
  803. raise Exception("Orphan epochs/events/spiketrains cannot be stored, needs to belong to a Segment")
  804. objnames.append(obj.annotations["nix_name"])
  805. for name in objnames:
  806. mt = nixblock.multi_tags[name]
  807. nixgroup.multi_tags.append(mt)
  808. # save channel views
  809. for chview in neo_group.channelviews:
  810. self._write_channelview(chview, nixblock, nixgroup)
  811. # save sub-groups
  812. for subgroup in neo_group.groups:
  813. self._write_group(subgroup, nixblock, parent=neo_group)
  814. def _write_analogsignal(self, anasig, nixblock, nixgroup):
  815. """
  816. Convert the provided ``anasig`` (AnalogSignal) to a list of NIX
  817. DataArray objects and write them to the NIX file. All DataArray objects
  818. created from the same AnalogSignal have their metadata section point to
  819. the same object.
  820. :param anasig: The Neo AnalogSignal to be written
  821. :param nixblock: NIX Block where the DataArrays will be created
  822. :param nixgroup: NIX Group where the DataArrays will be attached
  823. """
  824. if "nix_name" in anasig.annotations:
  825. nix_name = anasig.annotations["nix_name"]
  826. else:
  827. nix_name = f"neo.analogsignal.{self._generate_nix_name()}"
  828. anasig.annotate(nix_name=nix_name)
  829. if f"{nix_name}.0" in nixblock.data_arrays and nixgroup:
  830. # AnalogSignal is in multiple Segments.
  831. # Append DataArrays to Group and return.
  832. dalist = list()
  833. for idx in itertools.count():
  834. daname = f"{nix_name}.{idx}"
  835. if daname in nixblock.data_arrays:
  836. dalist.append(nixblock.data_arrays[daname])
  837. else:
  838. break
  839. nixgroup.data_arrays.extend(dalist)
  840. return
  841. if isinstance(anasig, BaseProxy):
  842. data = np.transpose(anasig.load()[:].magnitude)
  843. else:
  844. data = np.transpose(anasig[:].magnitude)
  845. parentmd = nixgroup.metadata if nixgroup else nixblock.metadata
  846. metadata = parentmd.create_section(nix_name, "neo.analogsignal.metadata")
  847. nixdas = list()
  848. for idx, row in enumerate(data):
  849. daname = f"{nix_name}.{idx}"
  850. da = nixblock.create_data_array(daname, "neo.analogsignal", data=row)
  851. da.metadata = metadata
  852. da.definition = anasig.description
  853. da.unit = units_to_string(anasig.units)
  854. sampling_period = anasig.sampling_period.magnitude.item()
  855. timedim = da.append_sampled_dimension(sampling_period)
  856. timedim.unit = units_to_string(anasig.sampling_period.units)
  857. tstart = anasig.t_start
  858. metadata["t_start"] = tstart.magnitude.item()
  859. metadata.props["t_start"].unit = units_to_string(tstart.units)
  860. timedim.offset = tstart.rescale(timedim.unit).magnitude.item()
  861. timedim.label = "time"
  862. nixdas.append(da)
  863. if nixgroup:
  864. nixgroup.data_arrays.append(da)
  865. neoname = anasig.name if anasig.name is not None else ""
  866. metadata["neo_name"] = neoname
  867. if anasig.annotations:
  868. for k, v in anasig.annotations.items():
  869. self._write_property(metadata, k, v)
  870. if anasig.array_annotations:
  871. for k, v in anasig.array_annotations.items():
  872. p = self._write_property(metadata, k, v)
  873. p.type = ARRAYANNOTATION
  874. self._signal_map[nix_name] = nixdas
  875. def _write_imagesequence(self, imgseq, nixblock, nixgroup):
  876. """
  877. Convert the provided ``imgseq`` (ImageSequence) to a list of NIX
  878. DataArray objects and write them to the NIX file. All DataArray objects
  879. created from the same ImageSequence have their metadata section point to
  880. the same object.
  881. :param anasig: The Neo ImageSequence to be written
  882. :param nixblock: NIX Block where the DataArrays will be created
  883. :param nixgroup: NIX Group where the DataArrays will be attached
  884. """
  885. if "nix_name" in imgseq.annotations:
  886. nix_name = imgseq.annotations["nix_name"]
  887. else:
  888. nix_name = f"neo.imagesequence.{self._generate_nix_name()}"
  889. imgseq.annotate(nix_name=nix_name)
  890. if f"{nix_name}.0" in nixblock.data_arrays and nixgroup:
  891. dalist = list()
  892. for idx in itertools.count():
  893. daname = f"{nix_name}.{idx}"
  894. if daname in nixblock.data_arrays:
  895. dalist.append(nixblock.data_arrays[daname])
  896. else:
  897. break
  898. nixgroup.data_arrays.extend(dalist)
  899. return
  900. if isinstance(imgseq, BaseProxy):
  901. data = np.transpose(imgseq.load()[:].magnitude)
  902. else:
  903. data = np.transpose(imgseq[:].magnitude)
  904. parentmd = nixgroup.metadata if nixgroup else nixblock.metadata
  905. metadata = parentmd.create_section(nix_name, "neo.imagesequence.metadata")
  906. nixdas = list()
  907. for idx, row in enumerate(data):
  908. daname = f"{nix_name}.{idx}"
  909. da = nixblock.create_data_array(daname, "neo.imagesequence", data=row)
  910. da.metadata = metadata
  911. da.definition = imgseq.description
  912. da.unit = units_to_string(imgseq.units)
  913. metadata["sampling_rate"] = imgseq.sampling_rate.magnitude.item()
  914. units = imgseq.sampling_rate.units
  915. metadata.props["sampling_rate"].unit = units_to_string(units)
  916. metadata["spatial_scale"] = imgseq.spatial_scale.magnitude.item()
  917. units = imgseq.spatial_scale.units
  918. metadata.props["spatial_scale"].unit = units_to_string(units)
  919. metadata["t_start"] = imgseq.t_start.magnitude.item()
  920. units = imgseq.t_start.units
  921. metadata.props["t_start"].unit = units_to_string(units)
  922. nixdas.append(da)
  923. if nixgroup:
  924. nixgroup.data_arrays.append(da)
  925. neoname = imgseq.name if imgseq.name is not None else ""
  926. metadata["neo_name"] = neoname
  927. if imgseq.annotations:
  928. for k, v in imgseq.annotations.items():
  929. self._write_property(metadata, k, v)
  930. self._signal_map[nix_name] = nixdas
  931. def _write_irregularlysampledsignal(self, irsig, nixblock, nixgroup):
  932. """
  933. Convert the provided ``irsig`` (IrregularlySampledSignal) to a list of
  934. NIX DataArray objects and write them to the NIX file at the location.
  935. All DataArray objects created from the same IrregularlySampledSignal
  936. have their metadata section point to the same object.
  937. :param irsig: The Neo IrregularlySampledSignal to be written
  938. :param nixblock: NIX Block where the DataArrays will be created
  939. :param nixgroup: NIX Group where the DataArrays will be attached
  940. """
  941. if "nix_name" in irsig.annotations:
  942. nix_name = irsig.annotations["nix_name"]
  943. else:
  944. nix_name = f"neo.irregularlysampledsignal.{self._generate_nix_name()}"
  945. irsig.annotate(nix_name=nix_name)
  946. if f"{nix_name}.0" in nixblock.data_arrays and nixgroup:
  947. # IrregularlySampledSignal is in multiple Segments.
  948. # Append DataArrays to Group and return.
  949. dalist = list()
  950. for idx in itertools.count():
  951. daname = f"{nix_name}.{idx}"
  952. if daname in nixblock.data_arrays:
  953. dalist.append(nixblock.data_arrays[daname])
  954. else:
  955. break
  956. nixgroup.data_arrays.extend(dalist)
  957. return
  958. if isinstance(irsig, BaseProxy):
  959. data = np.transpose(irsig.load()[:].magnitude)
  960. else:
  961. data = np.transpose(irsig[:].magnitude)
  962. parentmd = nixgroup.metadata if nixgroup else nixblock.metadata
  963. metadata = parentmd.create_section(nix_name, "neo.irregularlysampledsignal.metadata")
  964. nixdas = list()
  965. for idx, row in enumerate(data):
  966. daname = f"{nix_name}.{idx}"
  967. da = nixblock.create_data_array(daname, "neo.irregularlysampledsignal", data=row)
  968. da.metadata = metadata
  969. da.definition = irsig.description
  970. da.unit = units_to_string(irsig.units)
  971. timedim = da.append_range_dimension(irsig.times.magnitude)
  972. timedim.unit = units_to_string(irsig.times.units)
  973. timedim.label = "time"
  974. nixdas.append(da)
  975. if nixgroup:
  976. nixgroup.data_arrays.append(da)
  977. neoname = irsig.name if irsig.name is not None else ""
  978. metadata["neo_name"] = neoname
  979. if irsig.annotations:
  980. for k, v in irsig.annotations.items():
  981. self._write_property(metadata, k, v)
  982. if irsig.array_annotations:
  983. for k, v in irsig.array_annotations.items():
  984. p = self._write_property(metadata, k, v)
  985. p.type = ARRAYANNOTATION
  986. self._signal_map[nix_name] = nixdas
  987. def _write_event(self, event, nixblock, nixgroup):
  988. """
  989. Convert the provided Neo Event to a NIX MultiTag and write it to the
  990. NIX file.
  991. :param event: The Neo Event to be written
  992. :param nixblock: NIX Block where the MultiTag will be created
  993. :param nixgroup: NIX Group where the MultiTag will be attached
  994. """
  995. if "nix_name" in event.annotations:
  996. nix_name = event.annotations["nix_name"]
  997. else:
  998. nix_name = f"neo.event.{self._generate_nix_name()}"
  999. event.annotate(nix_name=nix_name)
  1000. if nix_name in nixblock.multi_tags:
  1001. # Event is in multiple Segments. Append to Group and return.
  1002. mt = nixblock.multi_tags[nix_name]
  1003. nixgroup.multi_tags.append(mt)
  1004. return
  1005. if isinstance(event, BaseProxy):
  1006. event = event.load()
  1007. times = event.times.magnitude
  1008. units = units_to_string(event.times.units)
  1009. labels = event.labels
  1010. timesda = nixblock.create_data_array(f"{nix_name}.times", "neo.event.times", data=times)
  1011. timesda.unit = units
  1012. nixmt = nixblock.create_multi_tag(nix_name, "neo.event", positions=timesda)
  1013. nixmt.metadata = nixgroup.metadata.create_section(nix_name, "neo.event.metadata")
  1014. metadata = nixmt.metadata
  1015. labeldim = timesda.append_set_dimension()
  1016. labeldim.labels = labels
  1017. neoname = event.name if event.name is not None else ""
  1018. metadata["neo_name"] = neoname
  1019. nixmt.definition = event.description
  1020. if event.annotations:
  1021. for k, v in event.annotations.items():
  1022. self._write_property(metadata, k, v)
  1023. if event.array_annotations:
  1024. for k, v in event.array_annotations.items():
  1025. p = self._write_property(metadata, k, v)
  1026. p.type = ARRAYANNOTATION
  1027. nixgroup.multi_tags.append(nixmt)
  1028. # reference all AnalogSignals and IrregularlySampledSignals in Group
  1029. for da in nixgroup.data_arrays:
  1030. if da.type in ("neo.analogsignal", "neo.irregularlysampledsignal"):
  1031. nixmt.references.append(da)
  1032. def _write_epoch(self, epoch, nixblock, nixgroup):
  1033. """
  1034. Convert the provided Neo Epoch to a NIX MultiTag and write it to the
  1035. NIX file.
  1036. :param epoch: The Neo Epoch to be written
  1037. :param nixblock: NIX Block where the MultiTag will be created
  1038. :param nixgroup: NIX Group where the MultiTag will be attached
  1039. """
  1040. if "nix_name" in epoch.annotations:
  1041. nix_name = epoch.annotations["nix_name"]
  1042. else:
  1043. nix_name = f"neo.epoch.{self._generate_nix_name()}"
  1044. epoch.annotate(nix_name=nix_name)
  1045. if nix_name in nixblock.multi_tags:
  1046. # Epoch is in multiple Segments. Append to Group and return.
  1047. mt = nixblock.multi_tags[nix_name]
  1048. nixgroup.multi_tags.append(mt)
  1049. return
  1050. if isinstance(epoch, BaseProxy):
  1051. epoch = epoch.load()
  1052. times = epoch.times.magnitude
  1053. tunits = units_to_string(epoch.times.units)
  1054. durations = epoch.durations.magnitude
  1055. dunits = units_to_string(epoch.durations.units)
  1056. timesda = nixblock.create_data_array(f"{nix_name}.times", "neo.epoch.times", data=times)
  1057. timesda.unit = tunits
  1058. nixmt = nixblock.create_multi_tag(nix_name, "neo.epoch", positions=timesda)
  1059. durada = nixblock.create_data_array(f"{nix_name}.durations", "neo.epoch.durations",
  1060. data=durations)
  1061. durada.unit = dunits
  1062. nixmt.extents = durada
  1063. nixmt.metadata = nixgroup.metadata.create_section(nix_name, "neo.epoch.metadata")
  1064. metadata = nixmt.metadata
  1065. labeldim = timesda.append_set_dimension()
  1066. labeldim.labels = epoch.labels
  1067. neoname = epoch.name if epoch.name is not None else ""
  1068. metadata["neo_name"] = neoname
  1069. nixmt.definition = epoch.description
  1070. if epoch.annotations:
  1071. for k, v in epoch.annotations.items():
  1072. self._write_property(metadata, k, v)
  1073. if epoch.array_annotations:
  1074. for k, v in epoch.array_annotations.items():
  1075. p = self._write_property(metadata, k, v)
  1076. p.type = ARRAYANNOTATION
  1077. nixgroup.multi_tags.append(nixmt)
  1078. # reference all AnalogSignals and IrregularlySampledSignals in Group
  1079. for da in nixgroup.data_arrays:
  1080. if da.type in ("neo.analogsignal", "neo.irregularlysampledsignal"):
  1081. nixmt.references.append(da)
  1082. def _write_spiketrain(self, spiketrain, nixblock, nixgroup):
  1083. """
  1084. Convert the provided Neo SpikeTrain to a NIX MultiTag and write it to
  1085. the NIX file.
  1086. :param spiketrain: The Neo SpikeTrain to be written
  1087. :param nixblock: NIX Block where the MultiTag will be created
  1088. :param nixgroup: NIX Group where the MultiTag will be attached
  1089. """
  1090. if "nix_name" in spiketrain.annotations:
  1091. nix_name = spiketrain.annotations["nix_name"]
  1092. else:
  1093. nix_name = f"neo.spiketrain.{self._generate_nix_name()}"
  1094. spiketrain.annotate(nix_name=nix_name)
  1095. if nix_name in nixblock.multi_tags and nixgroup:
  1096. # SpikeTrain is in multiple Segments. Append to Group and return.
  1097. mt = nixblock.multi_tags[nix_name]
  1098. nixgroup.multi_tags.append(mt)
  1099. return
  1100. if isinstance(spiketrain, BaseProxy):
  1101. spiketrain = spiketrain.load()
  1102. times = spiketrain.times.magnitude
  1103. tunits = units_to_string(spiketrain.times.units)
  1104. waveforms = spiketrain.waveforms
  1105. timesda = nixblock.create_data_array(f"{nix_name}.times", "neo.spiketrain.times",
  1106. data=times)
  1107. timesda.unit = tunits
  1108. nixmt = nixblock.create_multi_tag(nix_name, "neo.spiketrain", positions=timesda)
  1109. parentmd = nixgroup.metadata if nixgroup else nixblock.metadata
  1110. nixmt.metadata = parentmd.create_section(nix_name, "neo.spiketrain.metadata")
  1111. metadata = nixmt.metadata
  1112. neoname = spiketrain.name if spiketrain.name is not None else ""
  1113. metadata["neo_name"] = neoname
  1114. nixmt.definition = spiketrain.description
  1115. self._write_property(metadata, "t_start", spiketrain.t_start)
  1116. self._write_property(metadata, "t_stop", spiketrain.t_stop)
  1117. if spiketrain.annotations:
  1118. for k, v in spiketrain.annotations.items():
  1119. self._write_property(metadata, k, v)
  1120. if spiketrain.array_annotations:
  1121. for k, v in spiketrain.array_annotations.items():
  1122. p = self._write_property(metadata, k, v)
  1123. p.type = ARRAYANNOTATION
  1124. if nixgroup:
  1125. nixgroup.multi_tags.append(nixmt)
  1126. if waveforms is not None:
  1127. wfdata = list(wf.magnitude for wf in
  1128. list(wfgroup for wfgroup in spiketrain.waveforms))
  1129. wfunits = units_to_string(spiketrain.waveforms.units)
  1130. wfda = nixblock.create_data_array(f"{nix_name}.waveforms", "neo.waveforms",
  1131. data=wfdata)
  1132. wfda.unit = wfunits
  1133. wfda.metadata = nixmt.metadata.create_section(wfda.name, "neo.waveforms.metadata")
  1134. nixmt.create_feature(wfda, nix.LinkType.Indexed)
  1135. # TODO: Move time dimension first for PR #457
  1136. # https://github.com/NeuralEnsemble/python-neo/pull/457
  1137. wfda.append_set_dimension()
  1138. wfda.append_set_dimension()
  1139. wftime = wfda.append_sampled_dimension(spiketrain.sampling_period.magnitude.item())
  1140. wftime.unit = units_to_string(spiketrain.sampling_period.units)
  1141. wftime.label = "time"
  1142. if spiketrain.left_sweep is not None:
  1143. self._write_property(wfda.metadata, "left_sweep", spiketrain.left_sweep)
  1144. def _write_unit(self, neounit, nixchxsource):
  1145. """
  1146. Convert the provided Neo Unit to a NIX Source and write it to the
  1147. NIX file.
  1148. :param neounit: The Neo Unit to be written
  1149. :param nixchxsource: NIX Source (ChannelIndex) where the new Source
  1150. (Unit) will be created
  1151. """
  1152. if "nix_name" in neounit.annotations:
  1153. nix_name = neounit.annotations["nix_name"]
  1154. else:
  1155. nix_name = f"neo.unit.{self._generate_nix_name()}"
  1156. neounit.annotate(nix_name=nix_name)
  1157. nixunitsource = nixchxsource.create_source(nix_name, "neo.unit")
  1158. nixunitsource.metadata = nixchxsource.metadata.create_section(nix_name,
  1159. "neo.unit.metadata")
  1160. metadata = nixunitsource.metadata
  1161. neoname = neounit.name if neounit.name is not None else ""
  1162. metadata["neo_name"] = neoname
  1163. nixunitsource.definition = neounit.description
  1164. if neounit.annotations:
  1165. for k, v in neounit.annotations.items():
  1166. self._write_property(metadata, k, v)
  1167. def _create_source_links(self, neoblock, nixblock):
  1168. """
  1169. Creates references between objects in a NIX Block to store the
  1170. references in the Neo ChannelIndex and Unit objects.
  1171. Specifically:
  1172. - If a Neo ChannelIndex references a Neo AnalogSignal or
  1173. IrregularlySampledSignal, the corresponding signal DataArray will
  1174. reference the corresponding NIX Source object which represents the
  1175. Neo ChannelIndex.
  1176. - If a Neo Unit references a Neo SpikeTrain, the corresponding
  1177. MultiTag will reference the NIX Source objects which represent the
  1178. Neo Unit and its parent ChannelIndex.
  1179. The two arguments must represent the same Block in each corresponding
  1180. format.
  1181. Neo objects that have not been converted yet (i.e., AnalogSignal,
  1182. IrregularlySampledSignal, or SpikeTrain objects that are not attached
  1183. to a Segment) are created on the nixblock.
  1184. :param neoblock: A Neo Block object
  1185. :param nixblock: The corresponding NIX Block
  1186. """
  1187. for chx in neoblock.channel_indexes:
  1188. signames = []
  1189. for asig in chx.analogsignals:
  1190. if not ("nix_name" in asig.annotations
  1191. and asig.annotations["nix_name"] in self._signal_map):
  1192. self._write_analogsignal(asig, nixblock, None)
  1193. signames.append(asig.annotations["nix_name"])
  1194. for isig in chx.irregularlysampledsignals:
  1195. if not ("nix_name" in isig.annotations
  1196. and isig.annotations["nix_name"] in self._signal_map):
  1197. self._write_irregularlysampledsignal(isig, nixblock, None)
  1198. signames.append(isig.annotations["nix_name"])
  1199. chxsource = nixblock.sources[chx.annotations["nix_name"]]
  1200. for name in signames:
  1201. for da in self._signal_map[name]:
  1202. da.sources.append(chxsource)
  1203. for unit in chx.units:
  1204. unitsource = chxsource.sources[unit.annotations["nix_name"]]
  1205. for st in unit.spiketrains:
  1206. mtags = nixblock.multi_tags
  1207. if not ("nix_name" in st.annotations
  1208. and st.annotations["nix_name"] in mtags):
  1209. self._write_spiketrain(st, nixblock, None)
  1210. stmt = mtags[st.annotations["nix_name"]]
  1211. stmt.sources.append(chxsource)
  1212. stmt.sources.append(unitsource)
  1213. @staticmethod
  1214. def _generate_nix_name():
  1215. return uuid4().hex
  1216. def _write_property(self, section, name, v):
  1217. """
  1218. Create a metadata property with a given name and value on the provided
  1219. metadata section.
  1220. :param section: The metadata section to hold the new property
  1221. :param name: The name of the property
  1222. :param v: The value to write
  1223. :return: The newly created property
  1224. """
  1225. if isinstance(v, pq.Quantity):
  1226. if len(v.shape):
  1227. section.create_property(name, tuple(v.magnitude))
  1228. else:
  1229. section.create_property(name, v.magnitude.item())
  1230. section.props[name].unit = str(v.dimensionality)
  1231. elif isinstance(v, datetime_types):
  1232. value, annotype = dt_to_nix(v)
  1233. prop = section.create_property(name, value)
  1234. prop.definition = annotype
  1235. elif isinstance(v, str):
  1236. if len(v):
  1237. section.create_property(name, v)
  1238. else:
  1239. section.create_property(name, nix.DataType.String)
  1240. elif isinstance(v, bytes):
  1241. section.create_property(name, v.decode())
  1242. elif isinstance(v, Iterable):
  1243. values = []
  1244. unit = None
  1245. definition = None
  1246. if len(v) == 0:
  1247. # NIX supports empty properties but dtype must be specified
  1248. # Defaulting to String and using definition to signify empty
  1249. # iterable as opposed to empty string
  1250. values = nix.DataType.String
  1251. definition = EMPTYANNOTATION
  1252. elif hasattr(v, "ndim") and v.ndim == 0:
  1253. values = v.item()
  1254. if isinstance(v, pq.Quantity):
  1255. unit = str(v.dimensionality)
  1256. else:
  1257. for item in v:
  1258. if isinstance(item, str):
  1259. item = item
  1260. elif isinstance(item, pq.Quantity):
  1261. unit = str(item.dimensionality)
  1262. item = item.magnitude.item()
  1263. elif isinstance(item, Iterable):
  1264. self.logger.warn("Multidimensional arrays and nested "
  1265. "containers are not currently "
  1266. "supported when writing to NIX.")
  1267. return None
  1268. else:
  1269. item = item
  1270. values.append(item)
  1271. section.create_property(name, values)
  1272. section.props[name].unit = unit
  1273. section.props[name].definition = definition
  1274. elif type(v).__module__ == "numpy":
  1275. section.create_property(name, v.item())
  1276. else:
  1277. section.create_property(name, v)
  1278. return section.props[name]
  1279. @staticmethod
  1280. def _nix_attr_to_neo(nix_obj):
  1281. """
  1282. Reads common attributes and metadata from a NIX object and populates a
  1283. dictionary with Neo-compatible attributes and annotations.
  1284. Common attributes: neo_name, nix_name, description,
  1285. file_datetime (if applicable).
  1286. Metadata: For properties that specify a 'unit', a Quantity object is
  1287. created.
  1288. """
  1289. neo_attrs = dict()
  1290. neo_attrs["nix_name"] = nix_obj.name
  1291. neo_attrs["description"] = stringify(nix_obj.definition)
  1292. if nix_obj.metadata:
  1293. for prop in nix_obj.metadata.inherited_properties():
  1294. values = list(prop.values)
  1295. if prop.unit:
  1296. units = prop.unit
  1297. values = create_quantity(values, units)
  1298. if not len(values):
  1299. if prop.definition == EMPTYANNOTATION:
  1300. values = list()
  1301. elif prop.data_type == nix.DataType.String:
  1302. values = ""
  1303. elif len(values) == 1:
  1304. values = values[0]
  1305. if prop.definition in (DATEANNOTATION, TIMEANNOTATION, DATETIMEANNOTATION):
  1306. values = dt_from_nix(values, prop.definition)
  1307. if prop.type == ARRAYANNOTATION:
  1308. if 'array_annotations' in neo_attrs:
  1309. neo_attrs['array_annotations'][prop.name] = values
  1310. else:
  1311. neo_attrs['array_annotations'] = {prop.name: values}
  1312. else:
  1313. neo_attrs[prop.name] = values
  1314. # since the 'neo_name' NIX property becomes the actual object's name,
  1315. # there's no reason to keep it in the annotations
  1316. neo_attrs["name"] = stringify(neo_attrs.pop("neo_name", None))
  1317. return neo_attrs
  1318. @staticmethod
  1319. def _group_signals(dataarrays):
  1320. """
  1321. Groups data arrays that were generated by the same Neo Signal object.
  1322. The collection can contain both AnalogSignals and
  1323. IrregularlySampledSignals.
  1324. :param dataarrays: A collection of DataArray objects to group
  1325. :return: A dictionary mapping a base name to a list of DataArrays which
  1326. belong to the same Signal
  1327. """
  1328. # now start grouping
  1329. groups = OrderedDict()
  1330. for da in dataarrays:
  1331. basename = ".".join(da.name.split(".")[:-1])
  1332. if basename not in groups:
  1333. groups[basename] = list()
  1334. groups[basename].append(da)
  1335. return groups
  1336. @staticmethod
  1337. def _get_time_dimension(obj):
  1338. for dim in obj.dimensions:
  1339. if hasattr(dim, "label") and dim.label == "time":
  1340. return dim
  1341. return None
  1342. def _use_obj_names(self, blocks):
  1343. errmsg = "use_obj_names enabled: found conflict or anonymous object"
  1344. allobjs = []
  1345. def check_unique(objs):
  1346. names = list(o.name for o in objs)
  1347. if None in names or "" in names:
  1348. raise ValueError(names)
  1349. if len(names) != len(set(names)):
  1350. self._names_ok = False
  1351. raise ValueError(names)
  1352. # collect objs if ok
  1353. allobjs.extend(objs)
  1354. try:
  1355. check_unique(blocks)
  1356. except ValueError as exc:
  1357. raise ValueError(f"{errmsg} in Blocks") from exc
  1358. for blk in blocks:
  1359. try:
  1360. # Segments
  1361. check_unique(blk.segments)
  1362. except ValueError as exc:
  1363. raise ValueError(f"{errmsg} at Block '{blk.name}' > segments") from exc
  1364. # collect all signals in all segments
  1365. signals = []
  1366. # collect all events, epochs, and spiketrains in all segments
  1367. eests = []
  1368. for seg in blk.segments:
  1369. signals.extend(seg.analogsignals)
  1370. signals.extend(seg.irregularlysampledsignals)
  1371. signals.extend(seg.imagesequences)
  1372. eests.extend(seg.events)
  1373. eests.extend(seg.epochs)
  1374. eests.extend(seg.spiketrains)
  1375. try:
  1376. # AnalogSignals and IrregularlySampledSignals
  1377. check_unique(signals)
  1378. except ValueError as exc:
  1379. raise ValueError(f"{errmsg} in Signal names of Block '{blk.name}'") from exc
  1380. try:
  1381. # Events, Epochs, and SpikeTrains
  1382. check_unique(eests)
  1383. except ValueError as exc:
  1384. raise ValueError(
  1385. f"{errmsg} in Event, Epoch, and Spiketrain names of Block '{blk.name}'"
  1386. ) from exc
  1387. try:
  1388. # ChannelIndexes
  1389. check_unique(blk.channel_indexes)
  1390. except ValueError as exc:
  1391. raise ValueError(f"{errmsg} in ChannelIndex names of Block '{blk.name}'") from exc
  1392. for chx in blk.channel_indexes:
  1393. try:
  1394. check_unique(chx.units)
  1395. except ValueError as exc:
  1396. raise ValueError(f"{errmsg} in Unit names of Block '{blk.name}' > "
  1397. f"ChannelIndex '{chx.name}'") from exc
  1398. # names are OK: assign annotations
  1399. for o in allobjs:
  1400. o.annotations["nix_name"] = o.name
  1401. def close(self):
  1402. """
  1403. Closes the open nix file and resets maps.
  1404. """
  1405. if (hasattr(self, "nix_file") and self.nix_file and self.nix_file.is_open()):
  1406. self.nix_file.close()
  1407. self.nix_file = None
  1408. self._neo_map = None
  1409. self._ref_map = None
  1410. self._signal_map = None
  1411. self._view_map = None
  1412. self._block_read_counter = None
  1413. def __del__(self):
  1414. self.close()