Coverage for fiqus/getdp_runners/RunGetdpPancake3D.py: 58%
118 statements
« prev ^ index » next coverage.py v6.4.4, created at 2024-05-20 03:24 +0200
« prev ^ index » next coverage.py v6.4.4, created at 2024-05-20 03:24 +0200
1import os
2import timeit
3import logging
4import subprocess
5import re
7import gmsh
9from fiqus.utils.Utils import FilesAndFolders
10from fiqus.utils.Utils import GmshUtils
11from fiqus.data.RegionsModelFiQuS import RegionsModel
12from fiqus.pro_assemblers.ProAssembler import ASS_PRO as ass_pro
13from fiqus.mains.MainPancake3D import Base
14from fiqus.data.DataFiQuSPancake3D import Pancake3DGeometry, Pancake3DMesh
15from fiqus.parsers.ParserRES import ParserRES
17logger = logging.getLogger(__name__)
20class Solve(Base):
21 """
22 Main class to run GetDP for Pancake3D.
24 :param fdm: FiQuS data model
25 :param GetDP_path: Settings for GetDP
26 :type GetDP_path: dict
27 """
29 def __init__(
30 self,
31 fdm,
32 GetDP_path: str = None,
33 geom_folder=None,
34 mesh_folder=None,
35 solution_folder=None,
36 ) -> None:
37 super().__init__(fdm, geom_folder, mesh_folder, solution_folder)
39 self.GetDP_path = GetDP_path # Some settings for GetDP
40 # check for init from res file option
41 if self.dm.magnet.solve.initFromPrevious:
42 self.res_file_path = os.path.join(
43 self.mesh_folder,
44 f"Solution_{self.dm.magnet.solve.initFromPrevious}",
45 self.magnet_name + ".res",
46 )
48 if not os.path.isfile(self.res_file_path):
49 raise ValueError(f"Res file {self.res_file_path} does not exist.")
50 else:
51 if self.dm.run.type not in ["solve_only", "solve_with_post_process_python"]:
52 raise ValueError(
53 f"Run type should be solve only for init from res file option."
54 )
56 if self.dm.magnet.solve.type == "weaklyCoupled":
57 _no_of_previous_solutions = 2
58 else:
59 _no_of_previous_solutions = 1
61 logger.info(
62 f"Initializing from previous solution {self.res_file_path}."
63 )
65 # parse given res file
66 parsed_init_res = ParserRES(self.res_file_path)
68 # remove all but no_of_previous_solutions
69 parsed_init_res.solution["time_real"] = parsed_init_res.solution[
70 "time_real"
71 ][-_no_of_previous_solutions:]
72 parsed_init_res.solution["time_imag"] = parsed_init_res.solution[
73 "time_imag"
74 ][-_no_of_previous_solutions:]
75 parsed_init_res.solution["time_step"] = parsed_init_res.solution[
76 "time_step"
77 ][-_no_of_previous_solutions:]
78 parsed_init_res.solution["dof_data"] = parsed_init_res.solution[
79 "dof_data"
80 ][-_no_of_previous_solutions:]
81 parsed_init_res.solution["solution"] = parsed_init_res.solution[
82 "solution"
83 ][-_no_of_previous_solutions:]
85 self.res_file_without_previous_solutions = os.path.join(
86 self.solution_folder, self.magnet_name + ".res"
87 )
88 ParserRES(self.res_file_without_previous_solutions, parsed_init_res)
90 # Create pro file:
91 self.ap = ass_pro(os.path.join(self.solution_folder, self.magnet_name))
93 # Start GMSH:
94 self.gu = GmshUtils(self.mesh_folder)
95 self.gu.initialize()
97 # Read regions model:
98 self.rm = FilesAndFolders.read_data_from_yaml(self.regions_file, RegionsModel)
100 # Check if the geometry data and mesh data has not been altered after they are
101 # created:
102 previousGeo = FilesAndFolders.read_data_from_yaml(
103 self.geometry_data_file, Pancake3DGeometry
104 )
105 previousMesh = FilesAndFolders.read_data_from_yaml(
106 self.mesh_data_file, Pancake3DMesh
107 )
109 if previousGeo.dict() != self.geo.dict():
110 raise ValueError(
111 "Geometry data has been changed. Please regenerate the geometry or load"
112 " the previous geometry data."
113 )
114 elif previousMesh.dict() != self.mesh.dict():
115 raise ValueError(
116 "Mesh data has been changed. Please regenerate the mesh or load the"
117 " previous mesh data."
118 )
120 def assemble_pro(self):
121 logger.info(f"Assembling pro file ({self.pro_file}) has been started.")
122 start_time = timeit.default_timer()
124 self.ap.assemble_combined_pro(
125 template=self.solve.proTemplate,
126 rm=self.rm,
127 dm=self.dm,
128 )
130 logger.info(
131 f"Assembling pro file ({self.pro_file}) has been finished in"
132 f" {timeit.default_timer() - start_time:.2f} s."
133 )
135 def run_getdp(self, solve=True, postOperation=True):
136 logger.info("Solving Pancake3D magnet has been started.")
137 start_time = timeit.default_timer()
139 getdpArguments = ["-v2", "-mat_mumps_cntl_1", "0", "-verbose", "6"]
141 if self.dm.magnet.solve.initFromPrevious:
142 getdpArguments.extend(
143 ["-restart", "-res", str(self.res_file_without_previous_solutions)]
144 )
146 # Add solve argument
147 if solve:
148 getdpArguments.extend(["-solve", f"RESOLUTION_{self.solve.type}"])
150 # Add post operation argument
151 if postOperation:
152 posStringList = ["-pos"]
153 if solve is False:
154 # Quantities to be saved:
155 if self.dm.magnet.solve.save is not None:
156 for quantity in self.dm.magnet.solve.save:
157 posStringList.append(f"{quantity.getdpPostOperationName}")
159 if self.dm.magnet.postproc is not None:
160 if self.dm.magnet.postproc.timeSeriesPlots is not None:
161 # Post-operations for Python post-processing:
162 for timeSeriesPlot in self.dm.magnet.postproc.timeSeriesPlots:
163 posStringList.append(
164 f"POSTOP_timeSeriesPlot_{timeSeriesPlot.quantity}"
165 )
167 if self.dm.magnet.postproc.magneticFieldOnCutPlane is not None:
168 # Post-operations for Python post-processing:
169 posStringList.append("POSTOP_magneticFieldOnCutPlaneVector")
170 posStringList.append("POSTOP_magneticFieldOnCutPlaneMagnitude")
172 else:
173 posStringList.append("POSTOP_dummy")
174 getdpArguments.extend(posStringList)
176 # Add mesh argument
177 getdpArguments.extend(["-msh", f"{self.mesh_file}"])
179 # Add pre-processing argument
180 getdpArguments.extend(["-pre", f"RESOLUTION_{self.solve.type}"])
182 getdp_binary = self.GetDP_path
184 getdpProcess = subprocess.Popen(
185 [getdp_binary, self.pro_file] + getdpArguments,
186 stdout=subprocess.PIPE,
187 stderr=subprocess.STDOUT,
188 )
189 with getdpProcess.stdout:
190 for line in iter(getdpProcess.stdout.readline, b""):
191 line = line.decode("utf-8").rstrip()
192 line = line.split("\r")[-1]
193 if "Info" in line:
194 parsedLine = re.sub(r"Info\s*:\s*", "", line)
195 logger.info(parsedLine)
196 elif "Warning" in line:
197 parsedLine = re.sub(r"Warning\s*:\s*", "", line)
198 if "Unknown" not in parsedLine:
199 logger.warning(parsedLine)
200 elif "Error" in line:
201 parsedLine = re.sub(r"Error\s*:\s*", "", line)
202 logger.error(parsedLine)
203 logger.error("Solving Pancake3D magnet has failed.")
204 raise Exception(parsedLine)
205 elif "Critical" in line:
206 parsedLine = re.sub(r"Critical\s*:\s*", "", line)
207 if "Quench started!" in parsedLine:
208 logger.critical(r" _____ _ ___ ____ ")
209 logger.critical(r"| ___(_)/ _ \ _ _/ ___| ")
210 logger.critical(r"| |_ | | | | | | | \___ \ ")
211 logger.critical(r"| _| | | |_| | |_| |___) |")
212 logger.critical(r"|_| |_|\__\_\\__,_|____/ ")
213 logger.critical("")
214 logger.critical("The coil has been quenched!")
215 else:
216 logger.critical(parsedLine)
217 # else:
218 # logger.info(line)
220 getdpProcess.wait()
222 logger.info(
223 "Solving Pancake3D magnet has been finished in"
224 f" {timeit.default_timer() - start_time:.2f} s."
225 )
227 if self.solve_gui:
228 gmsh.option.setNumber("Geometry.Volumes", 0)
229 gmsh.option.setNumber("Geometry.Surfaces", 0)
230 gmsh.option.setNumber("Geometry.Curves", 0)
231 gmsh.option.setNumber("Geometry.Points", 0)
232 posFiles = [
233 fileName
234 for fileName in os.listdir(self.solution_folder)
235 if fileName.endswith(".pos")
236 ]
237 for posFile in posFiles:
238 gmsh.open(os.path.join(self.solution_folder, posFile))
239 self.gu.launch_interactive_GUI()
240 else:
241 gmsh.clear()
242 gmsh.finalize()