Coverage for fiqus/getdp_runners/RunGetdpMultipole.py: 84%

111 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-12-25 02:54 +0100

1import os 

2import pathlib 

3import subprocess 

4import logging 

5import timeit 

6import re 

7 

8import gmsh 

9from fiqus.pro_assemblers.ProAssembler import ASS_PRO as aP 

10from fiqus.utils.Utils import GmshUtils 

11from fiqus.utils.Utils import FilesAndFolders as Util 

12from fiqus.data import DataFiQuS as dF 

13from fiqus.data import RegionsModelFiQuS as rM 

14from fiqus.data import DataMultipole as dM 

15 

16logger = logging.getLogger(__name__) 

17 

18 

class AssignNaming:
    """Holds the region naming convention and applies it to the post-processing volume lists."""

    def __init__(self, data: "dF.FDM" = None):
        """
        Class to assign naming convention

        The ``volumes`` lists under ``data.magnet.postproc.electromagnetics`` and
        ``data.magnet.postproc.thermal`` are replaced in place: every entry is looked up in
        ``naming_conv`` and prefixed with the 'omega' name ('omega' itself maps to the bare prefix).
        Because the lists are overwritten with the converted names, building a second AssignNaming
        from the same data object raises KeyError (the converted names are not keys of naming_conv).

        :param data: FiQuS data model
        :raises KeyError: if a volume entry is not a key of the naming convention
        """
        # The annotation is quoted so that it only names the type; the previous form ``dF.FDM()``
        # built a data-model instance every time this method was defined.
        self.data: "dF.FDM" = data

        self.naming_conv = {'omega': 'Omega', 'boundary': 'Bd_', 'powered': '_p', 'induced': '_i', 'air': '_a',
                            'air_far_field': '_aff', 'iron': '_bh', 'conducting': '_c', 'insulator': '_ins', 'terms': 'Terms'}

        prefix = self.naming_conv['omega']

        def region_name(var):
            # 'omega' stands for the whole domain and gets no suffix.
            return prefix + (self.naming_conv[var] if var != 'omega' else '')

        postproc = self.data.magnet.postproc
        postproc.electromagnetics.volumes = [region_name(var) for var in postproc.electromagnetics.volumes]
        postproc.thermal.volumes = [region_name(var) for var in postproc.thermal.volumes]

33 

34 

