|
- # -*- coding: utf-8 -*-
- '''
- Created on 31.07.2012
- @author: frank
- '''
- import struct
- from array import array
- import numpy
- import logging
def fwrite(filename, formatstring, ndarray):
    """Write the flattened contents of *ndarray* to *filename* as raw binary.

    Stand-in for the ``fwrite`` helper of ``numpyio``, which was removed in
    SciPy 0.9; adapted from
    http://stackoverflow.com/questions/10637376/import-error-no-module-named-numpyio

    Args:
        filename: Path of the file to create or overwrite.
        formatstring: ``array`` module type code (e.g. ``'i'``, ``'f'``,
            ``'d'``) used to convert the values.
        ndarray: Object providing ``flatten()``, e.g. a numpy array.
    """
    # ``array`` is imported as the class itself (``from array import array``),
    # so it is called directly; ``array.array`` does not exist on the class.
    values = array(formatstring, ndarray.flatten())
    # Raw machine values need a binary-mode file; the context manager also
    # closes the file when the write fails.
    with open(filename, 'wb') as f:
        values.tofile(f)
-
class NoChunkNameFound(Exception):
    """Raised when no name string can be read because the file has no more bytes."""
class NoMoreChunksFound(Exception):
    """Raised when a chunk header is requested but no further chunk name exists."""
class UnexpectedEndOfChunkFile(Exception):
    """Raised when expected data is missing because the file ended early."""
class ChunkNotFound(Exception):
    """Raised when a chunk is requested by a name the file does not contain."""
class WrongElementSize(Exception):
    """Raised when a requested datatype's size differs from the chunk's element size."""
def write_name_string(file_obj, s):
    """Write *s* as a length-prefixed name: one unsigned byte, then the bytes.

    Args:
        file_obj: Binary file-like object providing ``write``.
        s: The name, either text (encoded as ASCII) or already-encoded bytes.

    Raises:
        UnicodeEncodeError: If a text name contains non-ASCII characters.
        struct.error: If the encoded name is longer than 255 bytes.
    """
    # Encode before writing anything, so a name that cannot be encoded leaves
    # the file untouched and the prefix counts exactly the bytes that follow.
    if isinstance(s, (bytes, bytearray)):
        raw = bytes(s)
    else:
        raw = s.encode('ascii')
    file_obj.write(struct.pack('B', len(raw)) + raw)
class FileFormat(object):
    """File-level header: a format name plus a major and a minor version number."""

    def __init__(self, name, major, minor):
        """Store the format name and both version numbers."""
        self.name, self.major, self.minor = name, major, minor

    def write(self, file_obj):
        """Write the name string, then major and minor packed as two native ints."""
        write_name_string(file_obj, self.name)
        version_bytes = struct.pack('ii', self.major, self.minor)
        file_obj.write(version_bytes)
class ChunkHeader(object):
    """Header of one chunk: its name and three size fields.

    Attributes are stored exactly as passed in: ``name``, ``chunk_size``,
    ``element_size`` (bytes per element) and ``n_elements``.
    """

    def __init__(self, name, chunk_size, element_size, n_elements):
        """Remember the header fields without validating them."""
        self.name, self.chunk_size = name, chunk_size
        self.element_size, self.n_elements = element_size, n_elements

    def get_data_size(self):
        """Return the payload size in bytes (element size times element count)."""
        payload_bytes = self.element_size * self.n_elements
        return payload_bytes

    def get_header_size(self):
        """Return the header size in bytes as laid out by ``write``."""
        # One length byte plus the name itself, then three 8-byte integers.
        name_field = 1 + len(self.name)
        size_fields = 3*8
        return name_field + size_fields

    def get_total_chunksize(self):
        """Return header size plus payload size in bytes."""
        header_bytes = self.get_header_size()
        return header_bytes + self.get_data_size()

    def get_data_pos(self, chunk_pos):
        """Return the payload offset for a chunk whose header starts at *chunk_pos*."""
        offset = self.get_header_size()
        return chunk_pos + offset

    def write(self, file_obj):
        """Write the name string followed by the three size fields to *file_obj*."""
        write_name_string(file_obj, self.name)
        size_fields = struct.pack(
            'QQQ', self.chunk_size, self.element_size, self.n_elements)
        file_obj.write(size_fields)
