123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- # -*- coding: utf-8 -*-
- """
- Class for reading data from WinEdr, a software tool written by
- John Dempster.
- WinEdr is free:
- http://spider.science.strath.ac.uk/sipbs/software.htm
- Depends on:
- Supported : Read
- Author: sgarcia
- """
- import os
- import struct
- import sys
- import numpy as np
- import quantities as pq
- from neo.io.baseio import BaseIO
- from neo.core import Segment, AnalogSignal
# True when the interpreter is Python 3; read_segment uses it to decide
# whether the raw header bytes must be decoded to text before parsing.
PY3K = sys.version_info[0] == 3
class WinEdrIO(BaseIO):
    """
    Class for reading data from WinEDR.

    Usage:
        >>> from neo import io
        >>> r = io.WinEdrIO(filename='File_WinEDR_1.EDR')
        >>> seg = r.read_segment(lazy=False, cascade=True,)
        >>> print(seg.analogsignals)
        [<AnalogSignal(array([ 89.21203613, 88.83666992, 87.21008301, ..., 64.56298828,
        67.94128418, 68.44177246], dtype=float32) * pA, [0.0 s, 101.5808 s], sampling rate: 10000.0 Hz)>]
    """

    is_readable = True
    is_writable = False

    supported_objects = [Segment, AnalogSignal]
    readable_objects = [Segment]
    writeable_objects = []

    has_header = False
    is_streameable = False

    read_params = {Segment: [], }
    write_params = None

    name = 'WinEDR'
    extensions = ['EDR']

    mode = 'file'

    # Number of bytes read from the start of the file as the text header.
    _header_size = 2048
    # Header keys whose values are converted to int / float while parsing;
    # every other key keeps its raw string value.
    _int_keys = frozenset(['NC', 'NR', 'NBH', 'NBA', 'NBD', 'ADCMAX', 'NP',
                           'NZ'])
    _float_keys = frozenset(['AD', 'DT'])

    def __init__(self, filename=None):
        """
        This class reads a WinEDR file.

        Arguments:
            filename : the filename
        """
        BaseIO.__init__(self)
        self.filename = filename

    def _read_header(self):
        """
        Parse the ``KEY=value`` text header at the start of the file.

        Returns a dict mapping each key to its value: keys listed in
        ``_int_keys`` are converted to int, keys in ``_float_keys`` to float
        (a decimal comma is accepted), all others are left as strings.
        """
        # Only the header is read through this handle; the context manager
        # guarantees it is closed even if parsing below raises.
        with open(self.filename, 'rb') as fid:
            headertext = fid.read(self._header_size)
        if PY3K:
            headertext = headertext.decode('ascii')

        header = {}
        for line in headertext.split('\r\n'):
            if '=' not in line:
                continue
            # Split on the first '=' only, so a value that itself contains
            # '=' does not break the two-way unpacking.
            key, val = line.split('=', 1)
            if key in self._int_keys:
                val = int(val)
            elif key in self._float_keys:
                val = float(val.replace(',', '.'))
            header[key] = val
        return header

    def read_segment(self, lazy=False, cascade=True):
        """
        Read the file into a Segment holding one AnalogSignal per channel.

        Arguments:
            lazy : if True, no sample data is loaded; each AnalogSignal is
                empty and gets a ``lazy_shape`` attribute set to the number
                of samples per channel (``NP // NC`` from the header).
            cascade : if False, return the bare Segment without opening the
                file.

        Returns:
            The populated Segment.

        Raises:
            KeyError if a header entry needed for a channel is missing.
        """
        seg = Segment(
            file_origin=os.path.basename(self.filename),
        )

        if not cascade:
            return seg

        header = self._read_header()
        n_channels = header['NC']

        if not lazy:
            # Samples are int16, stored after NBH header bytes and viewed as
            # one row per time point, one column per channel.
            data = np.memmap(self.filename, np.dtype('i2'), 'r',
                             shape=(header['NP'] // n_channels, n_channels),
                             offset=header['NBH'])

        for c in range(n_channels):
            # Per-channel calibration values may use a decimal comma.
            YCF = float(header['YCF%d' % c].replace(',', '.'))
            YAG = float(header['YAG%d' % c].replace(',', '.'))
            YZ = float(header['YZ%d' % c].replace(',', '.'))

            ADCMAX = header['ADCMAX']
            AD = header['AD']
            DT = header['DT']

            # DT is used below as seconds; rescale when the header declares
            # the time unit as milliseconds.
            if header.get('TU') == 'ms':
                DT *= .001

            unit = header['YU%d' % c]
            try:
                unit = pq.Quantity(1., unit)
            except Exception:
                # Unit string not understood by quantities: deliberately
                # fall back to dimensionless rather than failing the read.
                unit = pq.Quantity(1., '')

            if lazy:
                signal = [] * unit
            else:
                chan = int(header['YO%d' % c])
                raw = data[:, chan].astype('float32')
                signal = (raw - YZ) * AD / (YCF * YAG * (ADCMAX + 1)) * unit

            ana = AnalogSignal(signal,
                               sampling_rate=pq.Hz / DT,
                               t_start=0. * pq.s,
                               name=header['YN%d' % c],
                               channel_index=c)
            if lazy:
                # Floor division keeps this an int on Python 3, matching the
                # row count used for the memmap shape.
                ana.lazy_shape = header['NP'] // n_channels

            seg.analogsignals.append(ana)

        seg.create_many_to_one_relationship()
        return seg
# Binary record layout expressed as (field name, ``struct`` format code)
# pairs, in on-disk order.  The pair shape matches what HeaderReader iterates
# over -- presumably this table is meant to be passed to it as `description`
# (it is not referenced elsewhere in this part of the file; TODO confirm).
AnalysisDescription = [
    ('RecordStatus', '8s'),
    ('RecordType', '4s'),
    ('GroupNumber', 'f'),
    ('TimeRecorded', 'f'),
    ('SamplingInterval', 'f'),
    ('VMax', '8f'),
]
class HeaderReader():
    """
    Decode a sequence of binary fields from a file-like object.

    `description` is an iterable of (key, struct format) pairs; the fields
    are read back to back, in that order.
    """

    def __init__(self, fid, description):
        """
        Arguments:
            fid : object providing ``seek`` and ``read`` that returns bytes
            description : iterable of (key, struct format) pairs
        """
        self.fid = fid
        self.description = description

    def read_f(self, offset=0):
        """
        Seek to `offset` and decode every field listed in the description.

        Returns a dict keyed by field name.  A format that unpacks to exactly
        one value is stored as that value; any other result is stored as a
        list of the unpacked values.
        """
        self.fid.seek(offset)
        record = {}
        for name, code in self.description:
            nbytes = struct.calcsize(code)
            fields = struct.unpack(code, self.fid.read(nbytes))
            record[name] = fields[0] if len(fields) == 1 else list(fields)
        return record
|