# -*- coding: utf-8 -*- """ Class for reading data from Plexon acquisition system (.plx) Compatible with versions 100 to 106. Other versions have not been tested. This IO is developed thanks to the header file downloadable from: http://www.plexon.com/downloads.html Supported: Read Author: sgarcia """ from __future__ import unicode_literals, print_function, division import datetime import struct import os import numpy as np import quantities as pq from neo.io.baseio import BaseIO from neo.core import Segment, AnalogSignal, SpikeTrain, Event class PlexonIO(BaseIO): """ Class for reading data from Plexon acquisition systems (.plx) Compatible with versions 100 to 106. Other versions have not been tested. Usage: >>> from neo import io >>> r = io.PlexonIO(filename='File_plexon_1.plx') >>> seg = r.read_segment(lazy=False, cascade=True) >>> print seg.analogsignals [] >>> print seg.spiketrains # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE [>> print seg.events [] """ is_readable = True is_writable = False supported_objects = [Segment, AnalogSignal, SpikeTrain, Event] readable_objects = [Segment] writeable_objects = [] has_header = False is_streameable = False # This is for GUI stuff: a definition for parameters when reading. read_params = { Segment: [ ('load_spike_waveform', {'value': False}), ] } write_params = None name = 'Plexon' extensions = ['plx'] mode = 'file' def __init__(self, filename=None): """ Arguments: filename : the filename """ BaseIO.__init__(self, filename) def read_segment(self, lazy=False, cascade=True, load_spike_waveform=True): """ Read in a segment. Arguments: load_spike_waveform : load or not waveform of spikes (default True) """ fid = open(self.filename, 'rb') global_header = HeaderReader(fid, GlobalHeader).read_f(offset=0) # metadatas seg = Segment() seg.rec_datetime = datetime.datetime(global_header['Year'], global_header['Month'], global_header['Day'], global_header['Hour'], global_header['Minute'], global_header['Second']) seg.file_origin = os.path.basename(self.filename) seg.annotate(plexon_version=global_header['Version']) for key, val in global_header.items(): seg.annotate(**{key: val}) if not cascade: fid.close() return seg ## Step 1 : read headers # dsp channels header = spikes and waveforms dspChannelHeaders = {} maxunit = 0 maxchan = 0 for _ in range(global_header['NumDSPChannels']): # channel is 1 based channelHeader = HeaderReader(fid, ChannelHeader).read_f(offset=None) channelHeader['Template'] = np.array(channelHeader['Template']).reshape((5,64)) channelHeader['Boxes'] = np.array(channelHeader['Boxes']).reshape((5,2,4)) dspChannelHeaders[channelHeader['Channel']] = channelHeader maxunit = max(channelHeader['NUnits'], maxunit) maxchan = max(channelHeader['Channel'], maxchan) # event channel header eventHeaders = {} for _ in range(global_header['NumEventChannels']): eventHeader = HeaderReader(fid, EventHeader).read_f(offset=None) eventHeaders[eventHeader['Channel']] = eventHeader # slow channel header = signal slowChannelHeaders = {} for _ in range(global_header['NumSlowChannels']): slowChannelHeader = HeaderReader(fid, SlowChannelHeader).read_f( offset=None) slowChannelHeaders[slowChannelHeader['Channel']] = \ slowChannelHeader ## Step 2 : a first loop for counting size # signal nb_samples = np.zeros(len(slowChannelHeaders), dtype='int64') sample_positions = np.zeros(len(slowChannelHeaders), dtype='int64') t_starts = np.zeros(len(slowChannelHeaders), dtype='float64') #spiketimes and waveform nb_spikes = np.zeros((maxchan + 1, maxunit + 1), dtype='i') wf_sizes = np.zeros((maxchan + 1, maxunit + 1, 2), dtype='i') # eventarrays nb_events = {} #maxstrsizeperchannel = { } for chan, h in eventHeaders.items(): nb_events[chan] = 0 #maxstrsizeperchannel[chan] = 0 start = fid.tell() while fid.tell() != -1: # read block header dataBlockHeader = HeaderReader(fid, DataBlockHeader).read_f( offset=None) if dataBlockHeader is None: break chan = dataBlockHeader['Channel'] unit = dataBlockHeader['Unit'] n1, n2 = dataBlockHeader['NumberOfWaveforms'], dataBlockHeader[ 'NumberOfWordsInWaveform'] time = (dataBlockHeader['UpperByteOf5ByteTimestamp'] * 2. ** 32 + dataBlockHeader['TimeStamp']) if dataBlockHeader['Type'] == 1: nb_spikes[chan, unit] += 1 wf_sizes[chan, unit, :] = [n1, n2] fid.seek(n1 * n2 * 2, 1) elif dataBlockHeader['Type'] == 4: #event nb_events[chan] += 1 elif dataBlockHeader['Type'] == 5: #continuous signal fid.seek(n2 * 2, 1) if n2 > 0: nb_samples[chan] += n2 if nb_samples[chan] == 0: t_starts[chan] = time ## Step 3: allocating memory and 2 loop for reading if not lazy if not lazy: # allocating mem for signal sigarrays = {} for chan, h in slowChannelHeaders.items(): sigarrays[chan] = np.zeros(nb_samples[chan]) # allocating mem for SpikeTrain stimearrays = np.zeros((maxchan + 1, maxunit + 1), dtype=object) swfarrays = np.zeros((maxchan + 1, maxunit + 1), dtype=object) for (chan, unit), _ in np.ndenumerate(nb_spikes): stimearrays[chan, unit] = np.zeros(nb_spikes[chan, unit], dtype='f') if load_spike_waveform: n1, n2 = wf_sizes[chan, unit, :] swfarrays[chan, unit] = np.zeros( (nb_spikes[chan, unit], n1, n2), dtype='f4') pos_spikes = np.zeros(nb_spikes.shape, dtype='i') # allocating mem for event eventpositions = {} evarrays = {} for chan, nb in nb_events.items(): evarrays[chan] = { 'times': np.zeros(nb, dtype='f'), 'labels': np.zeros(nb, dtype='S4') } eventpositions[chan]=0 fid.seek(start) while fid.tell() != -1: dataBlockHeader = HeaderReader(fid, DataBlockHeader).read_f( offset=None) if dataBlockHeader is None: break chan = dataBlockHeader['Channel'] n1, n2 = dataBlockHeader['NumberOfWaveforms'], dataBlockHeader[ 'NumberOfWordsInWaveform'] time = dataBlockHeader['UpperByteOf5ByteTimestamp'] * \ 2. ** 32 + dataBlockHeader['TimeStamp'] time /= global_header['ADFrequency'] if n2 < 0: break if dataBlockHeader['Type'] == 1: #spike unit = dataBlockHeader['Unit'] pos = pos_spikes[chan, unit] stimearrays[chan, unit][pos] = time if load_spike_waveform and n1 * n2 != 0: swfarrays[chan, unit][pos, :, :] = np.fromstring( fid.read(n1 * n2 * 2), dtype='i2' ).reshape(n1, n2).astype('f4') else: fid.seek(n1 * n2 * 2, 1) pos_spikes[chan, unit] += 1 elif dataBlockHeader['Type'] == 4: # event pos = eventpositions[chan] evarrays[chan]['times'][pos] = time evarrays[chan]['labels'][pos] = dataBlockHeader['Unit'] eventpositions[chan]+= 1 elif dataBlockHeader['Type'] == 5: #signal data = np.fromstring( fid.read(n2 * 2), dtype='i2').astype('f4') sigarrays[chan][sample_positions[chan]: sample_positions[chan]+data.size] = data sample_positions[chan] += data.size ## Step 4: create neo object for chan, h in eventHeaders.items(): if lazy: times = [] labels = None else: times = evarrays[chan]['times'] labels = evarrays[chan]['labels'] ea = Event( times*pq.s, labels=labels, channel_name=eventHeaders[chan]['Name'], channel_index=chan ) if lazy: ea.lazy_shape = nb_events[chan] seg.events.append(ea) for chan, h in slowChannelHeaders.items(): if lazy: signal = [] else: if global_header['Version'] == 100 or global_header[ 'Version'] == 101: gain = 5000. / ( 2048 * slowChannelHeaders[chan]['Gain'] * 1000.) elif global_header['Version'] == 102: gain = 5000. / (2048 * slowChannelHeaders[chan]['Gain'] * slowChannelHeaders[chan]['PreampGain']) elif global_header['Version'] >= 103: gain = global_header['SlowMaxMagnitudeMV'] / ( .5 * (2 ** global_header['BitsPerSpikeSample']) * slowChannelHeaders[chan]['Gain'] * slowChannelHeaders[chan]['PreampGain']) signal = sigarrays[chan] * gain anasig = AnalogSignal( signal, sampling_rate=float( slowChannelHeaders[chan]['ADFreq']) * pq.Hz, units='mV', t_start=t_starts[chan] * pq.s, channel_index=slowChannelHeaders[chan]['Channel'], channel_name=slowChannelHeaders[chan]['Name']) if lazy: anasig.lazy_shape = nb_samples[chan] seg.analogsignals.append(anasig) for (chan, unit), value in np.ndenumerate(nb_spikes): if nb_spikes[chan, unit] == 0: continue if lazy: times = [] waveforms = None t_stop = 0 else: times = stimearrays[chan, unit] t_stop = times.max() if load_spike_waveform: if global_header['Version'] < 103: gain = 3000. / ( 2048 * dspChannelHeaders[chan]['Gain'] * 1000.) elif global_header['Version'] >= 103 and global_header[ 'Version'] < 105: gain = global_header['SpikeMaxMagnitudeMV'] / ( .5 * 2. ** (global_header['BitsPerSpikeSample']) * dspChannelHeaders[chan]['Gain'] * 1000.) elif global_header['Version'] > 105: gain = global_header['SpikeMaxMagnitudeMV'] / ( .5 * 2. ** (global_header['BitsPerSpikeSample']) * dspChannelHeaders[chan]['Gain'] * global_header['SpikePreAmpGain']) waveforms = swfarrays[chan, unit] * gain * pq.mV else: waveforms = None if global_header['WaveformFreq']>0: wf_sampling_rate= float(global_header['WaveformFreq']) * pq.Hz else: wf_sampling_rate = float(global_header['ADFrequency']) * pq.Hz sptr = SpikeTrain( times, units='s', t_stop=t_stop*pq.s, waveforms=waveforms, sampling_rate=wf_sampling_rate, ) sptr.annotate(unit_name = dspChannelHeaders[chan]['Name']) sptr.annotate(channel_index = chan) for key, val in dspChannelHeaders[chan].items(): sptr.annotate(**{key: val}) if lazy: sptr.lazy_shape = nb_spikes[chan, unit] seg.spiketrains.append(sptr) seg.create_many_to_one_relationship() fid.close() return seg GlobalHeader = [ ('MagicNumber', 'I'), ('Version', 'i'), ('Comment', '128s'), ('ADFrequency', 'i'), #this rate is only for events ('NumDSPChannels', 'i'), ('NumEventChannels', 'i'), ('NumSlowChannels', 'i'), ('NumPointsWave', 'i'), ('NumPointsPreThr', 'i'), ('Year', 'i'), ('Month', 'i'), ('Day', 'i'), ('Hour', 'i'), ('Minute', 'i'), ('Second', 'i'), ('FastRead', 'i'), ('WaveformFreq', 'i'), ('LastTimestamp', 'd'), # version >103 ('Trodalness', 'b'), ('DataTrodalness', 'b'), ('BitsPerSpikeSample', 'b'), ('BitsPerSlowSample', 'b'), ('SpikeMaxMagnitudeMV', 'H'), ('SlowMaxMagnitudeMV', 'H'), #version 105 ('SpikePreAmpGain', 'H'), #version 106 ('AcquiringSoftware', '18s'), ('ProcessingSoftware', '18s'), ('Padding', '10s'), # all version ('TSCounts', '650i'), ('WFCounts', '650i'), ('EVCounts', '512i'), ] ChannelHeader = [ ('Name', '32s'), ('SIGName', '32s'), ('Channel', 'i'), ('WFRate', 'i'), #this is not the waveform sampling rate!!! ('SIG', 'i'), ('Ref', 'i'), ('Gain', 'i'), ('Filter', 'i'), ('Threshold', 'i'), ('Method', 'i'), ('NUnits', 'i'), ('Template', '320h'), ('Fit', '5i'), ('SortWidth', 'i'), ('Boxes', '40h'), ('SortBeg', 'i'), # version 105 ('Comment', '128s'), #version 106 ('SrcId', 'b'), ('reserved', 'b'), ('ChanId', 'H'), ('Padding', '10i'), ] EventHeader = [ ('Name', '32s'), ('Channel', 'i'), # version 105 ('Comment', '128s'), #version 106 ('SrcId', 'b'), ('reserved', 'b'), ('ChanId', 'H'), ('Padding', '32i'), ] SlowChannelHeader = [ ('Name', '32s'), ('Channel', 'i'), ('ADFreq', 'i'), ('Gain', 'i'), ('Enabled', 'i'), ('PreampGain', 'i'), # version 104 ('SpikeChannel', 'i'), #version 105 ('Comment', '128s'), #version 106 ('SrcId', 'b'), ('reserved', 'b'), ('ChanId', 'H'), ('Padding', '27i'), ] DataBlockHeader = [ ('Type', 'h'), ('UpperByteOf5ByteTimestamp', 'h'), ('TimeStamp', 'i'), ('Channel', 'h'), ('Unit', 'h'), ('NumberOfWaveforms', 'h'), ('NumberOfWordsInWaveform', 'h'), ] # 16 bytes class HeaderReader(): def __init__(self, fid, description): self.fid = fid self.description = description def read_f(self, offset=None): if offset is not None: self.fid.seek(offset) d = {} for key, fmt in self.description: buf = self.fid.read(struct.calcsize(fmt)) if len(buf) != struct.calcsize(fmt): return None val = list(struct.unpack(fmt, buf)) for i, ival in enumerate(val): if hasattr(ival, 'replace'): ival = ival.replace(b'\x03', b'') ival = ival.replace(b'\x00', b'') val[i] = ival.decode("utf-8") if len(val) == 1: val = val[0] d[key] = val return d