-
# Bytes per element for each datatype code used when reading or writing arrays.
datatype_size_map = {
    'i': 4,
    'd': 8,
    'f': 4,
}
class ChunkFile(object):
    """Reader for chunk files: a format header followed by named data chunks.

    Layout as read by this class: a name string (one length byte followed by
    that many bytes), two native ints holding the version, then zero or more
    chunks.  Each chunk starts with a name string and three native unsigned
    64-bit integers (chunk size, element size, number of elements), followed
    by ``element size * number of elements`` bytes of data.

    Instances can be used as context managers; the file is closed on exit.
    """

    def __init__(self, file_path=None):
        """Create a reader and, if *file_path* is given, open that file."""
        self.file_path = file_path
        self.file = None
        self.chunk_names = []
        self.chunk_positions = {}
        self.version = (0, 0)
        # Defined up front so the getters also work before a file is opened.
        self.format_name = None
        self.minor = 0
        self.major = 0
        if self.file_path is not None:
            self.open(file_path)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def open(self, file_path):
        """Open *file_path*, read the file header and index all chunk headers.

        Raises:
            IOError/OSError: If the file cannot be opened.
            NoChunkNameFound: If the file is empty.
            UnexpectedEndOfChunkFile: If the file ends inside a header.
        """
        self.file = open(file_path, "rb")
        try:
            self.read_header(self.file)
            self.chunk_positions = self.read_chunk_headers(self.file)
        except Exception:
            # Do not leak the handle when the file turns out to be malformed.
            self.file.close()
            raise
        self.file_path = file_path
        self.chunk_names = list(self.chunk_positions)

    def close(self):
        """Close the underlying file if one was opened."""
        if self.file is not None:
            self.file.close()

    @staticmethod
    def read_chunk_headers(file_obj):
        """Scan *file_obj* from its current position to the end of the file.

        Returns:
            A dict mapping chunk name to ``(position, ChunkHeader)``, where
            position is the offset of the chunk header.  If a name occurs more
            than once, the last occurrence wins.
        """
        chunk_positions = {}
        while True:
            pos = file_obj.tell()
            try:
                chunk_header = ChunkFile.read_chunk_header(file_obj)
            except NoMoreChunksFound:
                break
            chunk_positions[chunk_header.name] = (pos, chunk_header)
            # Skip the payload; the next header starts right behind it.
            file_obj.seek(pos + chunk_header.get_total_chunksize())
        return chunk_positions

    def read_header(self, file_obj):
        """Read the format name and the two version ints from *file_obj*."""
        self.format_name = self.read_name_string(file_obj)
        # NOTE(review): FileFormat.write packs (major, minor), whereas the two
        # ints are labelled (minor, major) here.  The values are kept in file
        # order either way -- confirm which labelling is intended.
        self.minor, self.major = self.read_version(file_obj)
        self.version = self.get_version()

    def get_format(self):
        """Return the format name read from the file header."""
        return self.format_name

    def get_chunk_names(self):
        """Return the names of all chunks found in the file."""
        return self.chunk_positions.keys()

    def get_version(self):
        """Return the two version ints in the order they appear in the file."""
        return self.minor, self.major

    def _lookup(self, chunk_name):
        """Return ``(position, header)`` for *chunk_name* or raise ChunkNotFound."""
        try:
            return self.chunk_positions[chunk_name]
        except KeyError:
            raise ChunkNotFound(chunk_name)

    def read_chunk(self, chunk_name):
        """Return the raw payload bytes of the chunk named *chunk_name*.

        Raises:
            ChunkNotFound: If the file has no chunk with that name.
        """
        pos, header = self._lookup(chunk_name)
        self.file.seek(header.get_data_pos(pos))
        return self.file.read(header.get_data_size())

    def read_array(self, chunk_name, datatype='i'):
        """Return the payload of *chunk_name* as a numpy array of *datatype*.

        Args:
            chunk_name: Name of the chunk to read.
            datatype: Key of ``datatype_size_map``; also used as numpy dtype.

        Raises:
            ChunkNotFound: If the file has no chunk with that name.
            WrongElementSize: If the size of *datatype* differs from the
                element size stored in the chunk header.
        """
        pos, header = self._lookup(chunk_name)
        if datatype_size_map[datatype] != header.element_size:
            raise WrongElementSize(str(header.element_size))
        self.file.seek(header.get_data_pos(pos))
        return numpy.fromfile(self.file, dtype=datatype, count=header.n_elements)

    @staticmethod
    def read_name_string(file_obj):
        """Read a length-prefixed name from *file_obj* and return it as text.

        Raises:
            NoChunkNameFound: If not even the length byte can be read.
            UnexpectedEndOfChunkFile: If the name is empty or cut short.
        """
        length_byte = file_obj.read(1)
        if not length_byte:
            raise NoChunkNameFound
        # Unsigned, like the 'B' used when writing; a signed read would turn
        # lengths above 127 negative and make read() consume the whole file.
        (length,) = struct.unpack('B', length_byte)
        raw = file_obj.read(length)
        # Empty names are rejected as before; short reads are rejected too.
        if length == 0 or len(raw) != length:
            raise UnexpectedEndOfChunkFile
        # latin-1 maps every byte to exactly one character, so len(name) still
        # equals the byte count that ChunkHeader.get_header_size relies on;
        # ASCII names decode to the same text.
        return raw.decode('latin-1')

    @staticmethod
    def read_version(file_obj):
        """Read two native ints from *file_obj* and return them in file order.

        Raises:
            UnexpectedEndOfChunkFile: If fewer than eight bytes are available.
        """
        raw = file_obj.read(8)
        if len(raw) != 8:
            raise UnexpectedEndOfChunkFile
        minor, major = struct.unpack('ii', raw)
        logging.debug("%s %s", minor, major)
        return minor, major

    @staticmethod
    def read_chunk_header(file_obj):
        """Read one chunk header at the current position of *file_obj*.

        Raises:
            NoMoreChunksFound: If the file ends before another chunk name.
            UnexpectedEndOfChunkFile: If the header is cut short.
        """
        try:
            chunk_name = ChunkFile.read_name_string(file_obj)
        except NoChunkNameFound:
            raise NoMoreChunksFound
        logging.debug("chunk name = %s", chunk_name)
        raw = file_obj.read(3*8)
        if len(raw) != 3*8:
            raise UnexpectedEndOfChunkFile
        chunk_size, element_size, n_elements = struct.unpack('QQQ', raw)
        logging.debug("%s %s %s", chunk_size, element_size, n_elements)
        return ChunkHeader(chunk_name, chunk_size, element_size, n_elements)
