Coverage for fiqus/getdp_runners/RunGetdpMultipole.py: 84%
111 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-12-25 02:54 +0100
« prev ^ index » next coverage.py v7.4.4, created at 2024-12-25 02:54 +0100
1import os
2import pathlib
3import subprocess
4import logging
5import timeit
6import re
8import gmsh
9from fiqus.pro_assemblers.ProAssembler import ASS_PRO as aP
10from fiqus.utils.Utils import GmshUtils
11from fiqus.utils.Utils import FilesAndFolders as Util
12from fiqus.data import DataFiQuS as dF
13from fiqus.data import RegionsModelFiQuS as rM
14from fiqus.data import DataMultipole as dM
16logger = logging.getLogger(__name__)
class AssignNaming:
    def __init__(self, data: "dF.FDM" = None):
        """
        Class to assign naming convention.

        Stores the region naming convention and rewrites the post-processing volume lists of the
        electromagnetic and thermal problems from convention keys (e.g. 'powered') to region names
        (e.g. 'Omega_p'). The data model passed in is modified in place.

        :param data: FiQuS data model
        :raises ValueError: if no data model is given
        :raises KeyError: if a volume entry is not a key of the naming convention
        """
        # The default of None is kept for signature compatibility only: without a data model there
        # is nothing to rename, so fail early with a clear message instead of an AttributeError.
        if data is None:
            raise ValueError("AssignNaming requires a FiQuS data model, got None.")
        self.data: "dF.FDM" = data

        self.naming_conv = {'omega': 'Omega', 'boundary': 'Bd_', 'powered': '_p', 'induced': '_i', 'air': '_a',
                            'air_far_field': '_aff', 'iron': '_bh', 'conducting': '_c', 'insulator': '_ins', 'terms': 'Terms'}

        postproc = self.data.magnet.postproc
        postproc.electromagnetics.volumes = self._region_names(postproc.electromagnetics.volumes)
        postproc.thermal.volumes = self._region_names(postproc.thermal.volumes)

    def _region_names(self, volumes):
        """
        Translate naming convention keys into region names.

        :param volumes: iterable of naming convention keys
        :return: list with one region name per key; 'omega' maps to the bare domain prefix, any
                 other key to the domain prefix followed by the suffix of that key
        """
        prefix = self.naming_conv['omega']
        return [prefix + (self.naming_conv[key] if key != 'omega' else '') for key in volumes]
