Coverage for fiqus/getdp_runners/RunGetdpMultipole.py: 85%

169 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2026-02-01 01:38 +0000

1import os 

2import pathlib 

3import subprocess 

4import logging 

5import timeit 

6import re 

7import pandas as pd 

8 

9import gmsh 

10from fiqus.pro_assemblers.ProAssembler import ASS_PRO as aP 

11from fiqus.utils.Utils import GmshUtils 

12from fiqus.utils.Utils import FilesAndFolders as Util 

13from fiqus.data import DataFiQuS as dF 

14from fiqus.data import RegionsModelFiQuS as rM 

15from fiqus.data import DataMultipole as dM 

16 

17logger = logging.getLogger('FiQuS') 

18 

19trigger_time_deactivated = 99999 

20 

21class AssignNaming: 

22 def __init__(self, data: dF.FDM() = None): 

23 """ 

24 Class to assign naming convention 

25 :param data: FiQuS data model 

26 """ 

27 self.data: dF.FDM() = data 

28 

29 self.naming_conv = {'omega': 'Omega', 

30 'boundary': 'Bnd_', 

31 'powered': '_p', 'induced': '_i', 

32 'air': '_a', 

33 'air_far_field': '_aff', 

34 'conducting': '_c', 

35 'insulator': '_ins', 

36 'terms': 'Terms', 

37 'iron_yoke': '_bh', #todo: consider renaming this 

38 'collar': '_collar', 

39 'ref_mesh': '_refmesh', 

40 'poles': '_poles'} 

41 

42 self.data.magnet.postproc.electromagnetics.volumes = \ 

43 [self.naming_conv['omega'] + (self.naming_conv[var] if var != 'omega' else '') for var in 

44 self.data.magnet.postproc.electromagnetics.volumes] 

45 self.data.magnet.postproc.thermal.volumes = \ 

46 [self.naming_conv['omega'] + (self.naming_conv[var] if var != 'omega' else '') for var in 

47 self.data.magnet.postproc.thermal.volumes] 

48 

49 

50class RunGetdpMultipole: 

51 def __init__(self, data: AssignNaming = None, solution_folder: str = None, GetDP_path: str = None, 

52 verbose: bool = False): 

53 """ 

54 Class to solve pro file 

55 :param data: FiQuS data model 

56 :param GetDP_path: settings data model 

57 :param verbose: If True more information is printed in python console. 

58 """ 

59 logger.info( 

60 f"Initializing Multipole runner for {os.path.basename(solution_folder)}." 

61 ) 

62 self.data: dF.FDM() = data.data 

63 self.naming_conv: dict = data.naming_conv 

64 self.solution_folder = solution_folder 

65 self.GetDP_path = GetDP_path 

66 self.verbose: bool = verbose 

67 self.call_method = 'subprocess' # or onelab or subprocess or alt_solver 

68 

69 self.rm_EM = rM.RegionsModel() 

70 self.rm_TH = rM.RegionsModel() 

71 self.aux = { 

72 "half_turns": { 

73 "ht": {}, # Dictionary to store half-turn data 

74 "max_reg": None, # Maximum region number to avoid overlaps of physical regions 

75 "block": {} # Dictionary that stores the relation between magnet blocks(groups) and half-turn areas 

76 

77 }, 

78 "e_cliq": {}, # Dictionary that stores the E-CLIQ source current LUTs from a csv 

79 "tsa_collar": {}, 

80 "sim_info": { 

81 'date': '', 

82 'author': '' 

83 } 

84 } 

85 self.material_properties_model = None 

86 self.rohm = {} # rohm parameter dictionary 

87 self.rohf = {} # rohf parameter dictionary 

88 self.rc = dM.MultipoleRegionCoordinate() \ 

89 if self.data.magnet.mesh.thermal.isothermal_conductors and self.data.magnet.solve.thermal.solve_type else None 

90 

91 self.gu = GmshUtils(self.solution_folder, self.verbose) 

92 self.gu.initialize(verbosity_Gmsh=self.data.run.verbosity_Gmsh) 

93 self.occ = gmsh.model.occ 

94 self.mesh = gmsh.model.mesh 

95 self.brep_curves = {} 

96 for name in self.data.magnet.geometry.electromagnetics.areas: 

97 self.brep_curves[name] = {1: set(), 2: set(), 3: set(), 4: set()} 

98 self.mesh_files = os.path.join(os.path.dirname(self.solution_folder), self.data.general.magnet_name) 

99 self.model_file = os.path.join(self.solution_folder, 'Center_line.csv') 

100 

101 self.mf = {'EM': f"{self.mesh_files}_EM.msh", 'TH': f"{self.mesh_files}_TH.msh"} 

102 

103 def loadRegionFiles(self): 

104 """ 

105 Read the regions' files and store the information in a variable. used for extracting the maximum region number. 

106 :param 

107 :return: Data from the regions files 

108 """ 

109 if self.data.magnet.solve.electromagnetics.solve_type: 

110 self.rm_EM = Util.read_data_from_yaml(f"{self.mesh_files}_EM.reg", rM.RegionsModel) 