-
class ChunkFileWriter(object):
    """Writer for chunk files: a format header followed by named data chunks.

    The file is created and the format header written on construction.
    Instances can be used as context managers; the file is closed on exit.
    """

    def __init__(self, file_path, file_format):
        """Create *file_path* and write *file_format* as its header.

        Args:
            file_path: Path of the file to create or overwrite.
            file_format: Object whose ``write(file_obj)`` emits the header.
        """
        self.file_path = file_path
        self.file_format = file_format
        self.myfile = open(file_path, 'wb')
        try:
            self.file_format.write(self.myfile)
        except Exception:
            # Do not leak the handle when the header cannot be written.
            self.myfile.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def write_byte_chunk(self, name, data):
        """Write *data* (bytes) as one chunk holding a single element."""
        data_size = len(data)
        chunk_size = data_size + 2*8  # 2 * sizeof(size_t) on an x64 system
        ChunkHeader(name, chunk_size, data_size, 1).write(self.myfile)
        self.myfile.write(struct.pack(str(data_size) + 's', data))

    def _write_array_chunk(self, name, values, datatype):
        """Write *values* as a chunk of elements of the given datatype code."""
        # Convert to the declared element type first: an array kept in its
        # own dtype (e.g. int64 or float64) would not match the 4-byte element
        # size announced in the header.
        data = numpy.asarray(values, dtype=datatype)
        element_size = datatype_size_map[datatype]
        # ``size`` counts every element, also for multi-dimensional input.
        n_elements = data.size
        chunk_size = n_elements*element_size + 2*8
        ChunkHeader(name, chunk_size, element_size, n_elements).write(self.myfile)
        self.myfile.write(data.tobytes())

    def write_int_array_chunk(self, name, int_array):
        """Write *int_array* as a chunk of 4-byte integers."""
        self._write_array_chunk(name, int_array, 'i')

    def write_float_array_chunk(self, name, float_array):
        """Write *float_array* as a chunk of 4-byte floats."""
        self._write_array_chunk(name, float_array, 'f')

    def close(self):
        """Close the output file."""
        self.myfile.close()
|