123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- # -*- coding: utf-8 -*-
- """Some classes and functions to load and save MultiViewVideo files (*.idlmov)
- Client code only needs class StimulusSet.
- Save a list of pictures as stimulus set file:
- >>> stim_set = StimulusSet(pics=pics)
- >>> stim_set.save(file_path)
- Load a stimulus file:
- >>> stimset = StimulusSet(file_path)
- """
- import objsimpy.element_tree_object as eto
- import struct
- import numpy as np
- import array
- import logging
- import copy
- from objsimpy.xmltodict import unparse as xml_from_dict
- def remove_null_character(s):
- end = s.find('\x00')
- if end == -1:
- return ''
- else:
- return s[0:end]
-
- class StimMetaData:
- def __init__(self, file_path):
- self.eto = eto.EtObject(file_path=file_path)
- self.movie_file_name = self.eto.StimParas[0].MovieFileName[0].value
- self.ny_paras = int(self.eto.StimParas[0].NYParas[0].value)
- self.nx_paras = int(self.eto.StimParas[0].NXParas[0].value)
-
- def write_meta_xml(meta_dict, fname_meta):
- """Write stimulus meta data xml file.
-
- Args:
- meta_dict (dict): stimulus meta data in a dictionary
- fname_meta (string): name of xml file
- """
- meta_copy = copy.deepcopy(meta_dict)
- make_content_value_recursive(meta_copy)
- with open(fname_meta, 'w') as fobj_meta:
- fobj_meta.write(xml_from_dict(meta_copy, pretty=True))
-
- def make_content_value_recursive(meta_dict):
- keys = meta_dict.keys()
- for key in keys:
- if isinstance(meta_dict[key], dict):
- make_content_value_recursive(meta_dict[key])
- else:
- meta_dict[key] = {'@value': str(meta_dict[key])}
-
- class StimFileHeader:
- def __init__(self, file_obj=None, info="MindVideo", version="0.05", width=1, height=1, nfilters=1):
- if file_obj is not None:
- self.from_file(file_obj)
- else:
- self.info = info
- self.version = version
- self.width = width
- self.height = height
- self.nfilters = nfilters
-
-
- def from_file(self, file_obj):
- (info,) = struct.unpack('12s', file_obj.read(12))
- self.info = remove_null_character(info)
- (version,) = struct.unpack('8s', file_obj.read(8))
- self.version = remove_null_character(version)
- (self.width, self.height, self.nfilters) = struct.unpack('iii', file_obj.read(12))
-
- def save(self, file_obj):
- file_obj.write(struct.pack('12s', self.info))
- file_obj.write(struct.pack('8s', self.version))
- file_obj.write(struct.pack('iii', self.width, self.height, self.nfilters))
-
- def __unicode__(self):
- attrs = ["info", "version", "width", "height", "nfilters"]
- ret = u"<StimFileHeader"
- for a in attrs:
- ret += " " + a + "=" + str(getattr(self, a))
- ret += "/>"
- return ret
- class Filter:
- def __init__(self, open_file=None, pic=None, fwidth=2, fheight=2, wshift=0, hshift=0,
- output_width=5, output_height=5, output_size=25):
- if open_file is not None:
- self.from_file(open_file)
- else:
- if pic is None:
- pic = np.zeros((fwidth, fheight))
- self.set_filter_dimensions_from_pic(pic)
-
- self.wshift = wshift
- self.hshift = hshift
- self.output_width = output_width
- self.output_height = output_height
-
- def set_filter_dimensions_from_pic(self, pic):
- self.pic = pic
- self.fheight = pic.shape[0]
- self.fwidth = pic.shape[1]
-
- def from_file(self, file_obj):
- n_integers = 6
- data = struct.unpack('i'*n_integers, file_obj.read(4*n_integers))
- self.fwidth, self.fheight = data[:2]
- self.fsize = self.fwidth * self.fheight
- self.wshift, self.hshift = data[2:4]
- self.output_width, self.output_height = data[4:6]
- self.output_size = self.output_width * self.output_height
- self.pic = struct.unpack('f'*self.fsize, file_obj.read(4*self.fsize))
-
- def save(self, file_obj):
- data = [self.fwidth, self.fheight, self.wshift, self.hshift, self.output_width, self.output_height]
- int_array = array.array('i', data)
- int_array.tofile(file_obj)
- pic_float_array = array.array('f', self.pic.flat)
- pic_float_array.tofile(file_obj)
-
- def __repr__(self):
- attrs = ["fwidth", "fheight", "wshift", "hshift", "output_width", "output_height", "output_size"]
- ret = "<Filter"
- for a in attrs:
- ret += " " + a + "=" + str(getattr(self, a))
- ret += "/>"
- return ret
- class StimulusSet:
-
- """Represents a set of images used as stimuli for neural network.
-
- A single stimulus is a frame, consisting of an image for every filter.
- Currently only stimulus sets with single filter are supported.
-
- Args:
- file_path (str, optional): if given, the stimulus set will be loaded from there
- pics (list of ndarray, optional): List of 2d numpy arrays representing stimulus pictures
-
- Attributes:
- pics (list of ndarray): list of images (2d numpy arrays)
- """
-
- def __init__(self, file_path=None, pics=None):
- """Initializes an instance of StimulusSet.
-
- Args:
- file_path (str, optional): if given, the stimulus set will be loaded from there
- pics (list of ndarray, optional): List of 2d numpy arrays representing stimulus pictures
- """
-
- self.header = None
- self.filters = None
- if pics is None:
- self.pics = []
- else:
- self.pics = pics
- if file_path is not None:
- self.pics = self.from_file(file_path)
-
- def add_frame(self, pics=None):
- if pics is None:
- pics = []
- if len(pics) > 1:
- raise NotImplementedError
- self.pics.append(pics[0])
-
- def get_output_width(self, filter_num=0):
- if self.filters is not None:
- return self.filters[filter_num].output_width
- else:
- return 0
- def get_output_height(self, filter_num=0):
- if self.filters is not None:
- return self.filters[filter_num].output_height
- else:
- return 0
-
- def from_file(self, file_path):
- """Loads a stimulus set from file_path"""
-
- file_obj = open(file_path, "rb")
- self.header = StimFileHeader(file_obj)
- logging.debug(unicode(self.header).encode('utf8'))
- self.filters = []
- for i in range(self.header.nfilters):
- f = Filter(file_obj)
- self.filters.append(f)
- logging.debug(f)
-
- if self.header.nfilters > 1:
- raise NotImplementedError
-
- img_filter = self.filters[0]
- n = img_filter.output_size
- pics = []
- while True:
- n_bytes = 4 * n
- data = file_obj.read(n_bytes)
- if len(data) < n_bytes:
- break
- pic = struct.unpack('f'*n, data)
- pic = np.array(pic)
- pic = pic.reshape((img_filter.output_height, img_filter.output_width))
- pics.append(pic)
- file_obj.close()
-
- return pics
-
- def save(self, file_path):
- """Saves the stimulus set to file_path."""
-
- file_obj = open(file_path, "wb")
- self.save_header(file_obj)
- self.save_filters(file_obj)
- self.save_frames(file_obj)
- file_obj.close()
-
- # only with 1 filter implemented
- def save_header(self, file_obj):
- height, width = self.pics[0].shape
- if self.filters is None:
- self.filters = [Filter(output_width=width, output_height=height),]
- nfilters = len(self.filters)
- if nfilters > 1:
- raise NotImplementedError
- header = StimFileHeader(info="MindVideo", version="0.05", width=width, height=height, nfilters=nfilters)
- header.save(file_obj)
-
- def save_filters(self, file_obj):
- for f in self.filters:
- f.save(file_obj)
-
- def save_frames(self, file_obj):
- nfilters = len(self.filters)
- if nfilters > 1:
- raise NotImplementedError
- for pic in self.pics:
- pic_float_array = array.array('f', pic.flat)
- pic_float_array.tofile(file_obj)
-
-
-
- def load_stim_movie():
- pass
|