# Coverage for fiqus/MainFiQuS.py: 66%
# 247 statements
# Report generated by coverage.py v7.4.4, created at 2026-01-08 01:36 +0000
import argparse
import csv
import getpass
import json
import os
import pathlib
import platform
import shutil
import subprocess
import sys
import time

import pandas as pd

# Make the FiQuS package importable when this file is run as a script.
FiQuS_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, FiQuS_path)

from fiqus.utils.Utils import FilesAndFolders as Util
from fiqus.utils.Utils import CheckForExceptions as Check
from fiqus.utils.Utils import create_json_schema
from fiqus.utils.Utils import get_data_settings
from fiqus.utils.Utils import initialize_logger
from fiqus.data.DataFiQuS import FDM, SolveDumpDataModel
from fiqus.data.DataSettings import DataSettings
from fiqus.mains.MainCCT import MainCCT
from fiqus.mains.MainMultipole import MainMultipole
from fiqus.mains.MainPancake3D import MainPancake3D
from fiqus.mains.MainConductorAC_Strand import MainConductorAC_Strand
from fiqus.mains.MainHomogenizedConductor import MainHomogenizedConductor
from fiqus.mains.MainConductorAC_Rutherford import MainConductorAC_Rutherford
from fiqus.mains.MainConductorAC_CC import MainConductorAC_CC
class MainFiQuS:
    """
    This is the top level class of FiQuS.

    Constructing an instance runs a complete FiQuS job: the data model is
    loaded, the magnet-specific main object is created, the output folders are
    prepared and the steps selected by ``fdm.run.type`` are executed.
    """

    def __init__(
        self,
        input_file_path: str = None,
        model_folder: str = None,
        GetDP_path=None,
        fdm=None,
        fds=None,
        htcondor_jobid=None,
    ):
        """
        Main class for working with FiQuS simulations
        :param input_file_path: full path to input file yaml
        :type input_file_path: str
        :param model_folder: full path to the base output folder, called model folder
        :type model_folder: str
        :param GetDP_path: full path to GetDP executable
        :type GetDP_path: str
        :param fdm: FiQuS Data Model - object of fiqus DataFiQus
        :type fdm: object
        :param fds: FiQuS Data Settings - object of DataSettings
        :type fds: object
        :param htcondor_jobid: HTCondor job ID; when truthy, the status of this job is updated in htcondor_run_log.csv
        :type htcondor_jobid: int
        :raises ValueError: if the magnet type of the data model is not supported
        """
        self.time_stamp = time.strftime("%Y-%m-%d-%H-%M-%S")

        self.start_folder = os.getcwd()
        self.wrk_folder = model_folder
        self.file_name = None

        # Load yaml input file
        copy_error = None
        if not fdm:
            self.fdm: FDM = Util.read_data_from_yaml(input_file_path, FDM)
            input_copy_path = os.path.join(
                self.wrk_folder, "logs", f"INPUT_FILE_{self.time_stamp}.FiQuS.yaml"
            )
            # Keeping a copy of the input file is best effort: a failure must
            # not stop the run. The logger does not exist yet, so the error is
            # remembered and reported once the logger is available.
            try:
                shutil.copy(input_file_path, input_copy_path)
            except OSError as error:
                copy_error = error
        else:
            self.fdm: FDM = fdm
        verbose = self.fdm.run.verbosity_FiQuS

        # Initialize logger
        self.logger = initialize_logger(
            verbose=verbose, time_stamp=self.time_stamp, work_folder=self.wrk_folder
        )
        if copy_error is not None:
            self.logger.warning(f"Input file could not be copied to the logs folder: {copy_error}")
        if verbose:
            Util.print_welcome_graphics()

        # Create JSON schema
        create_json_schema(self.fdm)

        # Check for input errors
        Check.check_inputs(run=self.fdm.run)

        # Initialize Main object
        magnet_type = self.fdm.magnet.type
        if magnet_type == "CCT_straight":
            self.main_magnet = MainCCT(fdm=self.fdm, verbose=verbose)
        elif magnet_type == "Pancake3D":
            self.main_magnet = MainPancake3D(fdm=self.fdm, verbose=verbose)
        elif magnet_type == "CACStrand":
            self.main_magnet = MainConductorAC_Strand(
                fdm=self.fdm,
                inputs_folder_path=pathlib.Path(input_file_path).parent,
                outputs_folder_path=model_folder,
                verbose=verbose,
            )
        elif magnet_type == "CACCC":
            self.main_magnet = MainConductorAC_CC(
                fdm=self.fdm,
                inputs_folder_path=pathlib.Path(input_file_path).parent,
                verbose=verbose,
            )
        elif magnet_type == "HomogenizedConductor":
            self.main_magnet = MainHomogenizedConductor(
                fdm=self.fdm,
                inputs_folder_path=pathlib.Path(input_file_path).parent,
                outputs_folder_path=model_folder,
                verbose=verbose,
            )
        elif magnet_type == "CACRutherford":
            self.main_magnet = MainConductorAC_Rutherford(
                fdm=self.fdm,
                inputs_folder_path=pathlib.Path(input_file_path).parent,
                verbose=verbose,
            )
        elif magnet_type == "multipole":
            # The input file name without its extension names the summary
            # json written by build_magnet() and the default .geom file.
            self.file_name = os.path.splitext(os.path.basename(input_file_path))[0]
            if not self.fdm.magnet.geometry.geom_file_path:
                self.fdm.magnet.geometry.geom_file_path = (
                    f"{os.path.splitext(input_file_path)[0]}.geom"
                )
            self.main_magnet = MainMultipole(
                fdm=self.fdm,
                rgd_path=self.fdm.magnet.geometry.geom_file_path,
                verbose=verbose,
            )
        else:
            raise ValueError(
                f"FiQuS does not support magnet type: {self.fdm.magnet.type}!"
            )

        # Load user paths for executables and additional files
        self.logger.info(f'{getpass.getuser()} is running on {platform.platform()}')
        if not fds:
            fds = get_data_settings(GetDP_path=GetDP_path)
        else:
            fds = get_data_settings(GetDP_path=GetDP_path, settings=fds)
        self.main_magnet.GetDP_path = fds.GetDP_path
        self.logger.info(f"{self.main_magnet.GetDP_path} is going to be used for FE solving.")

        # update htcondor csv
        htcondor_csv_file = None
        if htcondor_jobid:
            base_path_model_files = fds.base_path_model_files
            htcondor_csv_file = os.path.join(base_path_model_files, "htcondor_run_log.csv")
            self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Running")

        # Save Model/Geometry/Mesh/Solution folder paths
        self.save_folders()

        # Build magnet
        self.summary = dict.fromkeys(
            [
                "SJ",
                "SICN",
                "SIGE",
                "Gamma",
                "nodes",
                "solution_time",
                "overall_error",
                "minimum_diff",
                "maximum_diff",
            ]
        )

        try:
            self.build_magnet()
        except Exception as e:
            # update htcondor csv
            if htcondor_jobid:
                self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Failed")
            self.logger.error(f"Error: {e}")
            raise
        else:
            # update htcondor csv
            if htcondor_jobid:
                self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Finished")

    def save_folders(self):
        """
        Method to make or delete folders of FiQuS
        :return: Nothing, only does file and folder operation
        :rtype: None
        """
        run = self.fdm.run
        magnet = self.main_magnet

        # Folders that the selected run type has to (re)generate.
        if run.type == "start_from_yaml":
            required_folders = ["Geometry", "Mesh", "Solution"]
        elif run.type == "geometry_and_mesh":
            required_folders = ["Geometry", "Mesh"]
        elif run.type == "mesh_and_solve_with_post_process_python":
            required_folders = ["Mesh", "Solution"]
        elif run.type in ["solve_with_post_process_python", "solve_only"]:
            required_folders = ["Solution"]
        elif run.type == "geometry_only":
            required_folders = [] if run.geometry and not run.overwrite else ["Geometry"]
        elif run.type == "mesh_only":
            required_folders = [] if run.mesh and not run.overwrite else ["Mesh"]
        else:  # post_process_getdp_only or post_process_python_only or plot_python
            required_folders = []

        def _check_and_generate_path(folder_type: str = None):
            # Each folder type lives inside the folder of the previous stage.
            if folder_type == "Geometry":
                folder = self.wrk_folder
            elif folder_type == "Mesh":
                folder = magnet.geom_folder
            elif folder_type == "Solution":
                folder = magnet.mesh_folder
            else:
                raise Exception("Incompatible type.")

            folder_key = getattr(run, folder_type.lower())
            if folder_key is None:
                # folder_key is not given, so it is computed
                folder_key = Util.compute_folder_key(
                    folder_type=folder_type,
                    folder=folder,
                    overwrite=run.overwrite,
                )

            # Overwrite conditions are checked only for the first required folder.
            first_required = required_folders[0] if required_folders else None
            if run.overwrite and folder_type == first_required:
                Check.check_overwrite_conditions(
                    folder_type=folder_type, folder=folder, folder_key=folder_key
                )
            return Util.get_folder_path(
                folder_type=folder_type,
                folder=folder,
                folder_key=folder_key,
                overwrite=run.overwrite,
                required_folder=folder_type in required_folders,
            )

        magnet.geom_folder = _check_and_generate_path(folder_type="Geometry")
        if run.type != "geometry_only":
            magnet.mesh_folder = _check_and_generate_path(folder_type="Mesh")
        if run.type not in ("geometry_only", "mesh_only"):
            magnet.solution_folder = _check_and_generate_path(folder_type="Solution")

        # Dump the part of the data model relevant to each stage in its folder.
        if run.type in [
            "start_from_yaml",
            "geometry_and_mesh",
            "geometry_only",
        ]:
            Util.write_data_model_to_yaml(
                os.path.join(magnet.geom_folder, "geometry.yaml"),
                self.fdm.magnet.geometry,
                by_alias=True,
                with_comments=True,
            )
        if run.type in [
            "start_from_yaml",
            "geometry_and_mesh",
            "mesh_and_solve_with_post_process_python",
            "mesh_only",
        ]:
            Util.write_data_model_to_yaml(
                os.path.join(magnet.mesh_folder, "mesh.yaml"),
                self.fdm.magnet.mesh,
                by_alias=True,
                with_comments=True,
            )
        if run.type in [
            "start_from_yaml",
            "mesh_and_solve_with_post_process_python",
            "solve_with_post_process_python",
            "solve_only",
            "post_process",
            "plot_python",
            "postprocess_veusz",
        ]:
            solve_dump_data = SolveDumpDataModel(
                solve=self.fdm.magnet.solve,
                circuit=self.fdm.circuit,
                power_supply=self.fdm.power_supply,
                quench_protection=self.fdm.quench_protection,
                quench_detection=self.fdm.quench_detection,
                conductors=self.fdm.conductors,
            )
            Util.write_data_model_to_yaml(
                os.path.join(magnet.solution_folder, "solve.yaml"),
                solve_dump_data,
                by_alias=True,
                with_comments=True,
            )
        if run.type in [
            "start_from_yaml",
            "mesh_and_solve_with_post_process_python",
            "solve_with_post_process_python",
            "post_process_python_only",
            "post_process_getdp_only",
            "post_process",
            "postprocess_veusz",
            "plot_python",
        ]:
            Util.write_data_model_to_yaml(
                os.path.join(magnet.solution_folder, "postproc.yaml"),
                self.fdm.magnet.postproc,
                by_alias=True,
                with_comments=True,
            )

        def _run_log_entry(attribute_name):
            # Folders not used by this run type are logged as "-". getattr is
            # used because mesh/solution folders are only assigned above for
            # some run types.
            folder = getattr(magnet, attribute_name, None)
            if folder is None:
                return "-"
            return os.path.relpath(folder, os.path.join("tests", "_outputs"))

        # The run log is best effort: a failure is reported, not raised.
        try:
            run_log_row = [
                self.time_stamp,
                run.type,
                run.comments,
                _run_log_entry("geom_folder"),
                _run_log_entry("mesh_folder"),
                _run_log_entry("solution_folder"),
            ]
            self.add_to_run_log(
                os.path.join(self.wrk_folder, "run_log.csv"), run_log_row
            )
        except Exception:
            self.logger.warning("Run log could not be completed.")

    def build_magnet(self):
        """
        Main method to build magnets, i.e. to run various fiqus run types and magnet types.
        Results returned by the individual steps are collected in self.summary. For
        runs that set self.file_name the summary is written as json to the model folder.
        :return: none
        :rtype: none
        """
        magnet = self.main_magnet
        run_type = self.fdm.run.type

        try:
            if run_type == "start_from_yaml":
                magnet.generate_geometry()
                magnet.pre_process()
                magnet.load_geometry()
                self.summary.update(magnet.mesh())
                self.summary["solution_time"] = magnet.solve_and_postprocess_getdp()
                self.summary.update(magnet.post_process_python(gui=magnet.fdm.run.launch_gui))
            elif run_type == "pre_process_only":
                magnet.pre_process()
                # todo: DISABLE FOR ONE GROUP ONLY
                self.summary.update(magnet.post_process_python(gui=magnet.fdm.run.launch_gui))
            elif run_type == "geometry_only":
                magnet.generate_geometry(
                    gui=(magnet.fdm.run.launch_gui if self.fdm.magnet.type != "CCT_straight" else False)
                )
                if self.fdm.magnet.type in ["CCT_straight", "CWS"]:
                    magnet.pre_process(gui=magnet.fdm.run.launch_gui)
            elif run_type == "geometry_and_mesh":
                magnet.generate_geometry()
                magnet.pre_process()
                magnet.load_geometry()
                self.summary.update(magnet.mesh(gui=magnet.fdm.run.launch_gui))
            elif run_type == "mesh_and_solve_with_post_process_python":
                magnet.load_geometry()
                self.summary.update(magnet.mesh())
                self.summary["solution_time"] = magnet.solve_and_postprocess_getdp()
                self.summary.update(magnet.post_process_python(gui=magnet.fdm.run.launch_gui))
            elif run_type == "mesh_only":
                magnet.load_geometry()
                self.summary.update(magnet.mesh(gui=magnet.fdm.run.launch_gui))
            elif run_type == "solve_with_post_process_python":
                self.summary["solution_time"] = magnet.solve_and_postprocess_getdp(
                    gui=magnet.fdm.run.launch_gui
                )
                self.summary.update(magnet.post_process_python(gui=magnet.fdm.run.launch_gui))
            elif run_type == "solve_only":
                self.summary["solution_time"] = magnet.solve_and_postprocess_getdp(
                    gui=magnet.fdm.run.launch_gui
                )
            elif run_type == "postprocess_veusz":
                magnet.post_process_veusz(gui=magnet.fdm.run.launch_gui)
            elif run_type == "post_process_getdp_only":
                magnet.post_process_getdp(gui=magnet.fdm.run.launch_gui)
            elif run_type == "post_process_python_only":
                self.summary.update(magnet.post_process_python(gui=magnet.fdm.run.launch_gui))
            elif run_type == "post_process":
                magnet.post_process_getdp(gui=magnet.fdm.run.launch_gui)
                self.summary.update(magnet.post_process_python(gui=magnet.fdm.run.launch_gui))
            elif run_type == "plot_python":
                magnet.plot_python()
            elif run_type == "batch_post_process_python":
                magnet.batch_post_process_python()
        finally:
            # Go back to the folder FiQuS was started from, also when a step
            # failed, so the caller is not left in another working directory.
            os.chdir(self.start_folder)

        if self.file_name:
            file_path = os.path.join(self.wrk_folder, f"{self.file_name}.json")
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(self.summary, f, indent=2)

    @staticmethod
    def add_to_run_log(path_to_csv, run_log_row):
        """
        Append one row to the run log csv file; the header row is written first
        when the file does not exist yet.
        :param path_to_csv: full path to the run log csv file
        :type path_to_csv: str
        :param run_log_row: values of the row to append, in the order of the header
        :type run_log_row: list
        :return: Nothing, only appends to the csv file
        :rtype: None
        """
        header = [
            "Time Stamp",
            "Run Type",
            "Comments",
            "Geometry Directory",
            "Mesh Directory",
            "Solution Directory",
        ]
        # Checked before opening, because append mode creates the file.
        write_header = not os.path.isfile(path_to_csv)
        with open(path_to_csv, "a", newline="") as csv_file:
            writer = csv.writer(csv_file)
            if write_header:
                writer.writerow(header)
            writer.writerow(run_log_row)

    def change_htcondor_run_log(self, htcondor_csv_file, htcondor_jobid, new_status="None"):
        """
        Set the 'Status' column of the rows with the given 'Job ID' in the HTCondor
        run log csv file. This is best effort: a failure is logged as a warning.
        :param htcondor_csv_file: full path to the HTCondor run log csv file
        :type htcondor_csv_file: str
        :param htcondor_jobid: job ID whose status is changed
        :type htcondor_jobid: int
        :param new_status: status to write, it is converted to a string
        :type new_status: str
        :return: Nothing, only rewrites the csv file
        :rtype: None
        """
        try:
            df = pd.read_csv(htcondor_csv_file)
            df.loc[df['Job ID'] == htcondor_jobid, 'Status'] = str(new_status)
            df.to_csv(htcondor_csv_file, index=False)
        except Exception:
            self.logger.warning(f"Could not change status of JobID {htcondor_jobid} to {new_status} in {htcondor_csv_file}.")
        else:
            # Success is reported only after the file has been written.
            self.logger.info(f"Changed status of JobID {htcondor_jobid} to {new_status} in {htcondor_csv_file}.")
