Coverage for fiqus/MainFiQuS.py: 65%
241 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-11-29 01:35 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-11-29 01:35 +0000
1import argparse
2import csv
3import os
4import pathlib
5import sys
6import time
7import getpass
8import platform
9import subprocess
10import json
12import pandas as pd
14FiQuS_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
15sys.path.insert(0, FiQuS_path)
17from fiqus.utils.Utils import FilesAndFolders as Util
18from fiqus.utils.Utils import CheckForExceptions as Check
19from fiqus.utils.Utils import create_json_schema
20from fiqus.utils.Utils import get_data_settings
21from fiqus.utils.Utils import initialize_logger
22from fiqus.data.DataFiQuS import FDM, SolveDumpDataModel
23from fiqus.data.DataSettings import DataSettings
24from fiqus.mains.MainCCT import MainCCT
25from fiqus.mains.MainMultipole import MainMultipole
26from fiqus.mains.MainPancake3D import MainPancake3D
27from fiqus.mains.MainConductorAC_Strand import MainConductorAC_Strand
28from fiqus.mains.MainHomogenizedConductor import MainHomogenizedConductor
class MainFiQuS:
    """
    This is the top level class of FiQuS.

    Instantiating the class runs the requested workflow: the input is loaded,
    the magnet-specific main object is created, the output folders are prepared
    (``save_folders``) and the run type is executed (``build_magnet``).
    """

    def __init__(
        self,
        input_file_path: str = None,
        model_folder: str = None,
        GetDP_path=None,
        fdm=None,
        fds=None,
        htcondor_jobid=None,
    ):
        """
        Main class for working with FiQuS simulations
        :param input_file_path: full path to input file yaml
        :type input_file_path: str
        :param model_folder: full path to the base output folder, called model folder
        :type model_folder: str
        :param GetDP_path: full path to GetDP executable
        :type GetDP_path: str
        :param fdm: FiQuS Data Model - object of fiqus DataFiQus
        :type fdm: object
        :param fds: FiQuS Data Settings - object of DataSettings
        :type fds: object
        :param htcondor_jobid: HTCondor job ID; when truthy, the status of this job in
            htcondor_run_log.csv (in fds.base_path_model_files) is set to Running and then
            to Failed or Finished
        :type htcondor_jobid: int
        :raises ValueError: if the magnet type of the data model is not supported
        """
        # Imported locally so that this class does not depend on an edit of the
        # module level import block.
        import shutil

        self.time_stamp = time.strftime("%Y-%m-%d-%H-%M-%S")

        self.start_folder = os.getcwd()
        self.wrk_folder = model_folder
        self.file_name = None

        # Load yaml input file
        input_copy_error = None
        if not fdm:
            self.fdm: FDM = Util.read_data_from_yaml(input_file_path, FDM)
            input_copy_path = os.path.join(
                self.wrk_folder, "logs", f"INPUT_FILE_{self.time_stamp}.FiQuS.yaml"
            )
            # Keep a copy of the input next to the logs. This is best effort: the
            # logger does not exist yet, so a failure is remembered and reported
            # once the logger has been initialized.
            try:
                shutil.copyfile(input_file_path, input_copy_path)
            except OSError as copy_error:
                input_copy_error = copy_error
        else:
            self.fdm: FDM = fdm
        verbose = self.fdm.run.verbosity_FiQuS

        # Initialize logger
        self.logger = initialize_logger(
            verbose=verbose, time_stamp=self.time_stamp, work_folder=self.wrk_folder
        )
        if input_copy_error is not None:
            self.logger.warning(
                f"Input file could not be copied to the logs folder: {input_copy_error}"
            )
        if verbose:
            Util.print_welcome_graphics()

        # Create JSON schema
        create_json_schema(self.fdm)

        # Check for input errors
        Check.check_inputs(run=self.fdm.run)

        # Initialize Main object
        if self.fdm.magnet.type == "CCT_straight":
            self.main_magnet = MainCCT(fdm=self.fdm, verbose=verbose)
        elif self.fdm.magnet.type == "Pancake3D":
            self.main_magnet = MainPancake3D(fdm=self.fdm, verbose=verbose)
        elif self.fdm.magnet.type == "CACStrand":
            self.main_magnet = MainConductorAC_Strand(
                fdm=self.fdm,
                inputs_folder_path=pathlib.Path(input_file_path).parent,
                outputs_folder_path=model_folder,
                verbose=verbose,
            )
        elif self.fdm.magnet.type == "HomogenizedConductor":
            self.main_magnet = MainHomogenizedConductor(
                fdm=self.fdm,
                inputs_folder_path=pathlib.Path(input_file_path).parent,
                outputs_folder_path=model_folder,
                verbose=verbose,
            )
        elif self.fdm.magnet.type == "multipole":
            # Strip the extension instead of a fixed number of characters, so that
            # ".yml" (or any other suffix) yields the right stem as well.
            self.file_name = os.path.splitext(os.path.basename(input_file_path))[0]
            if not self.fdm.magnet.geometry.geom_file_path:
                self.fdm.magnet.geometry.geom_file_path = (
                    f"{os.path.splitext(input_file_path)[0]}.geom"
                )
            self.main_magnet = MainMultipole(
                fdm=self.fdm,
                rgd_path=self.fdm.magnet.geometry.geom_file_path,
                verbose=verbose,
            )
        else:
            raise ValueError(
                f"FiQuS does not support magnet type: {self.fdm.magnet.type}!"
            )

        # Load user paths for executables and additional files
        self.logger.info(f'{getpass.getuser()} is running on {platform.platform()}')
        if not fds:
            fds = get_data_settings(GetDP_path=GetDP_path)
        else:
            fds = get_data_settings(GetDP_path=GetDP_path, settings=fds)
        self.main_magnet.GetDP_path = fds.GetDP_path
        self.logger.info(f"{self.main_magnet.GetDP_path} is going to be used for FE solving.")

        # update htcondor csv
        htcondor_csv_file = None
        if htcondor_jobid:
            base_path_model_files = fds.base_path_model_files
            htcondor_csv_file = os.path.join(base_path_model_files, "htcondor_run_log.csv")
            self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Running")

        # Save Model/Geometry/Mesh/Solution folder paths
        self.save_folders()

        # Build magnet
        self.summary = dict.fromkeys(
            [
                "SJ",
                "SICN",
                "SIGE",
                "Gamma",
                "nodes",
                "solution_time",
                "overall_error",
                "minimum_diff",
                "maximum_diff",
            ]
        )

        try:
            self.build_magnet()
        except Exception as e:
            # update htcondor csv
            if htcondor_jobid:
                self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Failed")
            self.logger.error(f"Error: {e}")
            # Bare raise re-raises the active exception with its traceback untouched.
            raise
        else:
            # update htcondor csv
            if htcondor_jobid:
                self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Finished")

    def save_folders(self):
        """
        Method to make or delete folders of FiQuS
        Resolves the Geometry/Mesh/Solution folders needed by the run type, stores them
        on the main magnet object, dumps the matching parts of the data model as yaml
        files into these folders and appends one row to run_log.csv in the model folder.
        :return: Nothing, only does file and folder operation
        :rtype: None
        """

        def _check_and_generate_path(folder_type: str = None):
            # Every folder level lives inside the folder of the previous level.
            if folder_type == "Geometry":
                folder = self.wrk_folder
            elif folder_type == "Mesh":
                folder = self.main_magnet.geom_folder
            elif folder_type == "Solution":
                folder = self.main_magnet.mesh_folder
            else:
                raise ValueError("Incompatible type.")

            folder_key = getattr(self.fdm.run, folder_type.lower())
            if folder_key is None:
                # folder_key is not given, so it is computed
                folder_key = Util.compute_folder_key(
                    folder_type=folder_type,
                    folder=folder,
                    overwrite=self.fdm.run.overwrite,
                )

            required_folder = folder_type in required_folders
            # Overwrite conditions are only checked for the first required folder.
            first_required = required_folders[0] if required_folders else None
            if self.fdm.run.overwrite and folder_type == first_required:
                Check.check_overwrite_conditions(
                    folder_type=folder_type, folder=folder, folder_key=folder_key
                )
            return Util.get_folder_path(
                folder_type=folder_type,
                folder=folder,
                folder_key=folder_key,
                overwrite=self.fdm.run.overwrite,
                required_folder=required_folder,
            )

        def _relative_to_test_outputs(folder):
            # Path written to the run log: relative to tests/_outputs, "-" if unset.
            if folder is None:
                return "-"
            return os.path.relpath(
                os.path.relpath(folder), os.path.join("tests", "_outputs")
            )

        run_type = self.fdm.run.type

        if run_type == "start_from_yaml":
            required_folders = ["Geometry", "Mesh", "Solution"]
        elif run_type == "geometry_and_mesh":
            required_folders = ["Geometry", "Mesh"]
        elif run_type == "mesh_and_solve_with_post_process_python":
            required_folders = ["Mesh", "Solution"]
        elif run_type in ["solve_with_post_process_python", "solve_only"]:
            required_folders = ["Solution"]
        elif run_type == "geometry_only":
            required_folders = (
                []
                if self.fdm.run.geometry and not self.fdm.run.overwrite
                else ["Geometry"]
            )
        elif run_type == "mesh_only":
            required_folders = (
                [] if self.fdm.run.mesh and not self.fdm.run.overwrite else ["Mesh"]
            )
        else:  # post_process_getdp_only or post_process_python_only or plot_python
            required_folders = []

        self.main_magnet.geom_folder = _check_and_generate_path(folder_type="Geometry")
        if run_type != "geometry_only":
            self.main_magnet.mesh_folder = _check_and_generate_path(folder_type="Mesh")
        if run_type not in ("geometry_only", "mesh_only"):
            self.main_magnet.solution_folder = _check_and_generate_path(
                folder_type="Solution"
            )

        if run_type in [
            "start_from_yaml",
            "geometry_and_mesh",
            "geometry_only",
        ]:
            Util.write_data_model_to_yaml(
                os.path.join(self.main_magnet.geom_folder, "geometry.yaml"),
                self.fdm.magnet.geometry,
                by_alias=True,
                with_comments=True,
            )
        if run_type in [
            "start_from_yaml",
            "geometry_and_mesh",
            "mesh_and_solve_with_post_process_python",
            "mesh_only",
        ]:
            Util.write_data_model_to_yaml(
                os.path.join(self.main_magnet.mesh_folder, "mesh.yaml"),
                self.fdm.magnet.mesh,
                by_alias=True,
                with_comments=True,
            )
        if run_type in [
            "start_from_yaml",
            "mesh_and_solve_with_post_process_python",
            "solve_with_post_process_python",
            "solve_only",
            "post_process",
            "plot_python",
            "postprocess_veusz",
        ]:
            solve_dump_data = SolveDumpDataModel(
                solve=self.fdm.magnet.solve,
                circuit=self.fdm.circuit,
                power_supply=self.fdm.power_supply,
                quench_protection=self.fdm.quench_protection,
                quench_detection=self.fdm.quench_detection,
                conductors=self.fdm.conductors,
            )
            Util.write_data_model_to_yaml(
                os.path.join(self.main_magnet.solution_folder, "solve.yaml"),
                solve_dump_data,
                by_alias=True,
                with_comments=True,
            )
        if run_type in [
            "start_from_yaml",
            "mesh_and_solve_with_post_process_python",
            "solve_with_post_process_python",
            "post_process_python_only",
            "post_process_getdp_only",
            "post_process",
            "postprocess_veusz",
            "plot_python",
        ]:
            Util.write_data_model_to_yaml(
                os.path.join(self.main_magnet.solution_folder, "postproc.yaml"),
                self.fdm.magnet.postproc,
                by_alias=True,
                with_comments=True,
            )

        # The run log is best effort: a failure is reported but never stops the run.
        try:
            run_log_row = [
                self.time_stamp,
                run_type,
                self.fdm.run.comments,
                _relative_to_test_outputs(self.main_magnet.geom_folder),
                _relative_to_test_outputs(self.main_magnet.mesh_folder),
                _relative_to_test_outputs(self.main_magnet.solution_folder),
            ]
            self.add_to_run_log(
                os.path.join(self.wrk_folder, "run_log.csv"), run_log_row
            )
        except Exception:
            # Not a bare except: KeyboardInterrupt and SystemExit must propagate.
            self.logger.warning("Run log could not be completed.")

    def build_magnet(self):
        """
        Main method to build magnets, i.e. to run various fiqus run types and magnet types
        The start folder is restored as working directory afterwards, also when a step
        fails. If a file name is set, the summary is written to <file_name>.json in the
        model folder.
        :return: none
        :rtype: none
        """
        try:
            self._run_workflow()
        finally:
            # Restore the working directory even if a step raised.
            os.chdir(self.start_folder)

        if self.file_name:
            file_path = os.path.join(self.wrk_folder, f"{self.file_name}.json")
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(self.summary, f, indent=2)

    def _run_workflow(self):
        """
        Call the steps of the main magnet object that belong to the run type.
        A run type that is not listed below runs no step.
        :return: none
        :rtype: none
        """
        run_type = self.fdm.run.type
        magnet = self.main_magnet

        def launch_gui():
            # Read lazily, at the same points where the flag is actually needed.
            return self.main_magnet.fdm.run.launch_gui

        if run_type == "start_from_yaml":
            magnet.generate_geometry()
            magnet.pre_process()
            magnet.load_geometry()
            self._merge_summary(magnet.mesh())
            self.summary["solution_time"] = magnet.solve_and_postprocess_getdp()
            self._merge_summary(magnet.post_process_python(gui=launch_gui()))
        elif run_type == "pre_process_only":
            magnet.pre_process()
            # todo: DISABLE FOR ONE GROUP ONLY
            self._merge_summary(magnet.post_process_python(gui=launch_gui()))
        elif run_type == "geometry_only":
            magnet.generate_geometry(
                gui=(launch_gui() if self.fdm.magnet.type != "CCT_straight" else False)
            )
            if self.fdm.magnet.type in ["CCT_straight", "CWS"]:
                magnet.pre_process(gui=launch_gui())
        elif run_type == "geometry_and_mesh":
            magnet.generate_geometry()
            magnet.pre_process()
            magnet.load_geometry()
            self._merge_summary(magnet.mesh(gui=launch_gui()))
        elif run_type == "mesh_and_solve_with_post_process_python":
            magnet.load_geometry()
            self._merge_summary(magnet.mesh())
            self.summary["solution_time"] = magnet.solve_and_postprocess_getdp()
            self._merge_summary(magnet.post_process_python(gui=launch_gui()))
        elif run_type == "mesh_only":
            magnet.load_geometry()
            self._merge_summary(magnet.mesh(gui=launch_gui()))
        elif run_type == "solve_with_post_process_python":
            self.summary["solution_time"] = magnet.solve_and_postprocess_getdp(
                gui=launch_gui()
            )
            self._merge_summary(magnet.post_process_python(gui=launch_gui()))
        elif run_type == "solve_only":
            self.summary["solution_time"] = magnet.solve_and_postprocess_getdp(
                gui=launch_gui()
            )
        elif run_type == "postprocess_veusz":
            magnet.post_process_veusz(gui=launch_gui())
        elif run_type == "post_process_getdp_only":
            magnet.post_process_getdp(gui=launch_gui())
        elif run_type == "post_process_python_only":
            self._merge_summary(magnet.post_process_python(gui=launch_gui()))
        elif run_type == "post_process":
            magnet.post_process_getdp(gui=launch_gui())
            self._merge_summary(magnet.post_process_python(gui=launch_gui()))
        elif run_type == "plot_python":
            magnet.plot_python()
        elif run_type == "batch_post_process_python":
            magnet.batch_post_process_python()

    def _merge_summary(self, results):
        """
        Copy every key/value pair of the results of a step into the summary.
        :param results: object with an items() method, as returned by a step
        :return: none
        :rtype: none
        """
        for key, value in results.items():
            self.summary[key] = value

    @staticmethod
    def add_to_run_log(path_to_csv, run_log_row):
        """
        Append one row to the run log csv file; a new file gets the header row first.
        :param path_to_csv: full path to the run log csv file
        :type path_to_csv: str
        :param run_log_row: values of the row, in the order of the header
        :type run_log_row: list
        :return: none
        :rtype: none
        """
        header = [
            "Time Stamp",
            "Run Type",
            "Comments",
            "Geometry Directory",
            "Mesh Directory",
            "Solution Directory",
        ]
        # Must be checked before opening: append mode creates the file.
        write_header = not os.path.isfile(path_to_csv)

        with open(path_to_csv, "a", newline="") as csv_file:
            writer = csv.writer(csv_file)
            if write_header:
                writer.writerow(header)
            writer.writerow(run_log_row)

    def change_htcondor_run_log(self, htcondor_csv_file, htcondor_jobid, new_status="None"):
        """
        Set the 'Status' of all rows with the given 'Job ID' in the HTCondor run log csv.
        Failures are logged as a warning and never raised.
        :param htcondor_csv_file: full path to the HTCondor run log csv file
        :type htcondor_csv_file: str
        :param htcondor_jobid: job ID to look for in the 'Job ID' column
        :type htcondor_jobid: int
        :param new_status: status to write, converted to str
        :type new_status: str
        :return: none
        :rtype: none
        """
        try:
            df = pd.read_csv(htcondor_csv_file)
            df.loc[df['Job ID'] == htcondor_jobid, 'Status'] = str(new_status)
            df.to_csv(htcondor_csv_file, index=False)
        except Exception:
            # Not a bare except: KeyboardInterrupt and SystemExit must propagate.
            self.logger.warning(f"Could not change status of JobID {htcondor_jobid} to {new_status} in {htcondor_csv_file}.")
        else:
            # Reported only after the file has really been written.
            self.logger.info(f"Changed status of JobID {htcondor_jobid} to {new_status} in {htcondor_csv_file}.")