111 if self.data.magnet.solve.thermal.solve_type: 

112 self.rm_TH = Util.read_data_from_yaml(f"{self.mesh_files}_TH.reg", rM.RegionsModel) 

113 

114 def loadRegionCoordinateFile(self): 

115 """ 

116 Read the reco file and store the information in a variable. 

117 :param 

118 :return: Data from the reco file 

119 """ 

120 self.rc = Util.read_data_from_yaml(f"{self.mesh_files}_TH.reco", dM.MultipoleRegionCoordinate) 

121 

122 def assemblePro(self): 

123 """ 

124 Assemble the .pro file using the right template depending on the inputs from the yaml 

125 :param 

126 :return: Assembled .pro file 

127 """ 

128 logger.info(f"Assembling pro file...") 

129 start_time = timeit.default_timer() 

130 ap = aP(file_base_path=os.path.join(self.solution_folder, self.data.general.magnet_name), 

131 naming_conv=self.naming_conv) 

132 BH_curves_path = os.path.join(pathlib.Path(os.path.dirname(__file__)).parent, 'pro_material_functions', 

133 'ironBHcurves.pro') 

134 template = 'Multipole_template.pro' 

135 ap.assemble_combined_pro(template=template, rm_EM=self.rm_EM, rm_TH=self.rm_TH, rc=self.rc, dm=self.data, 

136 mf=self.mf, BH_curves_path=BH_curves_path, aux=self.aux) 

137 logger.info( 

138 f"Assembling pro file took" 

139 f" {timeit.default_timer() - start_time:.2f} s." 

140 ) 

141 

142 def solve_and_postprocess(self): 

143 """ 

144 Generates the necessary GetDP commands to solve and postprocess the model, and runs them. 

145 :param 

146 :return: None 

147 """ 

148 commands = None 

149 if self.call_method == 'onelab': 

150 commands = f"-solve -v2 -pos -verbose {self.data.run.verbosity_GetDP}" 

151 elif self.call_method == 'subprocess': 

152 commands = [["-solve", 'resolution', "-v2", "-pos", "Dummy", "-verbose", str(self.data.run.verbosity_GetDP), 

153 "-mat_mumps_icntl_14", "250"]] 

154 self._run(commands=commands) 

155 

156 def postprocess(self): 

157 """ 

158 Generates the necessary GetDP commands to postprocess the model, and runs them. 

159 :param 

160 :return: None 

161 """ 

162 if self.call_method == 'onelab': 

163 commands = f"-v2 -pos -verbose {self.data.run.verbosity_GetDP}" 

164 elif self.call_method == 'subprocess': 

165 commands = [["-v2", "-pos", "Dummy", "-verbose", str(self.data.run.verbosity_GetDP)]] 

166 elif self.call_method == 'alt_solver': 

167 commands = [["-solve", 'resolution', "-v2", "-pos", "Dummy"]] 

168 

169 self._run(commands=commands) 

170 

171 def _run(self, commands): 

172 """ 

173 runs GetDP with the specified commands. Additionally it captures and logs the output. 

174 :param commands: List of commands to run GetDP with 

175 :return: None 

176 """ 

177 logger.info("Solving...") 

178 start_time = timeit.default_timer() 

179 if self.call_method == 'onelab': 

180 for command in commands: 

181 gmsh.onelab.run(f"{self.data.general.magnet_name}", 

182 f"{self.GetDP_path} {os.path.join(self.solution_folder, self.data.general.magnet_name)}.pro {command}") 

183 gmsh.onelab.setChanged("GetDP", 0) 

184 elif self.call_method == 'subprocess' or self.call_method == 'alt_solver': 

185 # subprocess.call([f"{self.GetDP_path}", f"{os.path.join(self.solution_folder, self.data.general.magnet_name)}.pro"] + command + ["-msh", f"{self.mesh_files}.msh"]) 

186 

187 # view_tag = gmsh.view.getTags() # this should be b 

188 # # # v = "View[" + str(gmsh.view.getIndex('b')) + "]" 

189 # gmsh.view.write(view_tag, f"{os.path.join(self.solution_folder, self.data.general.magnet_name)}-view.msh") 

190 

191 if self.data.magnet.solve.noOfMPITasks: 

192 mpi_prefix = ["mpiexec", "-np", str(self.data.magnet.solve.noOfMPITasks)] 

193 else: 

194 mpi_prefix = [] 

195 

196 for command in commands: 

197 use_predefined_pro = False # If True, one can use a predefined .pro file 

198 if use_predefined_pro: 

199 logger.warning("Using predefined .pro file") 

200 getdpProcess = subprocess.Popen( 

201 mpi_prefix + [f"{self.GetDP_path}", 

202 f"{os.path.join(os.path.abspath(os.path.join(self.solution_folder, '..', '..', '..')), '_out/TEST')}.pro"] + 

203 command, 

204 stdout=subprocess.PIPE, 

205 stderr=subprocess.STDOUT, 

206 ) 

207 else: 

