pv_parser.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
'''
Created on 20.08.2020

Author:
Michael Diedenhofen
Max Planck Institute for Metabolism Research, Cologne

Read Bruker ParaVision JCAMP parameter files (e.g. acqp, method, visu_pars).
'''
  8. from __future__ import print_function
  9. VERSION = 'pv_parser.py v 1.0.2 20200820'
  10. import re
  11. import sys
  12. import collections
  13. import numpy as np
  14. def strfind(string, sub):
  15. len_sub = len(sub)
  16. result = []
  17. if (len_sub == 0) or (len_sub > len(string)):
  18. return result
  19. pos = string.find(sub)
  20. while pos >= 0:
  21. result.append(pos)
  22. pos = string.find(sub, pos + len_sub)
  23. return result
  24. def strtok(string, delimiters=None):
  25. token = ''
  26. remainder = ''
  27. len_str = len(string)
  28. if len_str == 0:
  29. return (token, remainder)
  30. if delimiters is None: # whitespace characters
  31. delimiters = list(map(chr, list(range(9, 14)) + [32]))
  32. i = 0
  33. while string[i] in delimiters:
  34. i += 1
  35. if i >= len_str:
  36. return (token, remainder)
  37. start = i
  38. while string[i] not in delimiters:
  39. i += 1
  40. if i >= len_str:
  41. break
  42. token = string[start:i]
  43. remainder = string[i:len_str]
  44. return (token, remainder)
  45. def extract_jcamp_strings(string, get_all=True):
  46. if string is None:
  47. result = None
  48. elif get_all:
  49. result = re.findall(r'<(.*?)>', string)
  50. else:
  51. result = re.search(r'<(.*?)>', string)
  52. if result is not None:
  53. result = result.group(1)
  54. return result
  55. def extract_unit_string(string):
  56. if string is None:
  57. result = None
  58. else:
  59. result = re.search(r'\[(.*?)\]', string)
  60. if result is not None:
  61. result = result.group(1)
  62. else:
  63. result = string
  64. return result
  65. def replace_jcamp_strings(string):
  66. pos_stop = 0
  67. elements = []
  68. str_list = []
  69. index = 0
  70. while True:
  71. pos_start = string.find('<', pos_stop)
  72. if pos_start < 0:
  73. elements.append(string[pos_stop:])
  74. break
  75. elements.append(string[pos_stop:pos_start])
  76. pos_stop = string.find('>', pos_start + 1)
  77. if pos_stop < 0:
  78. elements.append(string[pos_start:])
  79. break
  80. pos_stop += 1
  81. elements.append(''.join(['<#', str(index), '>']))
  82. str_list.append(string[pos_start:pos_stop])
  83. index += 1
  84. return (''.join(elements), str_list)
  85. def check_struct_list(values, str_list):
  86. flag_int = True
  87. flag_float = True
  88. for value in values:
  89. if flag_int:
  90. try:
  91. value = int(value)
  92. except ValueError:
  93. flag_int = False
  94. else:
  95. continue
  96. try:
  97. value = float(value)
  98. except ValueError:
  99. flag_float = False
  100. break
  101. if flag_int:
  102. return (list(map(int, values)), 0)
  103. if flag_float:
  104. return (list(map(float, values)), 0)
  105. # Restore JCAMP strings
  106. count = len(str_list)
  107. if count > 0:
  108. for index, value in enumerate(values):
  109. result = re.findall(r'<#(.*?)>', value)
  110. if len(result) == 1:
  111. str_id = int(result[0])
  112. values[index] = str_list[str_id]
  113. count -= 1
  114. if count == 0:
  115. break
  116. elif len(result) > 1:
  117. sys.exit("Found more than one ID string in a value: %s" % (value,))
  118. return (values, len(str_list) - count)
  119. def create_struct_list(string, str_list, restored):
  120. if len(string) < 1:
  121. return ([], restored)
  122. # Split one struct in its parts
  123. #items = re.split(r'^ +| *, *| +$', string)
  124. items = re.split(r'(?:^ +| *),(?: *| +$)', string)
  125. #items = [x.strip(' ') for x in string.split(',')]
  126. for index, item in enumerate(items):
  127. #values = re.findall(r'[^\s]+', item)
  128. values = item.split(' ')
  129. #values = item.split()
  130. values, number = check_struct_list(values, str_list)
  131. if len(values) == 1:
  132. items[index] = values[0]
  133. else:
  134. items[index] = values
  135. restored += number
  136. return (items, restored)
  137. def push_list(level, obj_list, obj):
  138. while level > 0:
  139. obj_list = obj_list[-1]
  140. level -= 1
  141. obj_list.append(obj)
  142. def parse_struct(string, str_list):
  143. level = 0
  144. restored = 0
  145. obj_list = []
  146. pos_start = string.find('(')
  147. if pos_start < 0:
  148. return (obj_list, restored)
  149. pos_left, start_left = (pos_start + 1, True)
  150. pos_start = string.find('(', pos_left)
  151. pos_stop = string.find(')', pos_left)
  152. while True:
  153. if (pos_start >= pos_left) and (pos_stop >= pos_left):
  154. pos_right, start_right = (pos_start, True) if pos_start < pos_stop else (pos_stop, False)
  155. elif pos_start >= pos_left:
  156. pos_right, start_right = (pos_start, True)
  157. elif pos_stop >= pos_left:
  158. pos_right, start_right = (pos_stop, False)
  159. else:
  160. pos_right, start_right = (len(string), False)
  161. sub = string[pos_left:pos_right].strip(' ')
  162. if sub.startswith(','):
  163. sub = sub[1:].lstrip(' ')
  164. if sub.endswith(','):
  165. sub = sub[:-1].rstrip(' ')
  166. #print("sub:%d:%s:" % (len(sub), sub))
  167. items, restored = create_struct_list(sub, str_list, restored)
  168. if start_left:
  169. push_list(level, obj_list, items)
  170. if start_right:
  171. level += 1
  172. else:
  173. for item in items:
  174. push_list(level, obj_list, item)
  175. if not start_right:
  176. level -= 1
  177. if pos_right >= len(string):
  178. break
  179. pos_left, start_left = (pos_right + 1, start_right)
  180. if start_left:
  181. pos_start = string.find('(', pos_left)
  182. else:
  183. pos_stop = string.find(')', pos_left)
  184. return (obj_list, restored)
  185. def check_array_list(values):
  186. flag_int = True
  187. flag_float = True
  188. for value in values:
  189. if flag_int:
  190. try:
  191. value = int(value)
  192. except ValueError:
  193. flag_int = False
  194. else:
  195. continue
  196. try:
  197. value = float(value)
  198. except ValueError:
  199. flag_float = False
  200. break
  201. if flag_int:
  202. return np.array(values, dtype=np.int32)
  203. if flag_float:
  204. return np.array(values, dtype=np.float64)
  205. return np.array(values, dtype=object)
  206. def get_array_values(label, sizes, data):
  207. # Removing whitespaces at the edge of strings
  208. #data = data.replace('< ', '<')
  209. #data = data.replace(' >', '>')
  210. if data.startswith('<'): # Checking if array is a single string or an array of strings ...
  211. #data = data.replace('> <', '><')
  212. #values = re.findall(r'<(.*?)>', data)
  213. values = re.findall(r'<.*?>', data)
  214. if len(sizes) > 1:
  215. values = np.array(values, dtype=object)
  216. if np.prod(sizes[:-1]) == values.size:
  217. values = values.reshape(sizes[:-1])
  218. elif len(values) == 1:
  219. values = values[0]
  220. elif data.startswith('('): # ... or a struct or an array of structs ...
  221. if len(sizes) > 1:
  222. print("Warning: The sizes dimension is greater than 1 for the %s array of structs." % (label,), file=sys.stderr)
  223. data, str_list = replace_jcamp_strings(data)
  224. values, restored = parse_struct(data, str_list)
  225. if len(str_list) != restored:
  226. print("%s:" % (label,), values)
  227. sys.exit("Not all replaced JCAMP strings are restored (%d of %d)." % (restored, len(str_list)))
  228. else: # ... or a simple array (most frequently numeric)
  229. values = re.findall(r'[^\s]+', data)
  230. #values = data.split()
  231. values = np.reshape(check_array_list(values), sizes)
  232. return values
  233. def read_param_file(filename):
  234. # Open parameter file
  235. try:
  236. fid = open(filename, 'r')
  237. except IOError as V:
  238. if V.errno == 2:
  239. sys.exit("Cannot open parameter file %s" % (filename,))
  240. else:
  241. raise
  242. # Generate header information
  243. header = collections.OrderedDict()
  244. weekdays = ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun')
  245. line = ''
  246. for index, line in enumerate(fid):
  247. line = line.lstrip(' \t').rstrip('\r\n')
  248. if line.startswith('##$'):
  249. break
  250. #print("line:%d:%s:" % (len(line), line))
  251. if line.startswith('##'): # It's a variable with ##
  252. # Retrieve the Labeled Data Record
  253. label, value = strtok(line, delimiters='=')
  254. label = strtok(label, delimiters='#')[0].strip()
  255. value = strtok(value, delimiters='=')[0].strip()
  256. # Save value without $
  257. #value = strtok(value, delimiters='$')[0].strip()
  258. header[label] = value
  259. elif line.startswith('$$'): # It's a comment
  260. comment = strtok(line, delimiters='$')[0].strip()
  261. if comment.startswith('/'):
  262. header['Path'] = comment
  263. elif comment.startswith('process'):
  264. header['Process'] = comment[8:]
  265. else:
  266. pos = strfind(comment[:10], '-')
  267. if (comment[:3] in weekdays) or ((comment[:2] in ('19', '20')) and (len(pos) == 2)):
  268. header['Date'] = comment
  269. else:
  270. header['Header' + str(index + 1)] = comment
  271. # Check if using a supported version of JCAMP file format
  272. if 'JCAMPDX' in header:
  273. version = float(header['JCAMPDX'])
  274. elif 'JCAMP-DX' in header:
  275. version = float(header['JCAMP-DX'])
  276. else:
  277. raise KeyError("The file header does not contain the key 'JCAMP-DX'.")
  278. #sys.exit("The file header is not correct.")
  279. if (version != 4.24) and (version != 5):
  280. print("Warning: JCAMP version %s is not supported (%s)." % (version, filename), file=sys.stderr)
  281. params = collections.OrderedDict()
  282. # Loop for reading parameters
  283. while line.lstrip(' \t').startswith('##'):
  284. result = re.search(r'##(.*)=(.*)', line)
  285. result = [] if result is None else list(result.groups())
  286. # Checking if label present and removing proprietary tag
  287. try:
  288. label = result[0]
  289. except:
  290. label = None
  291. else:
  292. if label.startswith('$'):
  293. label = label[1:]
  294. #print("label:%d:%s:" % (len(label), label))
  295. # Checking if value present otherwise value is set to empty string
  296. try:
  297. value = result[1]
  298. except:
  299. value = ''
  300. #print("value:%d:%s:" % (len(value), value))
  301. flag_comment = True if '$$' in line else False
  302. line = ''
  303. data = []
  304. for line in fid:
  305. if line.lstrip(' \t').startswith('##'):
  306. break
  307. if not line.lstrip(' \t').startswith('$$'): # Skip comment line
  308. if (not flag_comment) and ('$$' in line):
  309. flag_comment = True
  310. #data.append(line.rstrip('\\\r\n'))
  311. data.append(line.rstrip('\r\n'))
  312. #print("line:%d:%s:" % (len(data[-1]), data[-1]))
  313. # Create data string
  314. data = ''.join(data)
  315. #print("data:%d:%s:" % (len(data), data))
  316. if flag_comment:
  317. sys.exit("Found JCAMP comment ('$$') in LDR %s." % (label,))
  318. # Checking for END tag
  319. if (label is None) or (label == 'END'):
  320. break
  321. # Checking if value is a string or an array, a struct or a single value
  322. if value.startswith('( <'):
  323. print("Warning: The parsing of the LDR %s failed." % (label,), file=sys.stderr)
  324. elif value.startswith('( '): # A single string, an array of strings or structs or a simple array
  325. sizes = [int(x) for x in value.strip('( )').split(',')]
  326. try:
  327. params[label] = get_array_values(label, sizes, data)
  328. except ValueError:
  329. pass
  330. elif value.startswith('('): # A struct
  331. data = ''.join([value, data])
  332. params[label] = get_array_values(label, [1], data)[0]
  333. else: # A single value
  334. try:
  335. params[label] = int(value)
  336. except ValueError:
  337. try:
  338. params[label] = float(value)
  339. except ValueError:
  340. params[label] = value
  341. fid.close()
  342. if label != 'END':
  343. sys.exit("Unexpected end of file: Missing END Statement")
  344. return (header, params)
  345. def main():
  346. import argparse
  347. parser = argparse.ArgumentParser(description='Read ParaVision parameter file')
  348. parser.add_argument('filename', help='ParaVision parameter file (acqp, method, visu_pars)')
  349. args = parser.parse_args()
  350. # read parameter file
  351. header, params = read_param_file(args.filename)
  352. for (label, value) in header.items():
  353. print("%s: %s" % (label, value))
  354. for (label, value) in params.items():
  355. if isinstance(value, np.ndarray):
  356. print("%s:" % (label,))
  357. print(value)
  358. else:
  359. print("%s: %s" % (label, value))
  360. if __name__ == '__main__':
  361. main()