def strip_fiqus_options(
    argv,
    options=(
        "-o", "--output",
        "-g", "--getdp",
        "-j", "--htcondor_jobid",
        "-m", "--fiqus_data_model",
        "-s", "--fiqus_data_settings",
    ),
):
    """
    Return a copy of argv without the FiQuS command line options and their values.

    Every option in ``options`` takes exactly one value. Both the separated
    form (``-o path`` / ``--output path``) and the joined long form
    (``--output=path``) are removed. All other arguments are kept in order.

    :param argv: command line arguments, e.g. sys.argv
    :type argv: list
    :param options: option strings to remove together with their value
    :type options: tuple
    :return: the remaining arguments
    :rtype: list
    """
    remaining = []
    skip_value = False
    for token in argv:
        if skip_value:
            # This token is the value of the option removed just before it.
            skip_value = False
            continue
        if token in options:
            skip_value = True
            continue
        if token.startswith("--") and "=" in token and token.split("=", 1)[0] in options:
            continue
        remaining.append(token)
    return remaining


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="FiQuS",
        description="Finite Elements Quench Simulator",
        epilog="steam-team@cern.ch",
    )
    parser.add_argument(
        dest="full_path_input",
        type=str,
        help="Full path to FiQuS input yaml file",
    )
    parser.add_argument(
        "--output", '-o', dest="output_path", type=str, help="Full path to FiQuS output folder"
    )
    parser.add_argument(
        "--getdp", '-g', dest="GetDP_path", type=str, help="Full path to GetDP executable"
    )

    parser.add_argument("--htcondor_jobid", '-j', type=int, default=0,
                        help="HTCondor job ID (optional)", required=False)

    parser.add_argument("--fiqus_data_model", '-m', type=str,
                        help="Full path to FiQuS Data Model file (optional)", required=False)

    parser.add_argument("--fiqus_data_settings", '-s', type=str,
                        help="Full path to FiQuS Data Settings file (optional)", required=False)

    args, _ = parser.parse_known_args()

    # remove these options from sys.argv, otherwise they are passed onto Gmsh
    # in Gmsh.initialize(). sys.argv is updated in place.
    sys.argv[:] = strip_fiqus_options(sys.argv)

    if args.fiqus_data_model is not None and args.fiqus_data_settings is not None:
        # read fdm and fds from a file (HTCondor case)
        input_fdm = Util.read_data_from_yaml(args.fiqus_data_model, FDM)
        input_fds = Util.read_data_from_yaml(args.fiqus_data_settings, DataSettings)

        MainFiQuS(
            input_file_path=args.full_path_input,
            model_folder=args.output_path,
            fdm=input_fdm,
            fds=input_fds,
            htcondor_jobid=args.htcondor_jobid,
        )
    else:
        # fdm and fds from input (STEAM SDK case)
        MainFiQuS(
            input_file_path=args.full_path_input,
            model_folder=args.output_path,
            GetDP_path=args.GetDP_path,
        )
    print("FiQuS run completed")