# -*- coding: utf-8 -*- """ Class for reading data from Alpha Omega .map files. This class is an experimental reader with important limitations. See the source code for details of the limitations. The code of this reader is of alpha quality and received very limited testing. This code is written from the incomplete file specifications available in: [1] AlphaMap Data Acquisition System User's Manual Version 10.1.1 Section 5 APPENDIX B: ALPHAMAP FILE STRUCTURE, pages 120-140 Edited by ALPHA OMEGA Home Office: P.O. Box 810, Nazareth Illit 17105, Israel http://www.alphaomega-eng.com/ and from the source code of a C software for conversion of .map files to .eeg elan software files : [2] alphamap2eeg 1.0, 12/03/03, Anne CHEYLUS - CNRS ISC UMR 5015 Supported : Read @author : sgarcia, Florent Jaillet """ # NOTE: For some specific types of comments, the following convention is used: # "TODO:" Desirable future evolution # "WARNING:" Information about code that is based on broken or missing # specifications and that might be wrong # Main limitations of this reader: # - The reader is only able to load data stored in data blocks of type 5 # (data block for one channel). In particular it means that it doesn't # support signals stored in blocks of type 7 (data block for multiple # channels). # For more details on these data blocks types, see 5.4.1 and 5.4.2 p 127 in # [1]. # - Rather than supporting all the neo objects types that could be extracted # from the file, all read data are returned in AnalogSignal objects, even for # digital channels or channels containing spiking informations. # - Digital channels are not converted to events or events array as they # should. # - Loading multichannel signals as AnalogSignalArrays is not supported. # - Many data or metadata that are avalaible in the file and that could be # represented in some way in the neo model are not extracted. In particular # scaling of the data and extraction of the units of the signals are not # supported. # - It received very limited testing, exlusively using python 2.6.6. In # particular it has not been tested using Python 3.x. # # These limitations are mainly due to the following reasons: # - Incomplete, unclear and in some places innacurate specifications of the # format in [1]. # - Lack of test files containing all the types of data blocks of interest # (in particular no file with type 7 data block for multiple channels where # available when writing this code). # - Lack of knowledge of the Alphamap software and the associated data models. # - Lack of time (especially as the specifications are incomplete, a lot of # reverse engineering and testing is required, which makes the development of # this IO very painful and long). # needed for python 3 compatibility from __future__ import absolute_import, division # specific imports import datetime import os import struct # file no longer exists in Python3 try: file except NameError: import io file = io.BufferedReader # note neo.core need only numpy and quantities import numpy as np import quantities as pq from neo.io.baseio import BaseIO from neo.core import Block, Segment, AnalogSignal class AlphaOmegaIO(BaseIO): """ Class for reading data from Alpha Omega .map files (experimental) This class is an experimental reader with important limitations. See the source code for details of the limitations. The code of this reader is of alpha quality and received very limited testing. Usage: >>> from neo import io >>> r = io.AlphaOmegaIO( filename = 'File_AlphaOmega_1.map') >>> blck = r.read_block(lazy = False, cascade = True) >>> print blck.segments[0].analogsignals """ is_readable = True # This is a reading only class is_writable = False # writing is not supported # This class is able to directly or indirectly read the following kind of # objects supported_objects = [ Block, Segment , AnalogSignal] # TODO: Add support for other objects that should be extractable from .map # files (Event, Epoch?, Epoch Array?, SpikeTrain?) # This class can only return a Block readable_objects = [ Block ] # TODO : create readers for different type of objects (Segment, # AnalogSignal,...) # This class is not able to write objects writeable_objects = [ ] # This is for GUI stuff : a definition for parameters when reading. read_params = { Block : [ ] } # Writing is not supported, so no GUI stuff write_params = None name = 'AlphaOmega' extensions = [ 'map' ] mode = 'file' def __init__(self , filename = None) : """ Arguments: filename : the .map Alpha Omega file name """ BaseIO.__init__(self) self.filename = filename # write is not supported so I do not overload write method from BaseIO def read_block(self, # the 2 first keyword arguments are imposed by neo.io API lazy = False, cascade = True): """ Return a Block. """ def count_samples(m_length): """ Count the number of signal samples available in a type 5 data block of length m_length """ # for information about type 5 data block, see [1] count = int((m_length-6)/2-2) # -6 corresponds to the header of block 5, and the -2 take into # account the fact that last 2 values are not available as the 4 # corresponding bytes are coding the time stamp of the beginning # of the block return count # create the neo Block that will be returned at the end blck = Block(file_origin = os.path.basename(self.filename)) blck.file_origin = os.path.basename(self.filename) fid = open(self.filename, 'rb') # NOTE: in the following, the word "block" is used in the sense used in # the alpha-omega specifications (ie a data chunk in the file), rather # than in the sense of the usual Block object in neo # step 1: read the headers of all the data blocks to load the file # structure pos_block = 0 # position of the current block in the file file_blocks = [] # list of data blocks available in the file if not cascade: # we read only the main header m_length, m_TypeBlock = struct.unpack('Hcx' , fid.read(4)) # m_TypeBlock should be 'h', as we read the first block block = HeaderReader(fid, dict_header_type.get(m_TypeBlock, Type_Unknown)).read_f() block.update({'m_length': m_length, 'm_TypeBlock': m_TypeBlock, 'pos': pos_block}) file_blocks.append(block) else: # cascade == True seg = Segment(file_origin = os.path.basename(self.filename)) seg.file_origin = os.path.basename(self.filename) blck.segments.append(seg) while True: first_4_bytes = fid.read(4) if len(first_4_bytes) < 4: # we have reached the end of the file break else: m_length, m_TypeBlock = struct.unpack('Hcx', first_4_bytes) block = HeaderReader(fid, dict_header_type.get(m_TypeBlock, Type_Unknown)).read_f() block.update({'m_length': m_length, 'm_TypeBlock': m_TypeBlock, 'pos': pos_block}) if m_TypeBlock == '2': # The beginning of the block of type '2' is identical for # all types of channels, but the following part depends on # the type of channel. So we need a special case here. # WARNING: How to check the type of channel is not # described in the documentation. So here I use what is # proposed in the C code [2]. # According to this C code, it seems that the 'm_isAnalog' # is used to distinguished analog and digital channels, and # 'm_Mode' encodes the type of analog channel: # 0 for continuous, 1 for level, 2 for external trigger. # But in some files, I found channels that seemed to be # continuous channels with 'm_Modes' = 128 or 192. So I # decided to consider every channel with 'm_Modes' # different from 1 or 2 as continuous. I also couldn't # check that values of 1 and 2 are really for level and # external trigger as I had no test files containing data # of this types. type_subblock = 'unknown_channel_type(m_Mode=' \ + str(block['m_Mode'])+ ')' description = Type2_SubBlockUnknownChannels block.update({'m_Name': 'unknown_name'}) if block['m_isAnalog'] == 0: # digital channel type_subblock = 'digital' description = Type2_SubBlockDigitalChannels elif block['m_isAnalog'] == 1: # analog channel if block['m_Mode'] == 1: # level channel type_subblock = 'level' description = Type2_SubBlockLevelChannels elif block['m_Mode'] == 2: # external trigger channel type_subblock = 'external_trigger' description = Type2_SubBlockExtTriggerChannels else: # continuous channel type_subblock = 'continuous(Mode' \ + str(block['m_Mode']) +')' description = Type2_SubBlockContinuousChannels subblock = HeaderReader(fid, description).read_f() block.update(subblock) block.update({'type_subblock': type_subblock}) file_blocks.append(block) pos_block += m_length fid.seek(pos_block) # step 2: find the available channels list_chan = [] # list containing indexes of channel blocks for ind_block, block in enumerate(file_blocks): if block['m_TypeBlock'] == '2': list_chan.append(ind_block) # step 3: find blocks containing data for the available channels list_data = [] # list of lists of indexes of data blocks # corresponding to each channel for ind_chan, chan in enumerate(list_chan): list_data.append([]) num_chan = file_blocks[chan]['m_numChannel'] for ind_block, block in enumerate(file_blocks): if block['m_TypeBlock'] == '5': if block['m_numChannel'] == num_chan: list_data[ind_chan].append(ind_block) # step 4: compute the length (number of samples) of the channels chan_len = np.zeros(len(list_data), dtype = np.int) for ind_chan, list_blocks in enumerate(list_data): for ind_block in list_blocks: chan_len[ind_chan] += count_samples( file_blocks[ind_block]['m_length']) # step 5: find channels for which data are available ind_valid_chan = np.nonzero(chan_len)[0] # step 6: load the data # TODO give the possibility to load data as AnalogSignalArrays for ind_chan in ind_valid_chan: list_blocks = list_data[ind_chan] ind = 0 # index in the data vector # read time stamp for the beginning of the signal form = '