def remove_cli_options(
    argv,
    options=(
        "-o", "--output",
        "-g", "--getdp",
        "-j", "--htcondor_jobid",
        "-m", "--fiqus_data_model",
        "-s", "--fiqus_data_settings",
    ),
):
    """
    Remove the given command line options and their values from argv, in place.

    Handles "-o value" / "--output value" (two tokens) and "--output=value"
    (one token). All other tokens are kept in their original order.
    :param argv: list of command line tokens, modified in place
    :type argv: list
    :param options: option strings to remove; defaults to the FiQuS options
    :type options: tuple
    :return: the same argv list object
    :rtype: list
    """
    i = 0
    while i < len(argv):
        token = argv[i]
        if token in options:
            # Remove the option and its value; the slice is safe when the
            # option is the last token and has no value.
            del argv[i:i + 2]
        elif token.startswith("--") and token.split("=", 1)[0] in options:
            # "--option=value" is a single token
            del argv[i]
        else:
            i += 1
    return argv


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="FiQuS",
        description="Finite Elements Quench Simulator",
        epilog="steam-team@cern.ch",
    )
    parser.add_argument(
        dest="full_path_input",
        type=str,
        help="Full path to FiQuS input yaml file",
    )
    parser.add_argument(
        "--output", '-o', dest="output_path", type=str, help="Full path to FiQuS output folder"
    )
    parser.add_argument(
        "--getdp", '-g', dest="GetDP_path", type=str, help="Full path to GetDP executable"
    )

    parser.add_argument("--htcondor_jobid", '-j', type=int, default=0,
                        help="HTCondor job ID (optional)", required=False)

    parser.add_argument("--fiqus_data_model", '-m', type=str,
                        help="Full path to FiQuS Data Model file (optional)", required=False)

    parser.add_argument("--fiqus_data_settings", '-s', type=str,
                        help="Full path to FiQuS Data Settings file (optional)", required=False)

    args, _unknown = parser.parse_known_args()

    # remove these options from sys.argv, otherwise they are passed onto Gmsh
    # in Gmsh.initialize()
    remove_cli_options(sys.argv)

    if args.fiqus_data_model is not None and args.fiqus_data_settings is not None:
        # read fdm and fds from a file (HTCondor case)
        input_fdm = Util.read_data_from_yaml(args.fiqus_data_model, FDM)
        input_fds = Util.read_data_from_yaml(args.fiqus_data_settings, DataSettings)

        MainFiQuS(
            input_file_path=args.full_path_input,
            model_folder=args.output_path,
            fdm=input_fdm,
            fds=input_fds,
            htcondor_jobid=args.htcondor_jobid
        )
    else:
        # fdm and fds from input (STEAM SDK case)
        MainFiQuS(
            input_file_path=args.full_path_input,
            model_folder=args.output_path,
            GetDP_path=args.GetDP_path,
        )
    print("FiQuS run completed")