from .baseio import BaseIO from neo.core import ImageSequence, Segment, Block import numpy as np import struct import os import math import quantities as pq class BlkIO(BaseIO): """ Neo IO module for optical imaging data stored as BLK file *Usage*: >>> from neo import io >>> import quantities as pq >>> r = io.BlkIO("file_blk_1.BLK",units='V',sampling_rate=1.0*pq.Hz, ... spatial_scale=1.0*pq.Hz) >>> block = r.read_block() reading the header reading block returning block >>> block Block with 6 segments file_origin: 'file_blk_1.BLK' # segments (N=6) 0: Segment with 1 imagesequences description: 'stim nb:0' # analogsignals (N=0) 1: Segment with 1 imagesequences description: 'stim nb:1' # analogsignals (N=0) 2: Segment with 1 imagesequences description: 'stim nb:2' # analogsignals (N=0) 3: Segment with 1 imagesequences description: 'stim nb:3' # analogsignals (N=0) 4: Segment with 1 imagesequences description: 'stim nb:4' # analogsignals (N=0) 5: Segment with 1 imagesequences description: 'stim nb:5' # analogsignals (N=0) Many thanks to Thomas Deneux for the MATLAB code on which this was based. """ name = 'BLK IO' description = "Neo IO module for optical imaging data stored as BLK file" _prefered_signal_goup_mode = 'group-by-same-units' is_readable = True is_wirtable = False supported_objects = [Block, Segment, ImageSequence] readble_objects = supported_objects support_lazy = False read_params = {} write_params = {} extensions = [] mode = 'file' def __init__(self, file_name=None, units=None, sampling_rate=None, spatial_scale=None, **kwargs): BaseIO.__init__(self, file_name, **kwargs) self.units = units self.sampling_rate = sampling_rate self.spatial_scale = spatial_scale def read(self, lazy=False, **kwargs): """ Return all data from the file as a list of Blocks """ if lazy: raise ValueError('This IO module does not support lazy loading') return [self.read_block(lazy=lazy, units=self.units, sampling_rate=self.sampling_rate, spatial_scale=self.spatial_scale, **kwargs)] def read_block(self, lazy=False, **kargs): def read(name, type, nb, dictionary, file): if type == 'int32': # dictionary[name] = int.from_bytes(file.read(4), byteorder=sys.byteorder, signed=True) dictionary[name] = struct.unpack("i", file.read(4))[0] if type == 'float32': dictionary[name] = struct.unpack('f', file.read(4))[0] if type == 'uint8': l = [] for i in range(nb): l.append(chr(struct.unpack('B', file.read(1))[0])) dictionary[name] = l if type == 'uint16': l = [] for i in range(nb): l.append((struct.unpack('H', file.read(2)))[0]) dictionary[name] = l if type == 'short': dictionary[name] = struct.unpack('h', file.read(2))[0] return dictionary def read_header(file_name): file = open(file_name, "rb") i = [ ['file_size', 'int32', 1], ['checksum_header', 'int32', 1], ['check_data', 'int32', 1], ['lenheader', 'int32', 1], ['versionid', 'float32', 1], ['filetype', 'int32', 1], ['filesubtype', 'int32', 1], ['datatype', 'int32', 1], ['sizeof', 'int32', 1], ['framewidth', 'int32', 1], ['frameheight', 'int32', 1], ['nframesperstim', 'int32', 1], ['nstimuli', 'int32', 1], ['initialxbinfactor', 'int32', 1], ['initialybinfactor', 'int32', 1], ['xbinfactor', 'int32', 1], ['ybinfactor', 'int32', 1], ['username', 'uint8', 32], ['recordingdate', 'uint8', 16], ['x1roi', 'int32', 1], ['y1roi', 'int32', 1], ['x2roi', 'int32', 1], ['y2roi', 'int32', 1], ['stimoffs', 'int32', 1], ['stimsize', 'int32', 1], ['frameoffs', 'int32', 1], ['framesize', 'int32', 1], ['refoffs', 'int32', 1], ['refsize', 'int32', 1], ['refwidth', 'int32', 1], ['refheight', 'int32', 1], ['whichblocks', 'uint16', 16], ['whichframe', 'uint16', 16], ['loclip', 'int32', 1], ['hiclip', 'int32', 1], ['lopass', 'int32', 1], ['hipass', 'int32', 1], ['operationsperformed', 'uint8', 64], ['magnification', 'float32', 1], ['gain', 'uint16', 1], ['wavelength', 'uint16', 1], ['exposuretime', 'int32', 1], ['nrepetitions', 'int32', 1], ['acquisitiondelay', 'int32', 1], ['interstiminterval', 'int32', 1], ['creationdate', 'uint8', 16], ['datafilename', 'uint8', 64], ['orareserved', 'uint8', 256] ] dic = {} for x in i: dic = read(name=x[0], type=x[1], nb=x[2], dictionary=dic, file=file) if dic['filesubtype'] == 13: i = [ ["includesrefframe", "int32", 1], ["temp", "uint8", 128], ["ntrials", "int32", 1], ["scalefactors", "int32", 1], ["cameragain", "short", 1], ["ampgain", "short", 1], ["samplingrate", "short", 1], ["average", "short", 1], ["exposuretime", "short", 1], ["samplingaverage", "short", 1], ["presentaverage", "short", 1], ["framesperstim", "short", 1], ["trialsperblock", "short", 1], ["sizeofanalogbufferinframes", "short", 1], ["cameratrials", "short", 1], ["filler", "uint8", 106], ["dyedaqreserved", "uint8", 106] ] for x in i: dic = read(name=x[0], type=x[1], nb=x[2], dictionary=dic, file=file) # nottested # p.listofstimuli=temp(1:max(find(temp~=0)))'; % up to first non-zero stimulus dic["listofstimuli"] = dic["temp"][0:np.argwhere(x != 0).max(0)] else: i = [ ["includesrefframe", "int32", 1], ["listofstimuli", "uint8", 256], ["nvideoframesperdataframe", "int32", 1], ["ntrials", "int32", 1], ["scalefactor", "int32", 1], ["meanampgain", "float32", 1], ["meanampdc", "float32", 1], ["vdaqreserved", "uint8", 256] ] for x in i: dic = read(name=x[0], type=x[1], nb=x[2], dictionary=dic, file=file) i = [["user", "uint8", 256], ["comment", "uint8", 256], ["refscalefactor", "int32", 1]] for x in i: dic = read(name=x[0], type=x[1], nb=x[2], dictionary=dic, file=file) dic["actuallength"] = os.stat(file_name).st_size file.close() return dic # start of the reading process nblocks = 1 print("reading the header") header = read_header(self.filename) nstim = header['nstimuli'] ni = header['framewidth'] nj = header['frameheight'] nfr = header['nframesperstim'] lenh = header['lenheader'] framesize = header['framesize'] filesize = header['file_size'] dtype = header['datatype'] gain = header['meanampgain'] dc = header['meanampdc'] scalefactor = header['scalefactor'] # [["dtype","nbytes","datatype","type_out"],[...]] l = [ [11, 1, "uchar", "uint8", "B"], [12, 2, "ushort", "uint16", "H"], [13, 4, "ulong", "uint32", "I"], [14, 4, "float", "single", "f"] ] for i in l: if dtype == i[0]: nbytes, datatype, type_out, struct_type = i[1], i[2], i[3], i[4] if framesize != ni * nj * nbytes: print("BAD HEADER!!! framesize does not match framewidth*frameheight*nbytes!") framesize = ni * nj * nbytes if (filesize - lenh) > (framesize * nfr * nstim): nfr2 = nfr + 1 includesrefframe = True else: nfr2 = nfr includesrefframe = False nbin = nblocks conds = [i for i in range(1, nstim + 1)] ncond = len(conds) data = [[[np.zeros((ni, nj, nfr), type_out)] for x in range(ncond)] for i in range(nbin)] for k in range(1, nbin + 1): print("reading block") bin = np.arange(math.floor((k - 1 / nbin * nblocks) + 1), math.floor((k / nbin * nblocks) + 1)) sbin = bin.size for j in range(1, sbin + 1): file = open(self.filename, 'rb') for i in range(1, ncond + 1): framestart = conds[i - 1] * nfr2 - nfr offset = framestart * ni * nj * nbytes + lenh file.seek(offset, 0) a = [(struct.unpack(struct_type, file.read(nbytes)))[0] for m in range(ni * nj * nfr)] a = np.reshape(np.array(a, dtype=type_out, order='F'), (ni * nj, nfr), order='F') a = np.reshape(a, (ni, nj, nfr), order='F') if includesrefframe: # not tested framestart = (conds[i] - 1) * nfr2 offset = framestart * ni * nj * nbytes + lenh file.seek(offset) ref = [(struct.unpack(struct_type, file.read(nbytes)))[0] for m in range(ni * nj)] ref = np.array(ref, dtype=type_out) for y in range(len(ref)): ref[y] *= scalefactor ref = np.reshape(ref, (ni, nj)) b = np.tile(ref, [1, 1, nfr]) for y in range(len(a)): b.append([]) for x in range(len(a[y])): b[y + 1].append([]) for frame in range(len(a[y][x])): b[y + 1][x][frame] = (a[y][x][frame] / gain) - \ (scalefactor * dc / gain) a = b if sbin == 1: data[k - 1][i - 1] = a else: # not tested for y in range(len(a)): for x in range(len(a[y])): a[y][x] /= sbin data[k - 1][i - 1] = data[k - 1][i - 1] + a / sbin file.close() # data format [block][stim][width][height][frame]] # data structure should be [block][stim][frame][width][height] in order to be easy to use with neo # each file is a block # each stim could be a segment # then an image sequence [frame][width][height] # image need to be rotated # changing order of data for compatibility # [block][stim][width][height][frame]] # to # [block][stim][frame][width][height] for block in range(len(data)): for stim in range(len(data[block])): a = [] for frame in range(header['nframesperstim']): a.append([]) for width in range(len(data[block][stim])): a[frame].append([]) for height in range(len(data[block][stim][width])): a[frame][width].append(data[block][stim][width][height][frame]) # rotation of data to be the same as thomas deneux screenshot a[frame] = np.rot90(np.fliplr(a[frame])) data[block][stim] = a block = Block(file_origin=self.filename) for stim in range(len(data[0])): image_sequence = ImageSequence(data[0][stim], units=self.units, sampling_rate=self.sampling_rate, spatial_scale=self.spatial_scale) segment = Segment(file_origin=self.filename, description=("stim nb:"+str(stim))) segment.imagesequences = [image_sequence] segment.block = block for key in header: block.annotations[key] = header[key] block.segments.append(segment) print("returning block") return block