Coverage for fiqus/parsers/ParserRES.py: 20%
75 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-12-25 02:54 +0100
« prev ^ index » next coverage.py v7.4.4, created at 2024-12-25 02:54 +0100
1import pandas as pd
2import re
3from collections import defaultdict
class ParserRES:
    """
    Reader and writer for GetDP .res files.

    In read mode the file is parsed and the following attributes are set:
        - getdp_version: text found between '/* ' and ',' in the header
        - encoding: text found between ', ' and ' */' in the header
        - res_file_format: text found between '1.1 ' and '\\n$EndResFormat'
        - solution: mapping with the keys 'time_real', 'time_imag',
          'time_step', 'dof_data' and 'solution'; each value is a list
          holding one entry per $Solution ... $EndSolution block

    In write mode the same attributes are read from another object and
    written to disk in the same layout.
    """

    # One match per "$Solution ... $EndSolution" block; DOTALL lets '.' span
    # the line breaks inside a block.
    _SOLUTION_BLOCK_PATTERN = re.compile(r'\$Solution.*?\$EndSolution', re.DOTALL)

    # Keys of the solution mapping, in the order in which the writer zips them.
    _SOLUTION_KEYS = ('time_real', 'time_imag', 'time_step', 'dof_data', 'solution')

    def __init__(self, res_file_path, write_data=None):
        """
        Read a .res file, or write one from previously parsed data.

        :param res_file_path: Full path to the .res file, including file name and extension.
        :param write_data: Optional object exposing the attributes getdp_version, encoding,
            res_file_format and solution (for example a ParserRES instance obtained by reading
            a file). If given (truthy), its content is written to res_file_path and nothing is read.
        :return: nothing, keeps the parsed content as attributes (read mode only)
        """
        self._res_format_markers = {'s': '$ResFormat', 'e': '$EndResFormat'}
        self._getdp_version_markers = {'s': '/* ', 'e': ','}
        self._encoding_markers = {'s': ', ', 'e': ' */'}
        # the 1.1 is hard-coded according to the GetDP documentation,
        # see https://getdp.info/doc/texinfo/getdp.html#File-formats
        self._res_file_format = {'s': '1.1 ', 'e': '\n$EndResFormat'}

        self.solution = defaultdict(dict)
        self._res_file_path = res_file_path

        if write_data:
            self._write_data = write_data
            self._write_res_file()
        else:
            # read contents of .res file
            with open(res_file_path) as f:
                self._contents = f.read()
            self._parse_res_file()

    def __get_content_between_markers(self, markers_dict):
        """
        Gets text string between two markers specified in markers_dict

        :param markers_dict: dict with the start marker under key 's' and the end marker under key 'e'
        :return: text between the first occurrence of the start marker and the first occurrence of
            the end marker that follows it; empty string if either marker cannot be found
        """
        start_marker = markers_dict['s']
        start = self._contents.find(start_marker)
        if start == -1:
            return ''
        start += len(start_marker)
        # Search for the end marker only behind the start marker, so that an
        # earlier occurrence of the end marker cannot produce a wrong slice.
        end = self._contents.find(markers_dict['e'], start)
        if end == -1:
            return ''
        return self._contents[start:end]

    def _res_header(self):
        """
        Parse the header of the .res file.
        Add the attributes:
        - getdp_version: GetDP version that created the .res file
        - encoding: encoding of the .res file
        - res_file_format: format of the .res file
        """
        self.getdp_version = self.__get_content_between_markers(self._getdp_version_markers)
        self.encoding = self.__get_content_between_markers(self._encoding_markers)
        self.res_file_format = self.__get_content_between_markers(self._res_file_format)

    def _get_all_solution_blocks(self):
        """
        Add all unparsed solution blocks to the attribute _solution_blocks
        using regular expressions. It is a list of strings, each string
        containing exactly one solution block (from '$Solution' up to and
        including '$EndSolution').
        """
        self._solution_blocks = self._SOLUTION_BLOCK_PATTERN.findall(self._contents)

    def _parse_res_file_single_solution_block(self, solution_block_split_by_line):
        """
        Parse one solution block and append its content to self.solution.

        :param solution_block_split_by_line: list of the lines of one solution block. Line 0 is the
            '$Solution' line, line 1 holds the four header fields (dof data number, real part of the
            time, imaginary part of the time, time step number), the last line is '$EndSolution'
            and every line in between holds one solution value.
        :return: nothing, appends to the lists stored in self.solution
        """
        # the first line is ignored
        header_fields = solution_block_split_by_line[1].split()
        dof_data = int(header_fields[0])
        time_real = float(header_fields[1])
        time_imag = float(header_fields[2])
        time_step = int(header_fields[3])
        # Blank lines carry no value; skipping them keeps blocks with an empty
        # solution vector readable.
        solution = [float(entry) for entry in solution_block_split_by_line[2:-1] if entry.strip()]

        parsed_values = (time_real, time_imag, time_step, dof_data, solution)
        for key, value in zip(self._SOLUTION_KEYS, parsed_values):
            self.solution.setdefault(key, []).append(value)

    @staticmethod
    def __get_lines(data_str):
        """
        Converts text string into a list of lines

        Double quotes are removed; line breaks and single quotes both act as separators.
        """
        return re.split(r"[\n']", data_str.replace('"', ''))

    def _parse_res_file_solution_blocks(self):
        """
        Parse every block stored in _solution_blocks into self.solution.
        """
        for solution_block in self._solution_blocks:
            # split by line
            solution_block_split_by_line = self.__get_lines(solution_block)
            self._parse_res_file_single_solution_block(solution_block_split_by_line)

    def _parse_res_file(self):
        """
        Parse the header and all solution blocks of the file content held in _contents.
        """
        self._res_header()
        self._get_all_solution_blocks()
        self._parse_res_file_solution_blocks()

    def _write_res_file(self):
        """
        Write the header and all solution blocks of _write_data to _res_file_path.
        """
        with open(self._res_file_path, "w") as f:
            # write header
            f.write(f"$ResFormat /* {self._write_data.getdp_version}, {self._write_data.encoding} */\n")
            # write res file format
            f.write(f"1.1 {self._write_data.res_file_format}\n")
            f.write("$EndResFormat\n")

            self._write_solution_block(f)

    def _write_solution_block(self, f):
        """
        Write one '$Solution ... $EndSolution' block per stored solution.

        :param f: text file object opened for writing
        """
        data = self._write_data.solution
        blocks = zip(data['time_real'], data['time_imag'], data['time_step'], data['dof_data'], data['solution'])
        for time_real, time_imag, time_step, dof_data, solution in blocks:
            f.write(f"$Solution /* DofData #{dof_data} */\n")
            f.write(f"{dof_data} {time_real:.16g} {time_imag:.16g} {time_step}\n")
            # One value per line; an empty solution vector writes no line at
            # all, so no blank line ends up in front of $EndSolution.
            f.writelines(f"{sol_entry:.16g}\n" for sol_entry in solution)
            f.write("$EndSolution\n")
139# ==============================================================================
140#parsedRes = ParserRES('test.res')
141#ParserRES('test_written.res', write_data=parsedRes)
142#import filecmp
143#print(filecmp.cmp('test.res', 'test_written.res'))