123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- # -*- coding: utf-8 -*-
- """
- Class for reading data from BrainVision product.
- This code was originally made by L. Pezard (2010), modified B. Burle and
- S. More.
- Supported : Read
- Author: sgarcia
- """
- import os
- import re
- import numpy as np
- import quantities as pq
- from neo.io.baseio import BaseIO
- from neo.core import Segment, AnalogSignal, Event
- class BrainVisionIO(BaseIO):
- """
- Class for reading/writing data from BrainVision products (brainAmp,
- brain analyser...)
- Usage:
- >>> from neo import io
- >>> r = io.BrainVisionIO( filename = 'File_brainvision_1.eeg')
- >>> seg = r.read_segment(lazy = False, cascade = True,)
- """
- is_readable = True
- is_writable = False
- supported_objects = [Segment, AnalogSignal, Event]
- readable_objects = [Segment]
- writeable_objects = []
- has_header = False
- is_streameable = False
- read_params = {Segment: []}
- write_params = {Segment: []}
- name = None
- extensions = ['vhdr']
- mode = 'file'
- def __init__(self, filename=None):
- """
- This class read/write a elan based file.
- **Arguments**
- filename : the filename to read or write
- """
- BaseIO.__init__(self)
- self.filename = filename
- def read_segment(self, lazy=False, cascade=True):
- # # Read header file (vhdr)
- header = read_brain_soup(self.filename)
- assert header['Common Infos'][
- 'DataFormat'] == 'BINARY', NotImplementedError
- assert header['Common Infos'][
- 'DataOrientation'] == 'MULTIPLEXED', NotImplementedError
- nb_channel = int(header['Common Infos']['NumberOfChannels'])
- sampling_rate = 1.e6 / float(
- header['Common Infos']['SamplingInterval']) * pq.Hz
- fmt = header['Binary Infos']['BinaryFormat']
- fmts = { 'INT_16':np.int16, 'INT_32':np.int32, 'IEEE_FLOAT_32':np.float32,}
- assert fmt in fmts, NotImplementedError
- dt = fmts[fmt]
- seg = Segment(file_origin=os.path.basename(self.filename))
- if not cascade:
- return seg
- # read binary
- if not lazy:
- binary_file = os.path.splitext(self.filename)[0] + '.eeg'
- sigs = np.memmap(binary_file, dt, 'r', ).astype('f')
- n = int(sigs.size / nb_channel)
- sigs = sigs[:n * nb_channel]
- sigs = sigs.reshape(n, nb_channel)
- for c in range(nb_channel):
- name, ref, res, units = header['Channel Infos'][
- 'Ch%d' % (c + 1,)].split(',')
- units = pq.Quantity(1, units.replace('µ', 'u'))
- if lazy:
- signal = [] * units
- else:
- signal = sigs[:,c]*units
- if dt == np.int16 or dt == np.int32:
- signal *= np.float(res)
- anasig = AnalogSignal(signal = signal,
- channel_index = c,
- name = name,
- sampling_rate = sampling_rate,
- )
- if lazy:
- anasig.lazy_shape = -1
- seg.analogsignals.append(anasig)
- # read marker
- marker_file = os.path.splitext(self.filename)[0] + '.vmrk'
- all_info = read_brain_soup(marker_file)['Marker Infos']
- all_types = []
- times = []
- labels = []
- for i in range(len(all_info)):
- type_, label, pos, size, channel = all_info[
- 'Mk%d' % (i + 1,)].split(',')[:5]
- all_types.append(type_)
- times.append(float(pos) / sampling_rate.magnitude)
- labels.append(label)
- all_types = np.array(all_types)
- times = np.array(times) * pq.s
- labels = np.array(labels, dtype='S')
- for type_ in np.unique(all_types):
- ind = type_ == all_types
- if lazy:
- ea = Event(name=str(type_))
- ea.lazy_shape = -1
- else:
- ea = Event(
- times=times[ind], labels=labels[ind], name=str(type_))
- seg.events.append(ea)
- seg.create_many_to_one_relationship()
- return seg
- def read_brain_soup(filename):
- section = None
- all_info = {}
- for line in open(filename, 'rU'):
- line = line.strip('\n').strip('\r')
- if line.startswith('['):
- section = re.findall('\[([\S ]+)\]', line)[0]
- all_info[section] = {}
- continue
- if line.startswith(';'):
- continue
- if '=' in line and len(line.split('=')) == 2:
- k, v = line.split('=')
- all_info[section][k] = v
- return all_info
|