Coverage for fiqus/getdp_runners/RunGetdpMultipole.py: 85%
169 statements
import os
import pathlib
import subprocess
import logging
import timeit
import re
import pandas as pd

import gmsh
from fiqus.pro_assemblers.ProAssembler import ASS_PRO as aP
from fiqus.utils.Utils import GmshUtils
from fiqus.utils.Utils import FilesAndFolders as Util
from fiqus.data import DataFiQuS as dF
from fiqus.data import RegionsModelFiQuS as rM
from fiqus.data import DataMultipole as dM

logger = logging.getLogger('FiQuS')

trigger_time_deactivated = 99999


class AssignNaming:
    def __init__(self, data: dF.FDM = None):
        """
        Class to assign the naming convention.
        :param data: FiQuS data model
        """
        self.data: dF.FDM = data

        self.naming_conv = {'omega': 'Omega',
                            'boundary': 'Bnd_',
                            'powered': '_p', 'induced': '_i',
                            'air': '_a',
                            'air_far_field': '_aff',
                            'conducting': '_c',
                            'insulator': '_ins',
                            'terms': 'Terms',
                            'iron_yoke': '_bh',  # todo: consider renaming this
                            'collar': '_collar',
                            'ref_mesh': '_refmesh',
                            'poles': '_poles'}

        self.data.magnet.postproc.electromagnetics.volumes = \
            [self.naming_conv['omega'] + (self.naming_conv[var] if var != 'omega' else '') for var in
             self.data.magnet.postproc.electromagnetics.volumes]
        self.data.magnet.postproc.thermal.volumes = \
            [self.naming_conv['omega'] + (self.naming_conv[var] if var != 'omega' else '') for var in
             self.data.magnet.postproc.thermal.volumes]
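
        # Illustrative example (not part of the original code): with the naming convention above,
        # the user-specified post-processing volume labels are expanded to physical-group names,
        # e.g. 'powered' -> 'Omega_p', 'air' -> 'Omega_a', 'iron_yoke' -> 'Omega_bh', 'omega' -> 'Omega'.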


class RunGetdpMultipole:
    def __init__(self, data: AssignNaming = None, solution_folder: str = None, GetDP_path: str = None,
                 verbose: bool = False):
        """
        Class to solve the assembled .pro file with GetDP.
        :param data: AssignNaming object wrapping the FiQuS data model
        :param solution_folder: path to the solution folder
        :param GetDP_path: path to the GetDP executable
        :param verbose: if True, more information is printed to the Python console
        """
        logger.info(
            f"Initializing Multipole runner for {os.path.basename(solution_folder)}."
        )
        self.data: dF.FDM = data.data
        self.naming_conv: dict = data.naming_conv
        self.solution_folder = solution_folder
        self.GetDP_path = GetDP_path
        self.verbose: bool = verbose
        self.call_method = 'subprocess'  # one of 'onelab', 'subprocess' or 'alt_solver'

        self.rm_EM = rM.RegionsModel()
        self.rm_TH = rM.RegionsModel()
        self.aux = {
            "half_turns": {
                "ht": {},  # Dictionary to store half-turn data
                "max_reg": None,  # Maximum region number, to avoid overlaps of physical regions
                "block": {}  # Dictionary that stores the relation between magnet blocks (groups) and half-turn areas
            },
            "e_cliq": {},  # Dictionary that stores the E-CLIQ source current LUTs from a csv
            "tsa_collar": {},
            "sim_info": {
                'date': '',
                'author': ''
            }
        }
        self.material_properties_model = None
        self.rohm = {}  # rohm parameter dictionary
        self.rohf = {}  # rohf parameter dictionary
        self.rc = dM.MultipoleRegionCoordinate() \
            if self.data.magnet.mesh.thermal.isothermal_conductors and self.data.magnet.solve.thermal.solve_type else None

        self.gu = GmshUtils(self.solution_folder, self.verbose)
        self.gu.initialize(verbosity_Gmsh=self.data.run.verbosity_Gmsh)
        self.occ = gmsh.model.occ
        self.mesh = gmsh.model.mesh
        self.brep_curves = {}
        for name in self.data.magnet.geometry.electromagnetics.areas:
            self.brep_curves[name] = {1: set(), 2: set(), 3: set(), 4: set()}
        self.mesh_files = os.path.join(os.path.dirname(self.solution_folder), self.data.general.magnet_name)
        self.model_file = os.path.join(self.solution_folder, 'Center_line.csv')

        self.mf = {'EM': f"{self.mesh_files}_EM.msh", 'TH': f"{self.mesh_files}_TH.msh"}
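
        # Illustrative note (hypothetical magnet name, assumed folder layout): for a magnet called
        # 'MQXA', the mesh files above resolve to '<parent of solution_folder>/MQXA_EM.msh' and
        # '<parent of solution_folder>/MQXA_TH.msh', i.e. the meshes are expected one level above
        # the solution folder.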

    def loadRegionFiles(self):
        """
        Read the region (.reg) files and store their content in self.rm_EM / self.rm_TH.
        Used for extracting the maximum region number.
        :return: None
        """
        if self.data.magnet.solve.electromagnetics.solve_type:
            self.rm_EM = Util.read_data_from_yaml(f"{self.mesh_files}_EM.reg", rM.RegionsModel)
        if self.data.magnet.solve.thermal.solve_type:
            self.rm_TH = Util.read_data_from_yaml(f"{self.mesh_files}_TH.reg", rM.RegionsModel)

    def loadRegionCoordinateFile(self):
        """
        Read the region-coordinate (.reco) file and store its content in self.rc.
        :return: None
        """
        self.rc = Util.read_data_from_yaml(f"{self.mesh_files}_TH.reco", dM.MultipoleRegionCoordinate)

    def assemblePro(self):
        """
        Assemble the .pro file from the template, using the inputs from the input yaml file.
        :return: None (the assembled .pro file is written to disk)
        """
        logger.info("Assembling pro file...")
        start_time = timeit.default_timer()
        ap = aP(file_base_path=os.path.join(self.solution_folder, self.data.general.magnet_name),
                naming_conv=self.naming_conv)
        BH_curves_path = os.path.join(pathlib.Path(os.path.dirname(__file__)).parent, 'pro_material_functions',
                                      'ironBHcurves.pro')
        template = 'Multipole_template.pro'
        ap.assemble_combined_pro(template=template, rm_EM=self.rm_EM, rm_TH=self.rm_TH, rc=self.rc, dm=self.data,
                                 mf=self.mf, BH_curves_path=BH_curves_path, aux=self.aux)
        logger.info(
            f"Assembling pro file took"
            f" {timeit.default_timer() - start_time:.2f} s."
        )
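
        # Illustrative note (hypothetical magnet name): for a magnet called 'MQXA', the assembled
        # file should end up at '<solution_folder>/MQXA.pro', the path that _run() later passes
        # to GetDP.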

    def solve_and_postprocess(self):
        """
        Generates the necessary GetDP commands to solve and postprocess the model, and runs them.
        :return: None
        """
        commands = None
        if self.call_method == 'onelab':
            # single command string, wrapped in a list so that _run() iterates over whole commands
            commands = [f"-solve -v2 -pos -verbose {self.data.run.verbosity_GetDP}"]
        elif self.call_method == 'subprocess':
            commands = [["-solve", 'resolution', "-v2", "-pos", "Dummy", "-verbose", str(self.data.run.verbosity_GetDP),
                         "-mat_mumps_icntl_14", "250"]]
        self._run(commands=commands)
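
        # Illustrative example (hypothetical magnet name): for a magnet 'MQXA' solved with the
        # default 'subprocess' call method, the resulting invocation is equivalent to:
        #   <GetDP_path> <solution_folder>/MQXA.pro -solve resolution -v2 -pos Dummy \
        #       -verbose <verbosity_GetDP> -mat_mumps_icntl_14 250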

    def postprocess(self):
        """
        Generates the necessary GetDP commands to postprocess the model, and runs them.
        :return: None
        """
        commands = None
        if self.call_method == 'onelab':
            # single command string, wrapped in a list so that _run() iterates over whole commands
            commands = [f"-v2 -pos -verbose {self.data.run.verbosity_GetDP}"]
        elif self.call_method == 'subprocess':
            commands = [["-v2", "-pos", "Dummy", "-verbose", str(self.data.run.verbosity_GetDP)]]
        elif self.call_method == 'alt_solver':
            commands = [["-solve", 'resolution', "-v2", "-pos", "Dummy"]]
        self._run(commands=commands)

    def _run(self, commands):
        """
        Runs GetDP with the specified commands. Additionally, it captures and logs the output.
        :param commands: list of commands to run GetDP with
        :return: None
        """
        logger.info("Solving...")
        start_time = timeit.default_timer()
        if self.call_method == 'onelab':
            for command in commands:
                gmsh.onelab.run(f"{self.data.general.magnet_name}",
                                f"{self.GetDP_path} {os.path.join(self.solution_folder, self.data.general.magnet_name)}.pro {command}")
                gmsh.onelab.setChanged("GetDP", 0)
        elif self.call_method == 'subprocess' or self.call_method == 'alt_solver':
            # subprocess.call([f"{self.GetDP_path}", f"{os.path.join(self.solution_folder, self.data.general.magnet_name)}.pro"] + command + ["-msh", f"{self.mesh_files}.msh"])

            # view_tag = gmsh.view.getTags()  # this should be b
            # # # v = "View[" + str(gmsh.view.getIndex('b')) + "]"
            # gmsh.view.write(view_tag, f"{os.path.join(self.solution_folder, self.data.general.magnet_name)}-view.msh")

            if self.data.magnet.solve.noOfMPITasks:
                mpi_prefix = ["mpiexec", "-np", str(self.data.magnet.solve.noOfMPITasks)]
            else:
                mpi_prefix = []

            for command in commands:
                use_predefined_pro = False  # If True, one can use a predefined .pro file
                if use_predefined_pro:
                    logger.warning("Using predefined .pro file")
                    getdpProcess = subprocess.Popen(
                        mpi_prefix + [f"{self.GetDP_path}",
                                      f"{os.path.join(os.path.abspath(os.path.join(self.solution_folder, '..', '..', '..')), '_out/TEST')}.pro"] +
                        command,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT,
                    )
                else:
                    getdpProcess = subprocess.Popen(
                        mpi_prefix + [f"{self.GetDP_path}",
                                      f"{os.path.join(self.solution_folder, self.data.general.magnet_name)}.pro"] +
                        command,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT,
                    )
                with getdpProcess.stdout:
                    for line in iter(getdpProcess.stdout.readline, b""):
                        line = line.decode("utf-8").rstrip()
                        line = line.split("\r")[-1]
                        if "Info" in line:
                            parsedLine = re.sub(r"Info\s*:\s*", "", line)
                            logger.info(parsedLine)
                        elif "Warning" in line:
                            parsedLine = re.sub(r"Warning\s*:\s*", "", line)
                            logger.warning(parsedLine)
                        elif "Error" in line:
                            parsedLine = re.sub(r"Error\s*:\s*", "", line)
                            logger.error(parsedLine)
                            raise Exception(parsedLine)
                        elif "Critical" in line:
                            parsedLine = re.sub(r"Critical\s*:\s*", "", line)
                            logger.critical(parsedLine)
                        # catch the maximum temperature line
                        elif "Maximum temperature" in line:
                            parsedLine = re.sub(r"Print\s*:\s*", "", line)
                            logger.info(parsedLine)
                        # this activates the debugging message mode
                        elif self.data.run.verbosity_GetDP > 99:
                            logger.info(line)

                getdpProcess.wait()

        logger.info(
            f"Solving took {timeit.default_timer() - start_time:.2f} s."
        )
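
        # Illustrative example (hypothetical log lines) of the output parsing above:
        #   'Info    : Solving linear system'  -> logger.info('Solving linear system')
        #   'Warning : Unknown group'          -> logger.warning('Unknown group')
        #   'Error   : Missing resolution'     -> logger.error(...) followed by a raised Exception,
        # so the run is aborted on the first GetDP error line.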

    def ending_step(self, gui: bool = False):
        """
        Finalize the GetDP runner, either by launching the GUI or finalizing gmsh.
        :param gui: If True, launches the interactive GUI; otherwise finalizes gmsh.
        :return: None
        """
        if gui:
            self.gu.launch_interactive_GUI()
        else:
            if gmsh.isInitialized():
                gmsh.clear()
                gmsh.finalize()

    def read_aux_file(self, aux_file_path):
        """
        Read the auxiliary file and store its content in self.aux_data.
        Necessary to extract half-turn information from blocks.
        :param aux_file_path: path to the auxiliary file
        :return: None
        """
        try:
            self.aux_data = Util.read_data_from_yaml(aux_file_path, dM.MultipoleData)
            logger.info(f"Auxiliary data loaded from {aux_file_path}")
        except FileNotFoundError:
            logger.error(f"Auxiliary file not found: {aux_file_path}")
        except Exception as e:
            logger.error(f"Error reading auxiliary file: {e}")

    def extract_half_turn_blocks(self):
        """
        Extract, from the .aux data, a dictionary whose keys are the block numbers and whose values
        are the lists of all half-turn areas included in that block.
        :return: None (the result is stored in self.aux["half_turns"])
        """
        aux_data = self.aux_data

        half_turn_areas = {}
        info_block = {}
        coils = aux_data.geometries.coil.coils
        for coil_nr, coil_data in coils.items():
            poles = coil_data.poles
            for pole_nr, pole_data in poles.items():
                layers = pole_data.layers
                for layer_nr, layer_data in layers.items():
                    windings = layer_data.windings
                    for winding_nr, winding_data in windings.items():
                        blocks = winding_data.blocks
                        for block_nr, block_data in blocks.items():
                            half_turns = block_data.half_turns.areas
                            info_block[block_nr] = [int(area) for area in half_turns.keys()]
                            half_turn_areas[int(block_nr)] = [int(area) for area in half_turns.keys()]

        self.aux["half_turns"]["ht"] = half_turn_areas
        self.aux["half_turns"]["block"] = info_block
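
        # Illustrative example (hypothetical numbering): for block 1 containing half-turn areas
        # '1' and '2' and block 2 containing area '3', this method stores
        #   self.aux["half_turns"]["ht"]    == {1: [1, 2], 2: [3]}
        #   self.aux["half_turns"]["block"] == {1: [1, 2], 2: [3]}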

    def extract_specific_TSA_lines(self):
        """
        Collect, from the thermal .aux data, the outer lines of the outermost-layer half-turns and
        wedges, which are used for the TSA collar; the results are stored in self.aux["tsa_collar"].
        """
        aux_data = self.aux_data
        pg = aux_data.domains.physical_groups
        max_layer = len(self.aux_data.geometries.coil.coils[1].poles[1].layers)  # todo: more elegant way? @emma
        tags = []
        for block in pg.blocks.values():
            for ht in block.half_turns.values():
                if ht.group[-1] == str(max_layer):  # r1_a2 or r2_a2
                    tags.append(ht.lines['o'])
        self.aux["tsa_collar"]['ht_lines'] = tags

        tags = []
        for wedge in pg.wedges.values():
            if wedge.group[-1] == str(max_layer):  # r1_a2 or r2_a2
                tags.append(wedge.lines['o'])
        self.aux["tsa_collar"]['wedge_lines'] = tags
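
        # Illustrative example (hypothetical gmsh line tags): after this call, self.aux["tsa_collar"]
        # might look like {'ht_lines': [101, 102, ...], 'wedge_lines': [201, 202, ...]}, i.e. one
        # outer-line tag per outermost-layer half-turn and wedge.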