Coverage for fiqus/parsers/ParserRES.py: 20%

75 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2024-05-20 03:24 +0200

1import pandas as pd 

2import re 

3from collections import defaultdict 

4 

5class ParserRES: 

6 

7 def __init__(self, res_file_path, write_data=None): 

8 """ 

9 TO BE DONE!! 

10 Read res file and returns its content as object attribute .pqv (postprocessed quantity value) that is a float 

11 :param dat_file_path: Full path to .pos file, including file name and extension. 

12 :return: nothing, keeps attribute pqv (postprocessed quantity value) 

13 """ 

14 self._res_format_markers = {'s': '$ResFormat', 'e': '$EndResFormat'} 

15 self._getdp_version_markers = {'s': '/* ', 'e': ','} 

16 self._encoding_markers = {'s': ', ', 'e': ' */'} 

17 # the 1.1 is hard-coded according to the GetDP documentation,  

18 # see https://getdp.info/doc/texinfo/getdp.html#File-formats 

19 self._res_file_format = {'s': '1.1 ', 'e': '\n$EndResFormat'} 

20 

21 self.solution = defaultdict(dict) 

22 

23 if write_data: 

24 self._res_file_path = res_file_path 

25 self._write_data = write_data 

26 self._write_res_file() 

27 else: 

28 # read contents of .res file 

29 with open(res_file_path) as f: 

30 self._contents = f.read() 

31 self._parse_res_file() 

32 

33 

34 def __get_content_between_markers(self, markers_dict): 

35 """ 

36 Gets text string between two markers specified in markers_dict 

37 """ 

38 return self._contents[self._contents.find(markers_dict['s']) + len(markers_dict['s']):self._contents.find(markers_dict['e'])] 

39 

40 def _res_header(self): 

41 """ 

42 Parse the header of the .res file. 

43 Add the attributes:  

44 - getdp_version: GetDP version that created the .res file 

45 - encoding: encoding of the .res file 

46 - res_file_format: format of the .res file 

47 """ 

48 self.getdp_version = self.__get_content_between_markers(self._getdp_version_markers) 

49 self.encoding = self.__get_content_between_markers(self._encoding_markers) 

50 self.res_file_format = self.__get_content_between_markers(self._res_file_format) 

51 

52 def _get_all_solution_blocks(self): 

53 """ 

54 Add all unparsed solution blocks to the attribute _solution_blocks 

55 using regular expressions. It is a list of lists which each sub-list  

56 containing exactly one solution block. 

57 """ 

58 solution_string = self._contents[self._contents.find('$Solution'):] 

59 self._solution_blocks = re.findall(r'\$Solution.*?\$EndSolution', solution_string, re.DOTALL) 

60 

61 def _parse_res_file_single_solution_block(self, solution_block_split_by_line): 

62 

63 # the first line is ignored  

64 header = solution_block_split_by_line[1] 

65 header_split = header.split() 

66 dof_data = int(header_split[0]) 

67 time_real = float(header_split[1]) 

68 time_imag = float(header_split[2]) 

69 time_step = int(header_split[3]) 

70 solution = [float(entry) for entry in solution_block_split_by_line[2:-1]] 

71 

72 if "time_real" not in self.solution: 

73 self.solution['time_real'] = [time_real] 

74 else: 

75 self.solution['time_real'].append(time_real) 

76 

77 if "time_imag" not in self.solution: 

78 self.solution['time_imag'] = [time_imag] 

79 else: 

80 self.solution['time_imag'].append(time_imag) 

81 

82 if "time_step" not in self.solution: 

83 self.solution['time_step'] = [time_step] 

84 else: 

85 self.solution['time_step'].append(time_step) 

86 

87 if "dof_data" not in self.solution: 

88 self.solution['dof_data'] = [dof_data] 

89 else: 

90 self.solution['dof_data'].append(dof_data) 

91 

92 if "solution" not in self.solution: 

93 self.solution['solution'] = [solution] 

94 else: 

95 self.solution['solution'].append(solution) 

96 

97 @staticmethod 

98 def __get_lines(data_str): 

99 """ 

100 Converts text string into a list of lines 

101 """ 

102 data_str = re.sub('\n', "'", data_str) 

103 data_str = re.sub('"', '', data_str) 

104 str_list = re.split("'", data_str) 

105 return str_list 

106 

107 def _parse_res_file_solution_blocks(self): 

108 """ 

109  

110 """ 

111 for solution_block in self._solution_blocks: 

112 # split by line 

113 solution_block_split_by_line = self.__get_lines(solution_block) 

114 self._parse_res_file_single_solution_block(solution_block_split_by_line) 

115 

116 def _parse_res_file(self): 

117 self._res_header() 

118 self._get_all_solution_blocks() 

119 self._parse_res_file_solution_blocks() 

120 

121 def _write_res_file(self): 

122 with open(self._res_file_path, "w") as f: 

123 # write header 

124 f.write(f"$ResFormat /* {self._write_data.getdp_version}, {self._write_data.encoding} */\n") 

125 # write res file format 

126 f.write(f"1.1 {self._write_data.res_file_format}\n") 

127 f.write(f"$EndResFormat\n") 

128 

129 self._write_solution_block(f) 

130 

131 def _write_solution_block(self, f): 

132 for time_real, time_imag, time_step, dof_data, solution in zip(self._write_data.solution['time_real'], self._write_data.solution['time_imag'], self._write_data.solution['time_step'], self._write_data.solution['dof_data'], self._write_data.solution['solution']): 

133 

134 f.write(f"$Solution /* DofData #{dof_data} */\n") 

135 f.write(f"{dof_data} {time_real:.16g} {time_imag:.16g} {time_step}\n") 

136 f.write('\n'.join('{0:.16g}'.format(sol_entry) for sol_entry in solution)) 

137 f.write(f"\n$EndSolution\n") 

138 

139# ============================================================================== 

140#parsedRes = ParserRES('test.res') 

141#ParserRES('test_written.res', write_data=parsedRes) 

142#import filecmp  

143#print(filecmp.cmp('test.res', 'test_written.res'))