class RunGetdpMultipole:
    """Assembles the Multipole .pro file and runs GetDP on it, through a subprocess or through onelab."""

    def __init__(self, data: "AssignNaming" = None, solution_folder: str = None, GetDP_path: str = None, verbose: bool = False):
        """
        Class to solve pro file
        :param data: AssignNaming instance; its ``data`` (FiQuS data model) and ``naming_conv`` attributes are used
        :param solution_folder: folder of the solution; the .pro file path is built from it and the
            mesh/region files are looked up in its directory name
        :param GetDP_path: GetDP executable, used as the program of the solver command
        :param verbose: If True more information is printed in python console.
        """
        logger.info(
            f"Initializing Multipole runner for {os.path.basename(solution_folder)}."
        )
        self.data: "dF.FDM" = data.data
        self.naming_conv: dict = data.naming_conv
        self.solution_folder = solution_folder
        self.GetDP_path = GetDP_path
        self.verbose: bool = verbose
        self.call_method = 'subprocess'  # or onelab

        self.rm_EM = rM.RegionsModel()
        self.rm_TH = rM.RegionsModel()
        # The region-coordinate model is only created for a thermal solve with isothermal conductors.
        self.rc = dM.MultipoleRegionCoordinate()\
            if self.data.magnet.mesh.thermal.isothermal_conductors and self.data.magnet.solve.thermal.solve_type else None

        self.gu = GmshUtils(self.solution_folder, self.verbose)
        self.gu.initialize(verbosity_Gmsh=self.data.run.verbosity_Gmsh)
        self.occ = gmsh.model.occ
        self.mesh = gmsh.model.mesh

        self.brep_iron_curves = {1: set(), 2: set(), 3: set(), 4: set()}
        self.mesh_files = os.path.join(os.path.dirname(self.solution_folder), self.data.general.magnet_name)
        self.model_file = os.path.join(self.solution_folder, 'Center_line.csv')

        self.mf = {'EM': f"{self.mesh_files}_EM.msh", 'TH': f"{self.mesh_files}_TH.msh"}

    def loadRegionFiles(self):
        """Read the ``_EM.reg`` / ``_TH.reg`` region files for the solve types that are enabled."""
        if self.data.magnet.solve.electromagnetics.solve_type:
            self.rm_EM = Util.read_data_from_yaml(f"{self.mesh_files}_EM.reg", rM.RegionsModel)
        if self.data.magnet.solve.thermal.solve_type:
            self.rm_TH = Util.read_data_from_yaml(f"{self.mesh_files}_TH.reg", rM.RegionsModel)

    def loadRegionCoordinateFile(self):
        """Read the ``_TH.reco`` region-coordinate file into ``self.rc``."""
        self.rc = Util.read_data_from_yaml(f"{self.mesh_files}_TH.reco", dM.MultipoleRegionCoordinate)

    def assemblePro(self):
        """Assemble the combined .pro file from 'Multipole_template.pro' and log how long it took."""
        logger.info("Assembling pro file...")
        start_time = timeit.default_timer()
        ap = aP(file_base_path=os.path.join(self.solution_folder, self.data.general.magnet_name), naming_conv=self.naming_conv)
        BH_curves_path = os.path.join(pathlib.Path(os.path.dirname(__file__)).parent, 'pro_material_functions', 'ironBHcurves.pro')
        ap.assemble_combined_pro(template='Multipole_template.pro', rm_EM=self.rm_EM, rm_TH=self.rm_TH, rc=self.rc, dm=self.data, mf=self.mf, BH_curves_path=BH_curves_path)
        logger.info(
            f"Assembling pro file took"
            f" {timeit.default_timer() - start_time:.2f} s."
        )

    def _getdp_commands(self, solve: bool):
        """
        Build the list of GetDP argument sets for the current ``call_method``.

        :param solve: True for solve + post-processing, False for post-processing only
        :return: list with one entry per GetDP invocation - a string for 'onelab', a list of
            arguments for 'subprocess' - or None for any other call method
        """
        verbosity = str(self.data.run.verbosity_GetDP)
        if self.call_method == 'onelab':
            # Wrapped in a list: _run loops over the commands, and looping over a bare string
            # would launch GetDP once per character.
            if solve:
                return [f"-solve -v2 -pos -verbose {verbosity}"]
            return [f"-v2 -pos -verbose {verbosity}"]
        if self.call_method == 'subprocess':
            if solve:
                return [["-solve", 'resolution', "-v2", "-pos", "Dummy", "-verbose", verbosity]]
            return [["-v2", "-pos", "-verbose", verbosity]]
        return None

    def solve_and_postprocess(self):
        """Run GetDP with the solve and post-processing arguments."""
        self._run(commands=self._getdp_commands(solve=True))

    def postprocess(self):
        """Run GetDP with the post-processing arguments only."""
        self._run(commands=self._getdp_commands(solve=False))

    def _log_getdp_line(self, raw_line):
        """
        Forward one line of GetDP output to the logger at the level named in the line.

        :param raw_line: one line of solver output as bytes
        :raises Exception: when the line contains "Error"; the message is the line without its prefix
        """
        # Undecodable bytes are replaced rather than aborting the run on a logging detail.
        line = raw_line.decode("utf-8", errors="replace").rstrip()
        # Keep only the text after the last carriage return (progress output rewrites the line).
        line = line.split("\r")[-1]
        if "Info" in line:
            logger.info(re.sub(r"Info\s*:\s*", "", line))
        elif "Warning" in line:
            parsedLine = re.sub(r"Warning\s*:\s*", "", line)
            # Warnings mentioning "Unknown" are deliberately not reported.
            if "Unknown" not in parsedLine:
                logger.warning(parsedLine)
        elif "Error" in line:
            parsedLine = re.sub(r"Error\s*:\s*", "", line)
            logger.error(parsedLine)
            raise Exception(parsedLine)
        elif "Critical" in line:
            logger.critical(re.sub(r"Critical\s*:\s*", "", line))
        # catch the maximum temperature line
        elif "Maximum temperature" in line:
            logger.info(re.sub(r"Print\s*:\s*", "", line))
        # this activates the debugging message mode
        elif self.data.run.verbosity_GetDP > 99:
            logger.info(line)

    def _run(self, commands):
        """
        Run GetDP once per entry of ``commands`` and log the total time.

        :param commands: list of argument sets (strings for 'onelab', argument lists for
            'subprocess'); a single string is accepted and treated as one command
        :raises Exception: in 'subprocess' mode, when GetDP prints a line containing "Error"
        """
        logger.info("Solving...")
        start_time = timeit.default_timer()
        if isinstance(commands, str):
            commands = [commands]
        pro_file = f"{os.path.join(self.solution_folder, self.data.general.magnet_name)}.pro"

        if self.call_method == 'onelab':
            for command in commands:
                gmsh.onelab.run(f"{self.data.general.magnet_name}",
                                f"{self.GetDP_path} {pro_file} {command}")
                gmsh.onelab.setChanged("GetDP", 0)
        elif self.call_method == 'subprocess':
            if self.data.magnet.solve.noOfMPITasks:
                mpi_prefix = ["mpiexec", "-np", str(self.data.magnet.solve.noOfMPITasks)]
            else:
                mpi_prefix = []

            for command in commands:
                getdpProcess = subprocess.Popen(
                    mpi_prefix + [f"{self.GetDP_path}", pro_file] + command,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                )
                try:
                    with getdpProcess.stdout:
                        for raw_line in iter(getdpProcess.stdout.readline, b""):
                            self._log_getdp_line(raw_line)
                except BaseException:
                    # Do not leave the solver running unattended once we stop reading its output.
                    getdpProcess.kill()
                    getdpProcess.wait()
                    raise
                getdpProcess.wait()

        logger.info(
            f"Solving took {timeit.default_timer() - start_time:.2f} s."
        )

    def ending_step(self, gui: bool = False):
        """
        Finish the run.

        :param gui: if True launch the interactive GUI, otherwise clear and finalize gmsh
        """
        if gui:
            self.gu.launch_interactive_GUI()
        else:
            gmsh.clear()
            gmsh.finalize()

170 gmsh.finalize()