class RunGetdpMultipole:
    """
    Assembles the Multipole pro file and runs GetDP on it, either through gmsh's onelab interface
    or as a subprocess whose output is forwarded to the module logger.
    """

    # Prefixes GetDP prints in front of its messages. They are compiled once because they are
    # applied to every line of solver output.
    _INFO_PREFIX = re.compile(r"Info\s*:\s*")
    _WARNING_PREFIX = re.compile(r"Warning\s*:\s*")
    _ERROR_PREFIX = re.compile(r"Error\s*:\s*")
    _CRITICAL_PREFIX = re.compile(r"Critical\s*:\s*")
    _PRINT_PREFIX = re.compile(r"Print\s*:\s*")

    def __init__(self, data: "AssignNaming" = None, solution_folder: str = None, GetDP_path: str = None, verbose: bool = False):
        """
        Class to solve pro file
        :param data: AssignNaming instance; its FiQuS data model and naming convention are used
        :param solution_folder: folder of the solution; the pro file is written to and read from it,
         and the mesh files are looked up in its parent folder
        :param GetDP_path: path of the GetDP executable that is launched to solve
        :param verbose: If True more information is printed in python console.
        """
        logger.info(
            f"Initializing Multipole runner for {os.path.basename(solution_folder)}."
        )
        self.data: "dF.FDM" = data.data
        self.naming_conv: dict = data.naming_conv
        self.solution_folder = solution_folder
        self.GetDP_path = GetDP_path
        self.verbose: bool = verbose
        self.call_method = 'subprocess'  # or onelab

        self.rm_EM = rM.RegionsModel()
        self.rm_TH = rM.RegionsModel()
        self.rc = dM.MultipoleRegionCoordinate()\
            if self.data.magnet.mesh.thermal.isothermal_conductors and self.data.magnet.solve.thermal.solve_type else None

        self.gu = GmshUtils(self.solution_folder, self.verbose)
        self.gu.initialize(verbosity_Gmsh=self.data.run.verbosity_Gmsh)
        self.occ = gmsh.model.occ
        self.mesh = gmsh.model.mesh

        self.brep_iron_curves = {1: set(), 2: set(), 3: set(), 4: set()}
        # Common path prefix of the mesh related files (<parent of solution folder>/<magnet name>).
        self.mesh_files = os.path.join(os.path.dirname(self.solution_folder), self.data.general.magnet_name)
        self.model_file = os.path.join(self.solution_folder, 'Center_line.csv')

        self.mf = {'EM': f"{self.mesh_files}_EM.msh", 'TH': f"{self.mesh_files}_TH.msh"}

    def loadRegionFiles(self):
        """
        Read the region files (*_EM.reg / *_TH.reg) of the problems that have a solve type set.
        """
        if self.data.magnet.solve.electromagnetics.solve_type:
            self.rm_EM = Util.read_data_from_yaml(f"{self.mesh_files}_EM.reg", rM.RegionsModel)
        if self.data.magnet.solve.thermal.solve_type:
            self.rm_TH = Util.read_data_from_yaml(f"{self.mesh_files}_TH.reg", rM.RegionsModel)

    def loadRegionCoordinateFile(self):
        """
        Read the region coordinate file (*_TH.reco) of the thermal problem.
        """
        self.rc = Util.read_data_from_yaml(f"{self.mesh_files}_TH.reco", dM.MultipoleRegionCoordinate)

    def assemblePro(self):
        """
        Assemble the combined pro file from the Multipole template and log how long it took.
        """
        logger.info("Assembling pro file...")
        start_time = timeit.default_timer()
        ap = aP(file_base_path=os.path.join(self.solution_folder, self.data.general.magnet_name), naming_conv=self.naming_conv)
        BH_curves_path = os.path.join(pathlib.Path(os.path.dirname(__file__)).parent, 'pro_material_functions', 'ironBHcurves.pro')
        ap.assemble_combined_pro(template='Multipole_template.pro', rm_EM=self.rm_EM, rm_TH=self.rm_TH, rc=self.rc, dm=self.data, mf=self.mf, BH_curves_path=BH_curves_path)
        logger.info(
            f"Assembling pro file took"
            f" {timeit.default_timer() - start_time:.2f} s."
        )

    def solve_and_postprocess(self):
        """
        Run GetDP with the solve and post-processing arguments.
        :raises ValueError: if call_method is neither 'onelab' nor 'subprocess'
        """
        self._run(commands=self._solve_commands())

    def postprocess(self):
        """
        Run GetDP with the post-processing arguments only.
        :raises ValueError: if call_method is neither 'onelab' nor 'subprocess'
        """
        self._run(commands=self._postprocess_commands())

    def _solve_commands(self) -> list:
        """
        Build the GetDP arguments for solving and post-processing.
        :return: list of commands; each command is one argument string for 'onelab' and a list of
         arguments for 'subprocess'
        """
        verbosity = str(self.data.run.verbosity_GetDP)
        if self.call_method == 'onelab':
            # Wrapped in a list: _run loops over the commands and must see this as one command.
            return [f"-solve -v2 -pos -verbose {verbosity}"]
        if self.call_method == 'subprocess':
            return [["-solve", 'resolution', "-v2", "-pos", "Dummy", "-verbose", verbosity]]
        raise ValueError(f"Unsupported call method: {self.call_method!r}")

    def _postprocess_commands(self) -> list:
        """
        Build the GetDP arguments for post-processing only.
        :return: list of commands; each command is one argument string for 'onelab' and a list of
         arguments for 'subprocess'
        """
        verbosity = str(self.data.run.verbosity_GetDP)
        if self.call_method == 'onelab':
            return [f"-v2 -pos -verbose {verbosity}"]
        if self.call_method == 'subprocess':
            return [["-v2", "-pos", "-verbose", verbosity]]
        raise ValueError(f"Unsupported call method: {self.call_method!r}")

    @staticmethod
    def _classify_line(line: str, verbosity):
        """
        Decide how one line of GetDP output is logged.
        :param line: decoded output line
        :param verbosity: GetDP verbosity; above 99 every otherwise unmatched line is logged too
        :return: tuple (logging level, message) or None if the line is not logged
        """
        # The order of the checks matters: the first keyword found in the line wins.
        if "Info" in line:
            return logging.INFO, RunGetdpMultipole._INFO_PREFIX.sub("", line)
        if "Warning" in line:
            message = RunGetdpMultipole._WARNING_PREFIX.sub("", line)
            # Warnings mentioning "Unknown" are deliberately dropped.
            return None if "Unknown" in message else (logging.WARNING, message)
        if "Error" in line:
            return logging.ERROR, RunGetdpMultipole._ERROR_PREFIX.sub("", line)
        if "Critical" in line:
            return logging.CRITICAL, RunGetdpMultipole._CRITICAL_PREFIX.sub("", line)
        # catch the maximum temperature line
        if "Maximum temperature" in line:
            return logging.INFO, RunGetdpMultipole._PRINT_PREFIX.sub("", line)
        # this activates the debugging message mode
        if verbosity > 99:
            return logging.INFO, line
        return None

    def _run_subprocess(self, arguments: list):
        """
        Launch one GetDP process and forward its output to the logger line by line.
        :param arguments: full argument list, starting with the executable
        :raises Exception: with the message of the first line GetDP reports as an error
        """
        process = subprocess.Popen(arguments, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        try:
            with process.stdout:
                for raw_line in iter(process.stdout.readline, b""):
                    # Undecodable bytes are replaced so that odd solver output cannot abort the run.
                    line = raw_line.decode("utf-8", errors="replace").rstrip()
                    # Keep only the text after the last carriage return of the line.
                    line = line.split("\r")[-1]
                    classified = self._classify_line(line, self.data.run.verbosity_GetDP)
                    if classified is None:
                        continue
                    level, message = classified
                    logger.log(level, message)
                    if level == logging.ERROR:
                        raise Exception(message)
        except BaseException:
            # Do not leave the solver running when the run is aborted.
            process.kill()
            raise
        finally:
            process.wait()
        if process.returncode:
            logger.warning(f"GetDP exited with return code {process.returncode}.")

    def _run(self, commands):
        """
        Run GetDP once per command with the configured call method and log the total time.
        :param commands: list of commands as returned by the command builders; a single argument
         string is accepted as well and treated as one command
        """
        logger.info("Solving...")
        start_time = timeit.default_timer()
        if isinstance(commands, str):
            # Iterating a plain string would yield single characters.
            commands = [commands]
        pro_file = f"{os.path.join(self.solution_folder, self.data.general.magnet_name)}.pro"
        if self.call_method == 'onelab':
            for command in commands:
                gmsh.onelab.run(f"{self.data.general.magnet_name}",
                                f"{self.GetDP_path} {pro_file} {command}")
                gmsh.onelab.setChanged("GetDP", 0)
        elif self.call_method == 'subprocess':
            no_of_mpi_tasks = self.data.magnet.solve.noOfMPITasks
            mpi_prefix = ["mpiexec", "-np", str(no_of_mpi_tasks)] if no_of_mpi_tasks else []
            for command in commands:
                self._run_subprocess(mpi_prefix + [f"{self.GetDP_path}", pro_file] + command)

        logger.info(
            f"Solving took {timeit.default_timer() - start_time:.2f} s."
        )

    def ending_step(self, gui: bool = False):
        """
        Finish the gmsh session.
        :param gui: If True the interactive gmsh GUI is launched, otherwise gmsh is cleared and finalized.
        """
        if gui:
            self.gu.launch_interactive_GUI()
        else:
            gmsh.clear()
            gmsh.finalize()