208 getdpProcess = subprocess.Popen( 

209 mpi_prefix + [f"{self.GetDP_path}", 

210 f"{os.path.join(self.solution_folder, self.data.general.magnet_name)}.pro"] + 

211 command, 

212 stdout=subprocess.PIPE, 

213 stderr=subprocess.STDOUT, 

214 ) 

215 with getdpProcess.stdout: 

216 for line in iter(getdpProcess.stdout.readline, b""): 

217 line = line.decode("utf-8").rstrip() 

218 line = line.split("\r")[-1] 

219 if "Info" in line: 

220 parsedLine = re.sub(r"Info\s*:\s*", "", line) 

221 logger.info(parsedLine) 

222 elif "Warning" in line: 

223 parsedLine = re.sub(r"Warning\s*:\s*", "", line) 

224 logger.warning(parsedLine) 

225 elif "Error" in line: 

226 parsedLine = re.sub(r"Error\s*:\s*", "", line) 

227 logger.error(parsedLine) 

228 raise Exception(parsedLine) 

229 elif "Critical" in line: 

230 parsedLine = re.sub(r"Critical\s*:\s*", "", line) 

231 logger.critical(parsedLine) 

232 # catch the maximum temperature line 

233 elif "Maximum temperature" in line: 

234 parsedLine = re.sub(r"Print\s*:\s*", "", line) 

235 logger.info(parsedLine) 

236 # this activates the debugging message mode 

237 elif self.data.run.verbosity_GetDP > 99: 

238 logger.info(line) 

239 

240 getdpProcess.wait() 

241 

242 logger.info( 

243 f"Solving took {timeit.default_timer() - start_time:.2f} s." 

244 ) 

245 

246 def ending_step(self, gui: bool = False): 

247 """ 

248 Finalize the GetDP runner, either by launching the GUI or finalizing gmsh. 

249 :param gui: If True, launches the interactive GUI; otherwise finalizes gmsh. 

250 :return: None 

251 """ 

252 if gui: 

253 self.gu.launch_interactive_GUI() 

254 else: 

255 if gmsh.isInitialized(): 

256 gmsh.clear() 

257 gmsh.finalize() 

258 

259 def read_aux_file(self, aux_file_path): 

260 """ 

261 Read the auxiliary file and store the information in a variable. Necessary to extract half-turn information from blocks. 

262 :param aux_file_path: Path to the auxiliary file 

263 :return: Data from the auxiliary file 

264 """ 

265 try: 

266 self.aux_data = Util.read_data_from_yaml(aux_file_path, dM.MultipoleData) 

267 logger.info(f"Auxiliary data loaded from {aux_file_path}") 

268 except FileNotFoundError: 

269 logger.error(f"Auxiliary file not found: {aux_file_path}") 

270 except Exception as e: 

271 logger.error(f"Error reading auxiliary file: {e}") 

272 

273 def extract_half_turn_blocks(self): 

274 """ 

275 Extract a dictionary from the .aux file where the keys are integers corresponding to the block number 

276 and the dictionary entries for these keys are a list of all the half-turn areas included in that block. 

277 :return: Dictionary with block numbers as keys and lists of half-turn areas as values 

278 """ 

279 aux_data = self.aux_data 

280 

281 half_turn_areas = {} 

282 info_block = {} 

283 coils = aux_data.geometries.coil.coils 

284 for coil_nr, coil_data in coils.items(): 

285 poles = coil_data.poles 

286 for pole_nr, pole_data in poles.items(): 

287 layers = pole_data.layers 

288 for layer_nr, layer_data in layers.items(): 

289 windings = layer_data.windings 

290 for winding_nr, winding_data in windings.items(): 

291 blocks = winding_data.blocks 

292 for block_nr, block_data in blocks.items(): 

293 half_turns = block_data.half_turns.areas 

294 info_block[block_nr] = [int(area) for area in half_turns.keys()] 

295 half_turn_areas[int(block_nr)] = [int(area) for area in half_turns.keys()] 

296 

297 self.aux["half_turns"]["ht"] = half_turn_areas 

298 self.aux["half_turns"]["block"] = info_block 

299 pass 

300 

301 def extract_specific_TSA_lines(self): 

302 """ 

303 Extract a dictionary from the thermal .aux file, collects the outer lines of the half-turns and wedges used for the TSA collar 

304 """ 

305 aux_data = self.aux_data 

306 pg = aux_data.domains.physical_groups 

307 max_layer = len([k for k in self.aux_data.geometries.coil.coils[1].poles[1].layers.keys()]) # todo : more elegant way ? @emma 

308 tags = [] 

309 for block in pg.blocks.values(): 

310 for ht in block.half_turns.values(): 

311 if ht.group[-1] == str(max_layer): # r1_a2 or r2_a2 

312 tags.append(ht.lines['o']) 

313 self.aux["tsa_collar"]['ht_lines'] = tags 

314 

315 tags = [] 

316 for wedge in pg.wedges.values(): 

317 if wedge.group[-1] == str(max_layer): # r1_a2 or r2_a2 

318 tags.append(wedge.lines['o']) 

319 self.aux["tsa_collar"]['wedge_lines'] = tags 

320 pass