Coverage for fiqus/getdp_runners/RunGetdpPancake3D.py: 56%
134 statements
coverage.py v7.4.4, created at 2025-03-14 02:31 +0100
1import os
2import timeit
3import logging
4import subprocess
5import re
7import gmsh
9from fiqus.utils.Utils import FilesAndFolders
10from fiqus.utils.Utils import GmshUtils
11from fiqus.data.RegionsModelFiQuS import RegionsModel
12from fiqus.pro_assemblers.ProAssembler import ASS_PRO as ass_pro
13from fiqus.mains.MainPancake3D import Base
14from fiqus.data.DataFiQuSPancake3D import Pancake3DGeometry, Pancake3DMesh
15from fiqus.parsers.ParserRES import ParserRES
17logger = logging.getLogger(__name__)
class Solve(Base):
    """
    Main class to run GetDP for Pancake3D.

    On construction the class optionally prepares a restart ``.res`` file
    (``initFromPrevious``), creates the pro-file assembler, initializes Gmsh,
    reads the regions model and verifies that the stored geometry and mesh
    data still match the current input.

    :param fdm: FiQuS data model
    :param GetDP_path: path of the GetDP executable launched by ``run_getdp``
    :type GetDP_path: str
    :param geom_folder: geometry folder, forwarded to ``Base``
    :param mesh_folder: mesh folder, forwarded to ``Base``
    :param solution_folder: solution folder, forwarded to ``Base``
    :raises ValueError: if the requested previous ``.res`` file does not
        exist, if the run type is not allowed together with
        ``initFromPrevious``, if the start time does not match the first kept
        time of the previous solution, or if the geometry/mesh data on disk
        differ from the current ones
    """

    def __init__(
        self,
        fdm,
        GetDP_path: str = None,
        geom_folder=None,
        mesh_folder=None,
        solution_folder=None,
    ) -> None:
        super().__init__(fdm, geom_folder, mesh_folder, solution_folder)

        # Used as the executable of the GetDP subprocess in run_getdp.
        self.GetDP_path = GetDP_path

        # check for init from res file option
        if self.dm.magnet.solve.initFromPrevious:
            self.res_file_path = os.path.join(
                self.mesh_folder,
                f"Solution_{self.dm.magnet.solve.initFromPrevious}",
                self.magnet_name + ".res",
            )

            if not os.path.isfile(self.res_file_path):
                raise ValueError(f"Res file {self.res_file_path} does not exist.")

            if self.dm.run.type not in ["solve_only", "solve_with_post_process_python"]:
                raise ValueError(
                    "Run type should be solve only for init from res file option."
                )

            # Number of trailing solutions of the previous run that are kept
            # in the restart file.
            if self.dm.magnet.solve.type == "weaklyCoupled":
                _no_of_previous_solutions = 2
            else:
                _no_of_previous_solutions = 1

            logger.info(
                f"Initializing from previous solution {self.res_file_path}."
            )

            # parse given res file
            parsed_init_res = ParserRES(self.res_file_path)

            # remove all but no_of_previous_solutions
            for key in ("time_real", "time_imag", "time_step", "dof_data", "solution"):
                parsed_init_res.solution[key] = parsed_init_res.solution[key][
                    -_no_of_previous_solutions:
                ]

            if fdm.magnet.solve.time.start != parsed_init_res.solution["time_real"][0]:
                raise ValueError(f"Initial time {fdm.magnet.solve.time.start} does not match with the initFromPrevious res file time {parsed_init_res.solution['time_real'][0]}.")

            self.res_file_without_previous_solutions = os.path.join(
                self.solution_folder, self.magnet_name + ".res"
            )

            # Called with the target path and the trimmed data; presumably
            # this writes the restart file that run_getdp passes to "-res"
            # (TODO confirm against ParserRES).
            ParserRES(self.res_file_without_previous_solutions, parsed_init_res)

        # Create pro file:
        self.ap = ass_pro(os.path.join(self.solution_folder, self.magnet_name))

        # Start GMSH:
        self.gu = GmshUtils(self.mesh_folder)
        self.gu.initialize(verbosity_Gmsh=fdm.run.verbosity_Gmsh)

        # Read regions model:
        self.rm = FilesAndFolders.read_data_from_yaml(self.regions_file, RegionsModel)

        # Check if the geometry data and mesh data has not been altered after they are
        # created:
        previousGeo = FilesAndFolders.read_data_from_yaml(
            self.geometry_data_file, Pancake3DGeometry
        )
        previousMesh = FilesAndFolders.read_data_from_yaml(
            self.mesh_data_file, Pancake3DMesh
        )

        if previousGeo.dict() != self.geo.dict():
            raise ValueError(
                "Geometry data has been changed. Please regenerate the geometry or load"
                " the previous geometry data."
            )
        elif previousMesh.dict() != self.mesh.dict():
            raise ValueError(
                "Mesh data has been changed. Please regenerate the mesh or load the"
                " previous mesh data."
            )

    def assemble_pro(self):
        """
        Assemble the combined pro file from the solve template, the regions
        model and the data model, and log the elapsed time.
        """
        logger.info(f"Assembling pro file ({self.pro_file}) has been started.")
        start_time = timeit.default_timer()

        self.ap.assemble_combined_pro(
            template=self.solve.proTemplate,
            rm=self.rm,
            dm=self.dm,
        )

        logger.info(
            f"Assembling pro file ({self.pro_file}) has been finished in"
            f" {timeit.default_timer() - start_time:.2f} s."
        )

    def _build_getdp_arguments(self, solve, postOperation):
        """Return the list of command line arguments passed to GetDP."""
        getdpArguments = ["-v2", "-verbose", str(self.dm.run.verbosity_GetDP)]

        if self.dm.magnet.solve.EECircuit.enable:
            getdpArguments += ["-mat_mumps_cntl_1", "1e-6"]
        else:
            getdpArguments += ["-mat_mumps_cntl_1", "0"]

        if self.dm.magnet.solve.initFromPrevious:
            getdpArguments.extend(
                ["-restart", "-res", str(self.res_file_without_previous_solutions)]
            )

        # Add solve argument
        if solve:
            getdpArguments.extend(["-solve", f"RESOLUTION_{self.solve.type}"])

        # Add post operation argument
        if postOperation:
            posStringList = ["-pos"]

            # Quantities to be saved (only requested explicitly when GetDP is
            # run without "-solve"):
            if (
                solve is False
                and self.dm.magnet.solve.quantitiesToBeSaved is not None
            ):
                posStringList.extend(
                    f"{quantity.getdpPostOperationName}"
                    for quantity in self.dm.magnet.solve.quantitiesToBeSaved
                )

            postproc = self.dm.magnet.postproc
            if postproc is not None:
                # Post-operations for Python post-processing:
                if postproc.timeSeriesPlots is not None:
                    posStringList.extend(
                        f"POSTOP_timeSeriesPlot_{timeSeriesPlot.quantity}"
                        for timeSeriesPlot in postproc.timeSeriesPlots
                    )

                if postproc.magneticFieldOnCutPlane is not None:
                    posStringList.append("POSTOP_magneticFieldOnCutPlaneVector")
                    posStringList.append("POSTOP_magneticFieldOnCutPlaneMagnitude")
            else:
                # No post-processing section configured.
                posStringList.append("POSTOP_dummy")

            getdpArguments.extend(posStringList)

        # Add mesh argument
        getdpArguments.extend(["-msh", f"{self.mesh_file}"])

        # Add pre-processing argument
        getdpArguments.extend(["-pre", f"RESOLUTION_{self.solve.type}"])

        return getdpArguments

    def _get_mpi_prefix(self):
        """Return the ``mpiexec`` command prefix, or ``[]`` when MPI is not requested."""
        if not self.solve.noOfMPITasks:
            return []

        import shutil

        # shutil.which returns None (it does not raise) when the executable is
        # not on PATH, so the result has to be checked explicitly.
        mpi_bin = shutil.which("mpiexec")
        if mpi_bin is None:
            message = "mpiexec not found. GetDP cannot be run with MPI tasks."
            logger.error(message)
            raise FileNotFoundError(message)

        return [mpi_bin, "-np", str(self.solve.noOfMPITasks)]

    def _log_getdp_line(self, line):
        """Forward one line of GetDP output to the logger; raise on an error line."""
        if "Info" in line:
            parsedLine = re.sub(r"Info\s*:\s*", "", line)
            logger.info(parsedLine)
        elif "Warning" in line:
            parsedLine = re.sub(r"Warning\s*:\s*", "", line)
            # Warnings containing "Unknown" are deliberately not reported.
            if "Unknown" not in parsedLine:
                logger.warning(parsedLine)
        elif "Error" in line:
            parsedLine = re.sub(r"Error\s*:\s*", "", line)
            logger.error(parsedLine)
            logger.error("Solving Pancake3D magnet has failed.")
            raise Exception(parsedLine)
        elif "Critical:" in line:
            parsedLine = re.sub(r"Critical\s*:\s*", "", line)
            if "Quench started!" in parsedLine:
                logger.critical(r" _____ _ ___ ____ ")
                logger.critical(r"| ___(_)/ _ \ _ _/ ___| ")
                logger.critical(r"| |_ | | | | | | | \___ \ ")
                logger.critical(r"| _| | | |_| | |_| |___) |")
                logger.critical(r"|_| |_|\__\_\\__,_|____/ ")
                logger.critical("")
                logger.critical("The coil has been quenched!")
            else:
                logger.critical(parsedLine)
        # this activates the debugging message mode
        elif self.dm.run.verbosity_GetDP > 99:
            logger.info(line)

    def run_getdp(self, solve=True, postOperation=True):
        """
        Run GetDP on the assembled pro file and relay its output to the logger.

        :param solve: if true, the ``-solve`` argument is passed to GetDP
        :param postOperation: if true, the ``-pos`` argument and the
            post-operation names are passed to GetDP
        :raises FileNotFoundError: if MPI tasks are requested but ``mpiexec``
            is not found on PATH
        :raises Exception: if GetDP prints an error line; the message is the
            parsed error text
        """
        logger.info("Solving Pancake3D magnet has been started.")
        start_time = timeit.default_timer()

        getdpArguments = self._build_getdp_arguments(solve, postOperation)
        getdp_binary = self.GetDP_path
        mpi_prefix = self._get_mpi_prefix()

        getdpProcess = subprocess.Popen(
            mpi_prefix + [getdp_binary, self.pro_file] + getdpArguments,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        try:
            with getdpProcess.stdout:
                for rawLine in iter(getdpProcess.stdout.readline, b""):
                    # errors="replace": a stray non-UTF-8 byte in the solver
                    # output must not abort an otherwise healthy run.
                    line = rawLine.decode("utf-8", errors="replace").rstrip()
                    # Keep only the text after the last carriage return
                    # (progress lines overwrite themselves with "\r").
                    line = line.split("\r")[-1]
                    self._log_getdp_line(line)
        except BaseException:
            # Do not leave the solver running (or unreaped) once we stop
            # reading its output.
            getdpProcess.terminate()
            getdpProcess.wait()
            raise

        getdpProcess.wait()

        logger.info(
            "Solving Pancake3D magnet has been finished in"
            f" {timeit.default_timer() - start_time:.2f} s."
        )

        if self.solve_gui:
            # Hide the geometric entities before opening the result views.
            gmsh.option.setNumber("Geometry.Volumes", 0)
            gmsh.option.setNumber("Geometry.Surfaces", 0)
            gmsh.option.setNumber("Geometry.Curves", 0)
            gmsh.option.setNumber("Geometry.Points", 0)
            posFiles = [
                fileName
                for fileName in os.listdir(self.solution_folder)
                if fileName.endswith(".pos")
            ]
            for posFile in posFiles:
                gmsh.open(os.path.join(self.solution_folder, posFile))
            self.gu.launch_interactive_GUI()
        else:
            gmsh.clear()
            gmsh.finalize()