# Copyright (c) 2016, German Neuroinformatics Node (G-Node) # Achilleas Koutsou # # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted under the terms of the BSD License. See # LICENSE file in the root of the Project. """ Module for reading data from files in the NIX format. Author: Achilleas Koutsou This IO supports both writing and reading of NIX files. Reading is supported only if the NIX file was created using this IO. Details on how the Neo object tree is mapped to NIX, as well as details on behaviours specific to this IO, can be found on the wiki of the G-Node fork of Neo: https://github.com/G-Node/python-neo/wiki """ from datetime import date, time, datetime from collections.abc import Iterable from collections import OrderedDict import itertools from uuid import uuid4 import warnings from distutils.version import LooseVersion as Version from itertools import chain import quantities as pq import numpy as np from .baseio import BaseIO from ..core import (Block, Segment, ChannelIndex, AnalogSignal, IrregularlySampledSignal, Epoch, Event, SpikeTrain, ImageSequence, Unit, ChannelView, Group) from ..io.proxyobjects import BaseProxy from ..version import version as neover try: import nixio as nix HAVE_NIX = True except ImportError: HAVE_NIX = False datetime_types = (date, time, datetime) EMPTYANNOTATION = "EMPTYLIST" ARRAYANNOTATION = "ARRAYANNOTATION" DATETIMEANNOTATION = "DATETIME" DATEANNOTATION = "DATE" TIMEANNOTATION = "TIME" MIN_NIX_VER = Version("1.5.0") datefmt = "%Y-%m-%d" timefmt = "%H:%M:%S.%f" datetimefmt = datefmt + "T" + timefmt def stringify(value): if value is None: return value if isinstance(value, bytes): value = value.decode() return str(value) def create_quantity(values, unitstr): if "*" in unitstr: unit = pq.CompoundUnit(stringify(unitstr)) else: unit = unitstr return pq.Quantity(values, unit) def units_to_string(pqunit): dim = str(pqunit.dimensionality) if dim.startswith("(") and dim.endswith(")"): return dim.strip("()") return dim def dt_to_nix(dt): """ Converts date, time, and datetime objects to an ISO string representation appropriate for storing in NIX. Returns the converted value and the annotation type definition for converting back to the original value type. """ if isinstance(dt, datetime): return dt.strftime(datetimefmt), DATETIMEANNOTATION if isinstance(dt, date): return dt.strftime(datefmt), DATEANNOTATION if isinstance(dt, time): return dt.strftime(timefmt), TIMEANNOTATION # Unknown: returning as is return dt def dt_from_nix(nixdt, annotype): """ Inverse function of 'dt_to_nix()'. Requires the stored annotation type to distinguish between the three source types (date, time, and datetime). """ if annotype == DATEANNOTATION: dt = datetime.strptime(nixdt, datefmt) return dt.date() if annotype == TIMEANNOTATION: dt = datetime.strptime(nixdt, timefmt) return dt.time() if annotype == DATETIMEANNOTATION: dt = datetime.strptime(nixdt, datetimefmt) return dt # Unknown type: older (or newer) IO version? # Returning as is to avoid data loss. return nixdt def check_nix_version(): if not HAVE_NIX: raise Exception( "Failed to import NIX. " "The NixIO requires the Python package for NIX " "(nixio on PyPi). Try `pip install nixio`." ) # nixio version numbers have a 'v' prefix which breaks the comparison nixverstr = nix.__version__.lstrip("v") try: nixver = Version(nixverstr) except ValueError: warnings.warn( f"Could not understand NIX Python version {nixverstr}. " f"The NixIO requires version {MIN_NIX_VER} of the Python package for NIX. " "The IO may not work correctly." ) return if nixver < MIN_NIX_VER: raise Exception( "NIX version not supported. " f"The NixIO requires version {MIN_NIX_VER} or higher of the Python package " f"for NIX. Found version {nixverstr}" ) class NixIO(BaseIO): """ Class for reading and writing NIX files. """ is_readable = True is_writable = True supported_objects = [Block, Segment, ChannelIndex, Group, ChannelView, AnalogSignal, IrregularlySampledSignal, Epoch, Event, SpikeTrain, Unit] readable_objects = [Block] writeable_objects = [Block] name = "NIX" extensions = ["h5", "nix"] mode = "file" def __init__(self, filename, mode="rw"): """ Initialise IO instance and NIX file. :param filename: Full path to the file """ check_nix_version() BaseIO.__init__(self, filename) self.filename = filename if mode == "ro": filemode = nix.FileMode.ReadOnly elif mode == "rw": filemode = nix.FileMode.ReadWrite elif mode == "ow": filemode = nix.FileMode.Overwrite else: raise ValueError(f"Invalid mode specified '{mode}'. " "Valid modes: 'ro' (ReadOnly)', 'rw' (ReadWrite)," " 'ow' (Overwrite).") self.nix_file = nix.File.open(self.filename, filemode) if self.nix_file.mode == nix.FileMode.ReadOnly: self._file_version = '0.5.2' if "neo" in self.nix_file.sections: self._file_version = self.nix_file.sections["neo"]["version"] elif self.nix_file.mode == nix.FileMode.ReadWrite: if "neo" in self.nix_file.sections: self._file_version = self.nix_file.sections["neo"]["version"] else: self._file_version = '0.5.2' filemd = self.nix_file.create_section("neo", "neo.metadata") filemd["version"] = self._file_version else: # new file filemd = self.nix_file.create_section("neo", "neo.metadata") filemd["version"] = neover self._file_version = neover self._block_read_counter = 0 # helper maps self._neo_map = dict() self._ref_map = dict() self._signal_map = dict() self._view_map = dict() # _names_ok is used to guard against name check duplication self._names_ok = False def __enter__(self): return self def __exit__(self, *args): self.close() def read_all_blocks(self, lazy=False): if lazy: raise Exception("Lazy loading is not supported for NixIO") return list(self._nix_to_neo_block(blk) for blk in self.nix_file.blocks) def read_block(self, index=None, nixname=None, neoname=None, lazy=False): """ Loads a Block from the NIX file along with all contained child objects and returns the equivalent Neo Block. The Block to read can be specified in one of three ways: - Index (position) in the file - Name of the NIX Block (see [...] for details on the naming) - Name of the original Neo Block If no arguments are specified, the first Block is returned and consecutive calls to the function return the next Block in the file. After all Blocks have been loaded this way, the function returns None. If more than one argument is specified, the precedence order is: index, nixname, neoname Note that Neo objects can be anonymous or have non-unique names, so specifying a Neo name may be ambiguous. See also :meth:`NixIO.iter_blocks`. :param index: The position of the Block to be loaded (creation order) :param nixname: The name of the Block in NIX :param neoname: The name of the original Neo Block """ if lazy: raise Exception("Lazy loading is not supported for NixIO") nix_block = None if index is not None: nix_block = self.nix_file.blocks[index] elif nixname is not None: nix_block = self.nix_file.blocks[nixname] elif neoname is not None: for blk in self.nix_file.blocks: if ("neo_name" in blk.metadata and blk.metadata["neo_name"] == neoname): nix_block = blk break else: raise KeyError(f"Block with Neo name '{neoname}' does not exist") else: index = self._block_read_counter if index >= len(self.nix_file.blocks): return None nix_block = self.nix_file.blocks[index] self._block_read_counter += 1 return self._nix_to_neo_block(nix_block) def iter_blocks(self): """ Returns an iterator which can be used to consecutively load and convert all Blocks from the NIX File. """ for blk in self.nix_file.blocks: yield self._nix_to_neo_block(blk) def _nix_to_neo_block(self, nix_block): neo_attrs = self._nix_attr_to_neo(nix_block) neo_block = Block(**neo_attrs) neo_block.rec_datetime = datetime.fromtimestamp(nix_block.created_at) # descend into Groups groups_to_resolve = [] for grp in nix_block.groups: if grp.type == "neo.segment": newseg = self._nix_to_neo_segment(grp) neo_block.segments.append(newseg) # parent reference newseg.block = neo_block elif grp.type == "neo.group": newgrp, parent_name = self._nix_to_neo_group(grp) assert parent_name is None neo_block.groups.append(newgrp) # parent reference newgrp.block = neo_block elif grp.type == "neo.subgroup": newgrp, parent_name = self._nix_to_neo_group(grp) groups_to_resolve.append((newgrp, parent_name)) else: raise Exception("Unexpected group type") # link subgroups to parents for newgrp, parent_name in groups_to_resolve: parent = self._neo_map[parent_name] parent.groups.append(newgrp) # find free floating (Groupless) signals and spiketrains blockdas = self._group_signals(nix_block.data_arrays) for name, das in blockdas.items(): if name not in self._neo_map: if das[0].type == "neo.analogsignal": self._nix_to_neo_analogsignal(das) elif das[0].type == "neo.irregularlysampledsignal": self._nix_to_neo_irregularlysampledsignal(das) elif das[0].type == "neo.imagesequence": self._nix_to_neo_imagesequence(das) for mt in nix_block.multi_tags: if mt.type == "neo.spiketrain" and mt.name not in self._neo_map: self._nix_to_neo_spiketrain(mt) # descend into Sources for src in nix_block.sources: newchx = self._nix_to_neo_channelindex(src) neo_block.channel_indexes.append(newchx) # parent reference newchx.block = neo_block # create object links neo_block.create_relationship() # reset maps self._neo_map = dict() self._ref_map = dict() self._signal_map = dict() self._view_map = dict() return neo_block def _nix_to_neo_segment(self, nix_group): neo_attrs = self._nix_attr_to_neo(nix_group) neo_segment = Segment(**neo_attrs) neo_segment.rec_datetime = datetime.fromtimestamp(nix_group.created_at) self._neo_map[nix_group.name] = neo_segment # this will probably get all the DAs anyway, but if we change any part # of the mapping to add other kinds of DataArrays to a group, such as # MultiTag positions and extents, this filter will be necessary dataarrays = list(filter( lambda da: da.type in ("neo.analogsignal", "neo.irregularlysampledsignal", "neo.imagesequence",), nix_group.data_arrays)) dataarrays = self._group_signals(dataarrays) # descend into DataArrays for name, das in dataarrays.items(): if das[0].type == "neo.analogsignal": newasig = self._nix_to_neo_analogsignal(das) neo_segment.analogsignals.append(newasig) # parent reference newasig.segment = neo_segment elif das[0].type == "neo.irregularlysampledsignal": newisig = self._nix_to_neo_irregularlysampledsignal(das) neo_segment.irregularlysampledsignals.append(newisig) # parent reference newisig.segment = neo_segment elif das[0].type == "neo.imagesequence": new_imgseq = self._nix_to_neo_imagesequence(das) neo_segment.imagesequences.append(new_imgseq) # parent reference new_imgseq.segment = neo_segment # descend into MultiTags for mtag in nix_group.multi_tags: if mtag.type == "neo.event": newevent = self._nix_to_neo_event(mtag) neo_segment.events.append(newevent) # parent reference newevent.segment = neo_segment elif mtag.type == "neo.epoch": newepoch = self._nix_to_neo_epoch(mtag) neo_segment.epochs.append(newepoch) # parent reference newepoch.segment = neo_segment elif mtag.type == "neo.spiketrain": newst = self._nix_to_neo_spiketrain(mtag) neo_segment.spiketrains.append(newst) # parent reference newst.segment = neo_segment return neo_segment def _nix_to_neo_group(self, nix_group): neo_attrs = self._nix_attr_to_neo(nix_group) parent_name = neo_attrs.pop("neo_parent", None) neo_group = Group(**neo_attrs) self._neo_map[nix_group.name] = neo_group dataarrays = list(filter( lambda da: da.type in ("neo.analogsignal", "neo.irregularlysampledsignal", "neo.imagesequence",), nix_group.data_arrays)) dataarrays = self._group_signals(dataarrays) # descend into DataArrays for name in dataarrays: obj = self._neo_map[name] neo_group.add(obj) # descend into MultiTags for mtag in nix_group.multi_tags: if mtag.type == "neo.channelview" and mtag.name not in self._neo_map: self._nix_to_neo_channelview(mtag) obj = self._neo_map[mtag.name] neo_group.add(obj) return neo_group, parent_name def _nix_to_neo_channelindex(self, nix_source): neo_attrs = self._nix_attr_to_neo(nix_source) channels = list(self._nix_attr_to_neo(c) for c in nix_source.sources if c.type == "neo.channelindex") neo_attrs["index"] = np.array([c["index"] for c in channels]) if len(channels): chan_names = list(c["name"] for c in channels if "name" in c and c["name"] is not None) chan_ids = list(c["channel_id"] for c in channels if "channel_id" in c) if chan_names: neo_attrs["channel_names"] = chan_names if chan_ids: neo_attrs["channel_ids"] = chan_ids if "coordinates" in channels[0]: neo_attrs["coordinates"] = list(c["coordinates"] for c in channels) neo_chx = ChannelIndex(**neo_attrs) self._neo_map[nix_source.name] = neo_chx # create references to Signals signals = self._ref_map.get(nix_source.name, list()) for sig in signals: if isinstance(sig, AnalogSignal): neo_chx.analogsignals.append(sig) elif isinstance(sig, IrregularlySampledSignal): neo_chx.irregularlysampledsignals.append(sig) # else error? # descend into Sources for src in nix_source.sources: if src.type == "neo.unit": newunit = self._nix_to_neo_unit(src) neo_chx.units.append(newunit) # parent reference newunit.channel_index = neo_chx return neo_chx def _nix_to_neo_channelview(self, nix_mtag): neo_attrs = self._nix_attr_to_neo(nix_mtag) index = nix_mtag.positions nix_name, = self._group_signals(nix_mtag.references).keys() obj = self._neo_map[nix_name] neo_chview = ChannelView(obj, index, **neo_attrs) self._neo_map[nix_mtag.name] = neo_chview return neo_chview def _nix_to_neo_unit(self, nix_source): neo_attrs = self._nix_attr_to_neo(nix_source) neo_unit = Unit(**neo_attrs) self._neo_map[nix_source.name] = neo_unit # create references to SpikeTrains neo_unit.spiketrains.extend(self._ref_map.get(nix_source.name, list())) return neo_unit def _nix_to_neo_analogsignal(self, nix_da_group): """ Convert a group of NIX DataArrays to a Neo AnalogSignal. This method expects a list of data arrays that all represent the same, multidimensional Neo AnalogSignal object. :param nix_da_group: a list of NIX DataArray objects :return: a Neo AnalogSignal object """ neo_attrs = self._nix_attr_to_neo(nix_da_group[0]) metadata = nix_da_group[0].metadata neo_attrs["nix_name"] = metadata.name # use the common base name unit = nix_da_group[0].unit signaldata = np.array([d[:] for d in nix_da_group]).transpose() signaldata = create_quantity(signaldata, unit) timedim = self._get_time_dimension(nix_da_group[0]) sampling_period = create_quantity(timedim.sampling_interval, timedim.unit) # t_start should have been added to neo_attrs via the NIX # object's metadata. This may not be present since in older # versions, we didn't store t_start in the metadata when it # wasn't necessary, such as when the timedim.offset and unit # did not require rescaling. if "t_start" in neo_attrs: t_start = neo_attrs["t_start"] del neo_attrs["t_start"] else: t_start = create_quantity(timedim.offset, timedim.unit) neo_signal = AnalogSignal(signal=signaldata, sampling_period=sampling_period, t_start=t_start, **neo_attrs) self._neo_map[neo_attrs["nix_name"]] = neo_signal # all DAs reference the same sources srcnames = list(src.name for src in nix_da_group[0].sources) for n in srcnames: if n not in self._ref_map: self._ref_map[n] = list() self._ref_map[n].append(neo_signal) return neo_signal def _nix_to_neo_imagesequence(self, nix_da_group): """ Convert a group of NIX DataArrays to a Neo ImageSequence. This method expects a list of data arrays that all represent the same, multidimensional Neo ImageSequence object. :param nix_da_group: a list of NIX DataArray objects :return: a Neo ImageSequence object """ neo_attrs = self._nix_attr_to_neo(nix_da_group[0]) metadata = nix_da_group[0].metadata neo_attrs["nix_name"] = metadata.name # use the common base name unit = nix_da_group[0].unit imgseq = np.array([d[:] for d in nix_da_group]).transpose() sampling_rate = neo_attrs["sampling_rate"] del neo_attrs["sampling_rate"] spatial_scale = neo_attrs["spatial_scale"] del neo_attrs["spatial_scale"] if "t_start" in neo_attrs: t_start = neo_attrs["t_start"] del neo_attrs["t_start"] else: t_start = 0.0 * pq.ms neo_seq = ImageSequence(image_data=imgseq, sampling_rate=sampling_rate, spatial_scale=spatial_scale, units=unit, t_start=t_start, **neo_attrs) self._neo_map[neo_attrs["nix_name"]] = neo_seq # all DAs reference the same sources srcnames = list(src.name for src in nix_da_group[0].sources) for n in srcnames: if n not in self._ref_map: self._ref_map[n] = list() self._ref_map[n].append(neo_seq) return neo_seq def _nix_to_neo_irregularlysampledsignal(self, nix_da_group): """ Convert a group of NIX DataArrays to a Neo IrregularlySampledSignal. This method expects a list of data arrays that all represent the same, multidimensional Neo IrregularlySampledSignal object. :param nix_da_group: a list of NIX DataArray objects :return: a Neo IrregularlySampledSignal object """ neo_attrs = self._nix_attr_to_neo(nix_da_group[0]) metadata = nix_da_group[0].metadata neo_attrs["nix_name"] = metadata.name # use the common base name unit = nix_da_group[0].unit signaldata = np.array([d[:] for d in nix_da_group]).transpose() signaldata = create_quantity(signaldata, unit) timedim = self._get_time_dimension(nix_da_group[0]) times = create_quantity(timedim.ticks, timedim.unit) neo_signal = IrregularlySampledSignal(signal=signaldata, times=times, **neo_attrs) self._neo_map[neo_attrs["nix_name"]] = neo_signal # all DAs reference the same sources srcnames = list(src.name for src in nix_da_group[0].sources) for n in srcnames: if n not in self._ref_map: self._ref_map[n] = list() self._ref_map[n].append(neo_signal) return neo_signal def _nix_to_neo_event(self, nix_mtag): neo_attrs = self._nix_attr_to_neo(nix_mtag) time_unit = nix_mtag.positions.unit times = create_quantity(nix_mtag.positions, time_unit) labels = np.array(nix_mtag.positions.dimensions[0].labels, dtype="U") neo_event = Event(times=times, labels=labels, **neo_attrs) self._neo_map[nix_mtag.name] = neo_event return neo_event def _nix_to_neo_epoch(self, nix_mtag): neo_attrs = self._nix_attr_to_neo(nix_mtag) time_unit = nix_mtag.positions.unit times = create_quantity(nix_mtag.positions, time_unit) durations = create_quantity(nix_mtag.extents, nix_mtag.extents.unit) if len(nix_mtag.positions.dimensions[0].labels) > 0: labels = np.array(nix_mtag.positions.dimensions[0].labels, dtype="U") else: labels = None neo_epoch = Epoch(times=times, durations=durations, labels=labels, **neo_attrs) self._neo_map[nix_mtag.name] = neo_epoch return neo_epoch def _nix_to_neo_spiketrain(self, nix_mtag): neo_attrs = self._nix_attr_to_neo(nix_mtag) time_unit = nix_mtag.positions.unit times = create_quantity(nix_mtag.positions, time_unit) neo_spiketrain = SpikeTrain(times=times, **neo_attrs) if nix_mtag.features: wfda = nix_mtag.features[0].data wftime = self._get_time_dimension(wfda) neo_spiketrain.waveforms = create_quantity(wfda, wfda.unit) interval_units = wftime.unit neo_spiketrain.sampling_period = create_quantity(wftime.sampling_interval, interval_units) left_sweep_units = wftime.unit if "left_sweep" in wfda.metadata: neo_spiketrain.left_sweep = create_quantity(wfda.metadata["left_sweep"], left_sweep_units) self._neo_map[nix_mtag.name] = neo_spiketrain srcnames = list(src.name for src in nix_mtag.sources) for n in srcnames: if n not in self._ref_map: self._ref_map[n] = list() self._ref_map[n].append(neo_spiketrain) return neo_spiketrain def write_all_blocks(self, neo_blocks, use_obj_names=False): """ Convert all ``neo_blocks`` to the NIX equivalent and write them to the file. :param neo_blocks: List (or iterable) containing Neo blocks :param use_obj_names: If True, will not generate unique object names but will instead try to use the name of each Neo object. If these are not unique, an exception will be raised. """ if use_obj_names: self._use_obj_names(neo_blocks) self._names_ok = True for bl in neo_blocks: self.write_block(bl, use_obj_names) def write_block(self, block, use_obj_names=False): """ Convert the provided Neo Block to a NIX Block and write it to the NIX file. :param block: Neo Block to be written :param use_obj_names: If True, will not generate unique object names but will instead try to use the name of each Neo object. If these are not unique, an exception will be raised. """ if use_obj_names: if not self._names_ok: # _names_ok guards against check duplication # If it's False, it means write_block() was called directly self._use_obj_names([block]) if "nix_name" in block.annotations: nix_name = block.annotations["nix_name"] else: nix_name = f"neo.block.{self._generate_nix_name()}" block.annotate(nix_name=nix_name) if nix_name in self.nix_file.blocks: nixblock = self.nix_file.blocks[nix_name] del self.nix_file.blocks[nix_name] del self.nix_file.sections[nix_name] nixblock = self.nix_file.create_block(nix_name, "neo.block") nixblock.metadata = self.nix_file.create_section(nix_name, "neo.block.metadata") metadata = nixblock.metadata neoname = block.name if block.name is not None else "" metadata["neo_name"] = neoname nixblock.definition = block.description if block.rec_datetime: nix_rec_dt = int(block.rec_datetime.strftime("%s")) nixblock.force_created_at(nix_rec_dt) if block.file_datetime: fdt, annotype = dt_to_nix(block.file_datetime) fdtprop = metadata.create_property("file_datetime", fdt) fdtprop.definition = annotype if block.annotations: for k, v in block.annotations.items(): self._write_property(metadata, k, v) # descend into Segments for seg in block.segments: self._write_segment(seg, nixblock) # descend into ChannelIndexes for chx in block.channel_indexes: self._write_channelindex(chx, nixblock) # descend into Neo Groups for group in block.groups: self._write_group(group, nixblock) self._create_source_links(block, nixblock) def _write_channelindex(self, chx, nixblock): """ Convert the provided Neo ChannelIndex to a NIX Source and write it to the NIX file. For each index in the ChannelIndex object, a child NIX Source is also created. :param chx: The Neo ChannelIndex to be written :param nixblock: NIX Block where the Source will be created """ if "nix_name" in chx.annotations: nix_name = chx.annotations["nix_name"] else: nix_name = f"neo.channelindex.{self._generate_nix_name()}" chx.annotate(nix_name=nix_name) nixsource = nixblock.create_source(nix_name, "neo.channelindex") nixsource.metadata = nixblock.metadata.create_section(nix_name, "neo.channelindex.metadata") metadata = nixsource.metadata neoname = chx.name if chx.name is not None else "" metadata["neo_name"] = neoname nixsource.definition = chx.description if chx.annotations: for k, v in chx.annotations.items(): self._write_property(metadata, k, v) coordinates = chx.coordinates if coordinates is not None and np.ndim(coordinates) == 1: # support 1D coordinates for single ChannelIndex coordinates = [coordinates] for idx, channel in enumerate(chx.index): channame = f"{nix_name}.ChannelIndex{idx}" nixchan = nixsource.create_source(channame, "neo.channelindex") nixchan.metadata = nixsource.metadata.create_section(nixchan.name, "neo.channelindex.metadata") nixchan.definition = nixsource.definition chanmd = nixchan.metadata chanmd["index"] = int(channel) if len(chx.channel_names): neochanname = stringify(chx.channel_names[idx]) chanmd["neo_name"] = neochanname if len(chx.channel_ids): chanid = chx.channel_ids[idx] chanmd["channel_id"] = chanid if coordinates is not None: coords = coordinates[idx] coordunits = stringify(coords[0].dimensionality) nixcoords = tuple(c.magnitude.item() for c in coords) chanprop = chanmd.create_property("coordinates", nixcoords) chanprop.unit = coordunits # Descend into Units for unit in chx.units: self._write_unit(unit, nixsource) def _write_channelview(self, chview, nixblock, nixgroup): """ Convert the provided Neo ChannelView to a NIX MultiTag and write it to the NIX file. :param chx: The Neo ChannelView to be written :param nixblock: NIX Block where the MultiTag will be created """ if "nix_name" in chview.annotations: nix_name = chview.annotations["nix_name"] else: nix_name = "neo.channelview.{}".format(self._generate_nix_name()) chview.annotate(nix_name=nix_name) # create a new data array if this channelview was not saved yet if not nix_name in self._view_map: channels = nixblock.create_data_array( "{}.index".format(nix_name), "neo.channelview.index", data=chview.index ) nixmt = nixblock.create_multi_tag(nix_name, "neo.channelview", positions=channels) nixmt.metadata = nixgroup.metadata.create_section( nix_name, "neo.channelview.metadata" ) metadata = nixmt.metadata neoname = chview.name if chview.name is not None else "" metadata["neo_name"] = neoname nixmt.definition = chview.description if chview.annotations: for k, v in chview.annotations.items(): self._write_property(metadata, k, v) self._view_map[nix_name] = nixmt # link tag to the data array for the ChannelView's signal if not ("nix_name" in chview.obj.annotations and chview.obj.annotations["nix_name"] in self._signal_map): # the following restriction could be relaxed later # but for a first pass this simplifies my mental model raise Exception("Need to save signals before saving views") nix_name = chview.obj.annotations["nix_name"] nixmt.references.extend(self._signal_map[nix_name]) else: nixmt = self._view_map[nix_name] nixgroup.multi_tags.append(nixmt) def _write_segment(self, segment, nixblock): """ Convert the provided Neo Segment to a NIX Group and write it to the NIX file. :param segment: Neo Segment to be written :param nixblock: NIX Block where the Group will be created """ if "nix_name" in segment.annotations: nix_name = segment.annotations["nix_name"] else: nix_name = f"neo.segment.{self._generate_nix_name()}" segment.annotate(nix_name=nix_name) nixgroup = nixblock.create_group(nix_name, "neo.segment") nixgroup.metadata = nixblock.metadata.create_section(nix_name, "neo.segment.metadata") metadata = nixgroup.metadata neoname = segment.name if segment.name is not None else "" metadata["neo_name"] = neoname nixgroup.definition = segment.description if segment.rec_datetime: nix_rec_dt = int(segment.rec_datetime.strftime("%s")) nixgroup.force_created_at(nix_rec_dt) if segment.file_datetime: fdt, annotype = dt_to_nix(segment.file_datetime) fdtprop = metadata.create_property("file_datetime", fdt) fdtprop.definition = annotype if segment.annotations: for k, v in segment.annotations.items(): self._write_property(metadata, k, v) # write signals, events, epochs, and spiketrains for asig in segment.analogsignals: self._write_analogsignal(asig, nixblock, nixgroup) for isig in segment.irregularlysampledsignals: self._write_irregularlysampledsignal(isig, nixblock, nixgroup) for event in segment.events: self._write_event(event, nixblock, nixgroup) for epoch in segment.epochs: self._write_epoch(epoch, nixblock, nixgroup) for spiketrain in segment.spiketrains: self._write_spiketrain(spiketrain, nixblock, nixgroup) for imagesequence in segment.imagesequences: self._write_imagesequence(imagesequence, nixblock, nixgroup) def _write_group(self, neo_group, nixblock, parent=None): """ Convert the provided Neo Group to a NIX Group and write it to the NIX file. :param neo_group: Neo Group to be written :param nixblock: NIX Block where the NIX Group will be created :param parent: for sub-groups, the parent Neo Group """ if parent: label = "neo.subgroup" # note that the use of a different label for top-level groups and sub-groups is not # strictly necessary, the presence of the "neo_parent" annotation is sufficient. # However, I think it adds clarity and helps in debugging and testing. else: label = "neo.group" if "nix_name" in neo_group.annotations: nix_name = neo_group.annotations["nix_name"] else: nix_name = "{}.{}".format(label, self._generate_nix_name()) neo_group.annotate(nix_name=nix_name) nixgroup = nixblock.create_group(nix_name, label) nixgroup.metadata = nixblock.metadata.create_section( nix_name, f"{label}.metadata" ) metadata = nixgroup.metadata neoname = neo_group.name if neo_group.name is not None else "" metadata["neo_name"] = neoname if parent: metadata["neo_parent"] = parent.annotations["nix_name"] nixgroup.definition = neo_group.description if neo_group.annotations: for k, v in neo_group.annotations.items(): self._write_property(metadata, k, v) # link signals and image sequences objnames = [] for obj in chain( neo_group.analogsignals, neo_group.irregularlysampledsignals, neo_group.imagesequences, ): if not ("nix_name" in obj.annotations and obj.annotations["nix_name"] in self._signal_map): # the following restriction could be relaxed later # but for a first pass this simplifies my mental model raise Exception("Orphan signals/image sequences cannot be stored, needs to belong to a Segment") objnames.append(obj.annotations["nix_name"]) for name in objnames: for da in self._signal_map[name]: nixgroup.data_arrays.append(da) # link events, epochs and spiketrains objnames = [] for obj in chain( neo_group.events, neo_group.epochs, neo_group.spiketrains, ): if not ("nix_name" in obj.annotations and obj.annotations["nix_name"] in nixblock.multi_tags): # the following restriction could be relaxed later # but for a first pass this simplifies my mental model raise Exception("Orphan epochs/events/spiketrains cannot be stored, needs to belong to a Segment") objnames.append(obj.annotations["nix_name"]) for name in objnames: mt = nixblock.multi_tags[name] nixgroup.multi_tags.append(mt) # save channel views for chview in neo_group.channelviews: self._write_channelview(chview, nixblock, nixgroup) # save sub-groups for subgroup in neo_group.groups: self._write_group(subgroup, nixblock, parent=neo_group) def _write_analogsignal(self, anasig, nixblock, nixgroup): """ Convert the provided ``anasig`` (AnalogSignal) to a list of NIX DataArray objects and write them to the NIX file. All DataArray objects created from the same AnalogSignal have their metadata section point to the same object. :param anasig: The Neo AnalogSignal to be written :param nixblock: NIX Block where the DataArrays will be created :param nixgroup: NIX Group where the DataArrays will be attached """ if "nix_name" in anasig.annotations: nix_name = anasig.annotations["nix_name"] else: nix_name = f"neo.analogsignal.{self._generate_nix_name()}" anasig.annotate(nix_name=nix_name) if f"{nix_name}.0" in nixblock.data_arrays and nixgroup: # AnalogSignal is in multiple Segments. # Append DataArrays to Group and return. dalist = list() for idx in itertools.count(): daname = f"{nix_name}.{idx}" if daname in nixblock.data_arrays: dalist.append(nixblock.data_arrays[daname]) else: break nixgroup.data_arrays.extend(dalist) return if isinstance(anasig, BaseProxy): data = np.transpose(anasig.load()[:].magnitude) else: data = np.transpose(anasig[:].magnitude) parentmd = nixgroup.metadata if nixgroup else nixblock.metadata metadata = parentmd.create_section(nix_name, "neo.analogsignal.metadata") nixdas = list() for idx, row in enumerate(data): daname = f"{nix_name}.{idx}" da = nixblock.create_data_array(daname, "neo.analogsignal", data=row) da.metadata = metadata da.definition = anasig.description da.unit = units_to_string(anasig.units) sampling_period = anasig.sampling_period.magnitude.item() timedim = da.append_sampled_dimension(sampling_period) timedim.unit = units_to_string(anasig.sampling_period.units) tstart = anasig.t_start metadata["t_start"] = tstart.magnitude.item() metadata.props["t_start"].unit = units_to_string(tstart.units) timedim.offset = tstart.rescale(timedim.unit).magnitude.item() timedim.label = "time" nixdas.append(da) if nixgroup: nixgroup.data_arrays.append(da) neoname = anasig.name if anasig.name is not None else "" metadata["neo_name"] = neoname if anasig.annotations: for k, v in anasig.annotations.items(): self._write_property(metadata, k, v) if anasig.array_annotations: for k, v in anasig.array_annotations.items(): p = self._write_property(metadata, k, v) p.type = ARRAYANNOTATION self._signal_map[nix_name] = nixdas def _write_imagesequence(self, imgseq, nixblock, nixgroup): """ Convert the provided ``imgseq`` (ImageSequence) to a list of NIX DataArray objects and write them to the NIX file. All DataArray objects created from the same ImageSequence have their metadata section point to the same object. :param anasig: The Neo ImageSequence to be written :param nixblock: NIX Block where the DataArrays will be created :param nixgroup: NIX Group where the DataArrays will be attached """ if "nix_name" in imgseq.annotations: nix_name = imgseq.annotations["nix_name"] else: nix_name = f"neo.imagesequence.{self._generate_nix_name()}" imgseq.annotate(nix_name=nix_name) if f"{nix_name}.0" in nixblock.data_arrays and nixgroup: dalist = list() for idx in itertools.count(): daname = f"{nix_name}.{idx}" if daname in nixblock.data_arrays: dalist.append(nixblock.data_arrays[daname]) else: break nixgroup.data_arrays.extend(dalist) return if isinstance(imgseq, BaseProxy): data = np.transpose(imgseq.load()[:].magnitude) else: data = np.transpose(imgseq[:].magnitude) parentmd = nixgroup.metadata if nixgroup else nixblock.metadata metadata = parentmd.create_section(nix_name, "neo.imagesequence.metadata") nixdas = list() for idx, row in enumerate(data): daname = f"{nix_name}.{idx}" da = nixblock.create_data_array(daname, "neo.imagesequence", data=row) da.metadata = metadata da.definition = imgseq.description da.unit = units_to_string(imgseq.units) metadata["sampling_rate"] = imgseq.sampling_rate.magnitude.item() units = imgseq.sampling_rate.units metadata.props["sampling_rate"].unit = units_to_string(units) metadata["spatial_scale"] = imgseq.spatial_scale.magnitude.item() units = imgseq.spatial_scale.units metadata.props["spatial_scale"].unit = units_to_string(units) metadata["t_start"] = imgseq.t_start.magnitude.item() units = imgseq.t_start.units metadata.props["t_start"].unit = units_to_string(units) nixdas.append(da) if nixgroup: nixgroup.data_arrays.append(da) neoname = imgseq.name if imgseq.name is not None else "" metadata["neo_name"] = neoname if imgseq.annotations: for k, v in imgseq.annotations.items(): self._write_property(metadata, k, v) self._signal_map[nix_name] = nixdas def _write_irregularlysampledsignal(self, irsig, nixblock, nixgroup): """ Convert the provided ``irsig`` (IrregularlySampledSignal) to a list of NIX DataArray objects and write them to the NIX file at the location. All DataArray objects created from the same IrregularlySampledSignal have their metadata section point to the same object. :param irsig: The Neo IrregularlySampledSignal to be written :param nixblock: NIX Block where the DataArrays will be created :param nixgroup: NIX Group where the DataArrays will be attached """ if "nix_name" in irsig.annotations: nix_name = irsig.annotations["nix_name"] else: nix_name = f"neo.irregularlysampledsignal.{self._generate_nix_name()}" irsig.annotate(nix_name=nix_name) if f"{nix_name}.0" in nixblock.data_arrays and nixgroup: # IrregularlySampledSignal is in multiple Segments. # Append DataArrays to Group and return. dalist = list() for idx in itertools.count(): daname = f"{nix_name}.{idx}" if daname in nixblock.data_arrays: dalist.append(nixblock.data_arrays[daname]) else: break nixgroup.data_arrays.extend(dalist) return if isinstance(irsig, BaseProxy): data = np.transpose(irsig.load()[:].magnitude) else: data = np.transpose(irsig[:].magnitude) parentmd = nixgroup.metadata if nixgroup else nixblock.metadata metadata = parentmd.create_section(nix_name, "neo.irregularlysampledsignal.metadata") nixdas = list() for idx, row in enumerate(data): daname = f"{nix_name}.{idx}" da = nixblock.create_data_array(daname, "neo.irregularlysampledsignal", data=row) da.metadata = metadata da.definition = irsig.description da.unit = units_to_string(irsig.units) timedim = da.append_range_dimension(irsig.times.magnitude) timedim.unit = units_to_string(irsig.times.units) timedim.label = "time" nixdas.append(da) if nixgroup: nixgroup.data_arrays.append(da) neoname = irsig.name if irsig.name is not None else "" metadata["neo_name"] = neoname if irsig.annotations: for k, v in irsig.annotations.items(): self._write_property(metadata, k, v) if irsig.array_annotations: for k, v in irsig.array_annotations.items(): p = self._write_property(metadata, k, v) p.type = ARRAYANNOTATION self._signal_map[nix_name] = nixdas def _write_event(self, event, nixblock, nixgroup): """ Convert the provided Neo Event to a NIX MultiTag and write it to the NIX file. :param event: The Neo Event to be written :param nixblock: NIX Block where the MultiTag will be created :param nixgroup: NIX Group where the MultiTag will be attached """ if "nix_name" in event.annotations: nix_name = event.annotations["nix_name"] else: nix_name = f"neo.event.{self._generate_nix_name()}" event.annotate(nix_name=nix_name) if nix_name in nixblock.multi_tags: # Event is in multiple Segments. Append to Group and return. mt = nixblock.multi_tags[nix_name] nixgroup.multi_tags.append(mt) return if isinstance(event, BaseProxy): event = event.load() times = event.times.magnitude units = units_to_string(event.times.units) labels = event.labels timesda = nixblock.create_data_array(f"{nix_name}.times", "neo.event.times", data=times) timesda.unit = units nixmt = nixblock.create_multi_tag(nix_name, "neo.event", positions=timesda) nixmt.metadata = nixgroup.metadata.create_section(nix_name, "neo.event.metadata") metadata = nixmt.metadata labeldim = timesda.append_set_dimension() labeldim.labels = labels neoname = event.name if event.name is not None else "" metadata["neo_name"] = neoname nixmt.definition = event.description if event.annotations: for k, v in event.annotations.items(): self._write_property(metadata, k, v) if event.array_annotations: for k, v in event.array_annotations.items(): p = self._write_property(metadata, k, v) p.type = ARRAYANNOTATION nixgroup.multi_tags.append(nixmt) # reference all AnalogSignals and IrregularlySampledSignals in Group for da in nixgroup.data_arrays: if da.type in ("neo.analogsignal", "neo.irregularlysampledsignal"): nixmt.references.append(da) def _write_epoch(self, epoch, nixblock, nixgroup): """ Convert the provided Neo Epoch to a NIX MultiTag and write it to the NIX file. :param epoch: The Neo Epoch to be written :param nixblock: NIX Block where the MultiTag will be created :param nixgroup: NIX Group where the MultiTag will be attached """ if "nix_name" in epoch.annotations: nix_name = epoch.annotations["nix_name"] else: nix_name = f"neo.epoch.{self._generate_nix_name()}" epoch.annotate(nix_name=nix_name) if nix_name in nixblock.multi_tags: # Epoch is in multiple Segments. Append to Group and return. mt = nixblock.multi_tags[nix_name] nixgroup.multi_tags.append(mt) return if isinstance(epoch, BaseProxy): epoch = epoch.load() times = epoch.times.magnitude tunits = units_to_string(epoch.times.units) durations = epoch.durations.magnitude dunits = units_to_string(epoch.durations.units) timesda = nixblock.create_data_array(f"{nix_name}.times", "neo.epoch.times", data=times) timesda.unit = tunits nixmt = nixblock.create_multi_tag(nix_name, "neo.epoch", positions=timesda) durada = nixblock.create_data_array(f"{nix_name}.durations", "neo.epoch.durations", data=durations) durada.unit = dunits nixmt.extents = durada nixmt.metadata = nixgroup.metadata.create_section(nix_name, "neo.epoch.metadata") metadata = nixmt.metadata labeldim = timesda.append_set_dimension() labeldim.labels = epoch.labels neoname = epoch.name if epoch.name is not None else "" metadata["neo_name"] = neoname nixmt.definition = epoch.description if epoch.annotations: for k, v in epoch.annotations.items(): self._write_property(metadata, k, v) if epoch.array_annotations: for k, v in epoch.array_annotations.items(): p = self._write_property(metadata, k, v) p.type = ARRAYANNOTATION nixgroup.multi_tags.append(nixmt) # reference all AnalogSignals and IrregularlySampledSignals in Group for da in nixgroup.data_arrays: if da.type in ("neo.analogsignal", "neo.irregularlysampledsignal"): nixmt.references.append(da) def _write_spiketrain(self, spiketrain, nixblock, nixgroup): """ Convert the provided Neo SpikeTrain to a NIX MultiTag and write it to the NIX file. :param spiketrain: The Neo SpikeTrain to be written :param nixblock: NIX Block where the MultiTag will be created :param nixgroup: NIX Group where the MultiTag will be attached """ if "nix_name" in spiketrain.annotations: nix_name = spiketrain.annotations["nix_name"] else: nix_name = f"neo.spiketrain.{self._generate_nix_name()}" spiketrain.annotate(nix_name=nix_name) if nix_name in nixblock.multi_tags and nixgroup: # SpikeTrain is in multiple Segments. Append to Group and return. mt = nixblock.multi_tags[nix_name] nixgroup.multi_tags.append(mt) return if isinstance(spiketrain, BaseProxy): spiketrain = spiketrain.load() times = spiketrain.times.magnitude tunits = units_to_string(spiketrain.times.units) waveforms = spiketrain.waveforms timesda = nixblock.create_data_array(f"{nix_name}.times", "neo.spiketrain.times", data=times) timesda.unit = tunits nixmt = nixblock.create_multi_tag(nix_name, "neo.spiketrain", positions=timesda) parentmd = nixgroup.metadata if nixgroup else nixblock.metadata nixmt.metadata = parentmd.create_section(nix_name, "neo.spiketrain.metadata") metadata = nixmt.metadata neoname = spiketrain.name if spiketrain.name is not None else "" metadata["neo_name"] = neoname nixmt.definition = spiketrain.description self._write_property(metadata, "t_start", spiketrain.t_start) self._write_property(metadata, "t_stop", spiketrain.t_stop) if spiketrain.annotations: for k, v in spiketrain.annotations.items(): self._write_property(metadata, k, v) if spiketrain.array_annotations: for k, v in spiketrain.array_annotations.items(): p = self._write_property(metadata, k, v) p.type = ARRAYANNOTATION if nixgroup: nixgroup.multi_tags.append(nixmt) if waveforms is not None: wfdata = list(wf.magnitude for wf in list(wfgroup for wfgroup in spiketrain.waveforms)) wfunits = units_to_string(spiketrain.waveforms.units) wfda = nixblock.create_data_array(f"{nix_name}.waveforms", "neo.waveforms", data=wfdata) wfda.unit = wfunits wfda.metadata = nixmt.metadata.create_section(wfda.name, "neo.waveforms.metadata") nixmt.create_feature(wfda, nix.LinkType.Indexed) # TODO: Move time dimension first for PR #457 # https://github.com/NeuralEnsemble/python-neo/pull/457 wfda.append_set_dimension() wfda.append_set_dimension() wftime = wfda.append_sampled_dimension(spiketrain.sampling_period.magnitude.item()) wftime.unit = units_to_string(spiketrain.sampling_period.units) wftime.label = "time" if spiketrain.left_sweep is not None: self._write_property(wfda.metadata, "left_sweep", spiketrain.left_sweep) def _write_unit(self, neounit, nixchxsource): """ Convert the provided Neo Unit to a NIX Source and write it to the NIX file. :param neounit: The Neo Unit to be written :param nixchxsource: NIX Source (ChannelIndex) where the new Source (Unit) will be created """ if "nix_name" in neounit.annotations: nix_name = neounit.annotations["nix_name"] else: nix_name = f"neo.unit.{self._generate_nix_name()}" neounit.annotate(nix_name=nix_name) nixunitsource = nixchxsource.create_source(nix_name, "neo.unit") nixunitsource.metadata = nixchxsource.metadata.create_section(nix_name, "neo.unit.metadata") metadata = nixunitsource.metadata neoname = neounit.name if neounit.name is not None else "" metadata["neo_name"] = neoname nixunitsource.definition = neounit.description if neounit.annotations: for k, v in neounit.annotations.items(): self._write_property(metadata, k, v) def _create_source_links(self, neoblock, nixblock): """ Creates references between objects in a NIX Block to store the references in the Neo ChannelIndex and Unit objects. Specifically: - If a Neo ChannelIndex references a Neo AnalogSignal or IrregularlySampledSignal, the corresponding signal DataArray will reference the corresponding NIX Source object which represents the Neo ChannelIndex. - If a Neo Unit references a Neo SpikeTrain, the corresponding MultiTag will reference the NIX Source objects which represent the Neo Unit and its parent ChannelIndex. The two arguments must represent the same Block in each corresponding format. Neo objects that have not been converted yet (i.e., AnalogSignal, IrregularlySampledSignal, or SpikeTrain objects that are not attached to a Segment) are created on the nixblock. :param neoblock: A Neo Block object :param nixblock: The corresponding NIX Block """ for chx in neoblock.channel_indexes: signames = [] for asig in chx.analogsignals: if not ("nix_name" in asig.annotations and asig.annotations["nix_name"] in self._signal_map): self._write_analogsignal(asig, nixblock, None) signames.append(asig.annotations["nix_name"]) for isig in chx.irregularlysampledsignals: if not ("nix_name" in isig.annotations and isig.annotations["nix_name"] in self._signal_map): self._write_irregularlysampledsignal(isig, nixblock, None) signames.append(isig.annotations["nix_name"]) chxsource = nixblock.sources[chx.annotations["nix_name"]] for name in signames: for da in self._signal_map[name]: da.sources.append(chxsource) for unit in chx.units: unitsource = chxsource.sources[unit.annotations["nix_name"]] for st in unit.spiketrains: mtags = nixblock.multi_tags if not ("nix_name" in st.annotations and st.annotations["nix_name"] in mtags): self._write_spiketrain(st, nixblock, None) stmt = mtags[st.annotations["nix_name"]] stmt.sources.append(chxsource) stmt.sources.append(unitsource) @staticmethod def _generate_nix_name(): return uuid4().hex def _write_property(self, section, name, v): """ Create a metadata property with a given name and value on the provided metadata section. :param section: The metadata section to hold the new property :param name: The name of the property :param v: The value to write :return: The newly created property """ if isinstance(v, pq.Quantity): if len(v.shape): section.create_property(name, tuple(v.magnitude)) else: section.create_property(name, v.magnitude.item()) section.props[name].unit = str(v.dimensionality) elif isinstance(v, datetime_types): value, annotype = dt_to_nix(v) prop = section.create_property(name, value) prop.definition = annotype elif isinstance(v, str): if len(v): section.create_property(name, v) else: section.create_property(name, nix.DataType.String) elif isinstance(v, bytes): section.create_property(name, v.decode()) elif isinstance(v, Iterable): values = [] unit = None definition = None if len(v) == 0: # NIX supports empty properties but dtype must be specified # Defaulting to String and using definition to signify empty # iterable as opposed to empty string values = nix.DataType.String definition = EMPTYANNOTATION elif hasattr(v, "ndim") and v.ndim == 0: values = v.item() if isinstance(v, pq.Quantity): unit = str(v.dimensionality) else: for item in v: if isinstance(item, str): item = item elif isinstance(item, pq.Quantity): unit = str(item.dimensionality) item = item.magnitude.item() elif isinstance(item, Iterable): self.logger.warn("Multidimensional arrays and nested " "containers are not currently " "supported when writing to NIX.") return None else: item = item values.append(item) section.create_property(name, values) section.props[name].unit = unit section.props[name].definition = definition elif type(v).__module__ == "numpy": section.create_property(name, v.item()) else: section.create_property(name, v) return section.props[name] @staticmethod def _nix_attr_to_neo(nix_obj): """ Reads common attributes and metadata from a NIX object and populates a dictionary with Neo-compatible attributes and annotations. Common attributes: neo_name, nix_name, description, file_datetime (if applicable). Metadata: For properties that specify a 'unit', a Quantity object is created. """ neo_attrs = dict() neo_attrs["nix_name"] = nix_obj.name neo_attrs["description"] = stringify(nix_obj.definition) if nix_obj.metadata: for prop in nix_obj.metadata.inherited_properties(): values = list(prop.values) if prop.unit: units = prop.unit values = create_quantity(values, units) if not len(values): if prop.definition == EMPTYANNOTATION: values = list() elif prop.data_type == nix.DataType.String: values = "" elif len(values) == 1: values = values[0] if prop.definition in (DATEANNOTATION, TIMEANNOTATION, DATETIMEANNOTATION): values = dt_from_nix(values, prop.definition) if prop.type == ARRAYANNOTATION: if 'array_annotations' in neo_attrs: neo_attrs['array_annotations'][prop.name] = values else: neo_attrs['array_annotations'] = {prop.name: values} else: neo_attrs[prop.name] = values # since the 'neo_name' NIX property becomes the actual object's name, # there's no reason to keep it in the annotations neo_attrs["name"] = stringify(neo_attrs.pop("neo_name", None)) return neo_attrs @staticmethod def _group_signals(dataarrays): """ Groups data arrays that were generated by the same Neo Signal object. The collection can contain both AnalogSignals and IrregularlySampledSignals. :param dataarrays: A collection of DataArray objects to group :return: A dictionary mapping a base name to a list of DataArrays which belong to the same Signal """ # now start grouping groups = OrderedDict() for da in dataarrays: basename = ".".join(da.name.split(".")[:-1]) if basename not in groups: groups[basename] = list() groups[basename].append(da) return groups @staticmethod def _get_time_dimension(obj): for dim in obj.dimensions: if hasattr(dim, "label") and dim.label == "time": return dim return None def _use_obj_names(self, blocks): errmsg = "use_obj_names enabled: found conflict or anonymous object" allobjs = [] def check_unique(objs): names = list(o.name for o in objs) if None in names or "" in names: raise ValueError(names) if len(names) != len(set(names)): self._names_ok = False raise ValueError(names) # collect objs if ok allobjs.extend(objs) try: check_unique(blocks) except ValueError as exc: raise ValueError(f"{errmsg} in Blocks") from exc for blk in blocks: try: # Segments check_unique(blk.segments) except ValueError as exc: raise ValueError(f"{errmsg} at Block '{blk.name}' > segments") from exc # collect all signals in all segments signals = [] # collect all events, epochs, and spiketrains in all segments eests = [] for seg in blk.segments: signals.extend(seg.analogsignals) signals.extend(seg.irregularlysampledsignals) signals.extend(seg.imagesequences) eests.extend(seg.events) eests.extend(seg.epochs) eests.extend(seg.spiketrains) try: # AnalogSignals and IrregularlySampledSignals check_unique(signals) except ValueError as exc: raise ValueError(f"{errmsg} in Signal names of Block '{blk.name}'") from exc try: # Events, Epochs, and SpikeTrains check_unique(eests) except ValueError as exc: raise ValueError( f"{errmsg} in Event, Epoch, and Spiketrain names of Block '{blk.name}'" ) from exc try: # ChannelIndexes check_unique(blk.channel_indexes) except ValueError as exc: raise ValueError(f"{errmsg} in ChannelIndex names of Block '{blk.name}'") from exc for chx in blk.channel_indexes: try: check_unique(chx.units) except ValueError as exc: raise ValueError(f"{errmsg} in Unit names of Block '{blk.name}' > " f"ChannelIndex '{chx.name}'") from exc # names are OK: assign annotations for o in allobjs: o.annotations["nix_name"] = o.name def close(self): """ Closes the open nix file and resets maps. """ if (hasattr(self, "nix_file") and self.nix_file and self.nix_file.is_open()): self.nix_file.close() self.nix_file = None self._neo_map = None self._ref_map = None self._signal_map = None self._view_map = None self._block_read_counter = None def __del__(self): self.close()