# -*- coding: utf-8 -*- ''' Created on 31.07.2012 @author: frank ''' import struct from array import array import numpy import logging def fwrite(filename, formatstring, ndarray): """ from http://stackoverflow.com/questions/10637376/import-error-no-module-named-numpyio because numpyio was removed in SciPy 0.9 """ arr = array.array(formatstring, ndarray.flatten()) f = open(filename, 'w') arr.tofile(f) f.close() class NoChunkNameFound(Exception): pass class NoMoreChunksFound(Exception): pass class UnexpectedEndOfChunkFile(Exception): pass class ChunkNotFound(Exception): pass class WrongElementSize(Exception): pass def write_name_string(file_obj, s): file_obj.write(struct.pack('B', len(s))) file_obj.write(s.encode('ascii')) class FileFormat(object): def __init__(self, name, major, minor): self.name = name self.major = major self.minor = minor def write(self, file_obj): write_name_string(file_obj, self.name) file_obj.write(struct.pack('ii', self.major, self.minor)) class ChunkHeader(object): def __init__(self, name, chunk_size, element_size, n_elements): self.name = name self.chunk_size = chunk_size self.element_size = element_size self.n_elements = n_elements def get_data_size(self): return self.element_size * self.n_elements def get_header_size(self): return 1 + len(self.name) + 3*8 def get_total_chunksize(self): return self.get_header_size() + self.get_data_size() def get_data_pos(self, chunk_pos): return chunk_pos + self.get_header_size() def write(self, file_obj): write_name_string(file_obj, self.name) file_obj.write(struct.pack('QQQ', self.chunk_size, self.element_size, self.n_elements)) datatype_size_map = {'i': 4, 'd': 8, 'f':4} class ChunkFile(object): ''' classdocs ''' def __init__(self, file_path=None): ''' Constructor ''' self.file_path = file_path self.file = None self.chunk_names = [] self.chunk_positions = {} self.version = (0,0) if self.file_path is not None: self.open(file_path) def open(self, file_path): self.file = open(file_path, "rb") if self.file is None: raise Exception("konnte Datei nicht oeffnen") self.read_header(self.file) self.chunk_positions = self.read_chunk_headers(self.file) def close(self): if self.file is not None: self.file.close() @staticmethod def read_chunk_headers(file_obj): chunk_positions = {} try: while True: pos = file_obj.tell() chunk_header = ChunkFile.read_chunk_header(file_obj) chunk_positions[chunk_header.name] = (pos, chunk_header) file_obj.seek(pos + chunk_header.get_total_chunksize()) except NoMoreChunksFound: pass return chunk_positions def read_header(self, file_obj): self.format_name = self.read_name_string(file_obj) self.minor, self.major = self.read_version(file_obj) def get_format(self): ''' get file format string ''' return self.format_name def get_chunk_names(self): return self.chunk_positions.keys() def get_version(self): return self.minor, self.major def read_chunk(self, chunk_name): if not chunk_name in self.chunk_positions: raise ChunkNotFound(chunk_name) pos, header = self.chunk_positions[chunk_name] self.file.seek(pos + header.get_header_size()) data = self.file.read(header.get_data_size()) return data def read_array(self, chunk_name, datatype='i'): if not chunk_name in self.chunk_positions: raise ChunkNotFound(chunk_name) pos, header = self.chunk_positions[chunk_name] self.file.seek(pos + header.get_header_size()) if datatype_size_map[datatype] != header.element_size: raise WrongElementSize(str(header.element_size)) data = numpy.fromfile(self.file, dtype=datatype, count=header.n_elements) return data @staticmethod def read_name_string(file_obj): s = file_obj.read(1) if len(s) == 0: raise NoChunkNameFound (length,) = struct.unpack('b', s) s = file_obj.read(length) if len(s) == 0: raise UnexpectedEndOfChunkFile return s @staticmethod def read_version(file_obj): minor, major = struct.unpack('ii', file_obj.read(8)) logging.debug(str(minor) + " " + str(major)) return minor, major @staticmethod def read_chunk_header(file_obj): try: chunk_name = ChunkFile.read_name_string(file_obj) logging.debug("chunk name = " + chunk_name) except NoChunkNameFound: raise NoMoreChunksFound s = file_obj.read(3*8) chunk_size, element_size, n_elements = struct.unpack('QQQ', s) logging.debug(str(chunk_size) + " " + str(element_size) + " " + str(n_elements)) return ChunkHeader(chunk_name, chunk_size, element_size, n_elements) class ChunkFileWriter(object): def __init__(self, file_path, file_format): self.file_path = file_path self.myfile = open(file_path, 'wb') self.file_format = file_format self.file_format.write(self.myfile) def write_byte_chunk(self, name, data): data_size = len(data) chunk_size = data_size + 2*8 # 2*sizeof(size_t) auf x64 System ch = ChunkHeader(name, chunk_size, data_size, 1) ch.write(self.myfile) self.myfile.write(struct.pack(str(data_size) + 's', data)) def write_int_array_chunk(self, name, int_array): element_size = datatype_size_map['i'] n_elements = len(int_array) chunk_size = n_elements*element_size + 2*8 ch = ChunkHeader(name, chunk_size, element_size, n_elements) ch.write(self.myfile) int_array.tofile(self.myfile, format='%i') def write_float_array_chunk(self, name, float_array): element_size = datatype_size_map['f'] n_elements = len(float_array) chunk_size = n_elements*element_size + 2*8 ch = ChunkHeader(name, chunk_size, element_size, n_elements) ch.write(self.myfile) float_array.tofile(self.myfile, format='%f') def close(self): self.myfile.close()