Coverage for fiqus/MainFiQuS.py: 66%
245 statements
Report generated by coverage.py v7.4.4, created at 2026-02-01 01:38 +0000
1import argparse
2import csv
3import os
4import pathlib
5import sys
6import time
7import getpass
8import platform
9import subprocess
10import json
12import pandas as pd
14FiQuS_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
15sys.path.insert(0, FiQuS_path)
17from fiqus.utils.Utils import FilesAndFolders as Util
18from fiqus.utils.Utils import CheckForExceptions as Check
19from fiqus.utils.Utils import create_json_schema
20from fiqus.utils.Utils import get_data_settings
21from fiqus.utils.Utils import initialize_logger
22from fiqus.data.DataFiQuS import FDM, SolveDumpDataModel
23from fiqus.data.DataSettings import DataSettings
24from fiqus.mains.MainCCT import MainCCT
25from fiqus.mains.MainMultipole import MainMultipole
26from fiqus.mains.MainPancake3D import MainPancake3D
27from fiqus.mains.MainConductorAC_Strand import MainConductorAC_Strand
28from fiqus.mains.MainHomogenizedConductor import MainHomogenizedConductor
29from fiqus.mains.MainConductorAC_Rutherford import MainConductorAC_Rutherford
30from fiqus.mains.MainConductorAC_CC import MainConductorAC_CC
class MainFiQuS:
    """
    This is the top level class of FiQuS.

    Creating an instance runs one complete FiQuS job: the input data is
    loaded, the magnet-specific main object is selected from
    ``fdm.magnet.type``, the Geometry/Mesh/Solution folders are prepared by
    :meth:`save_folders` and the requested run type is executed by
    :meth:`build_magnet`.
    """

    def __init__(
        self,
        input_file_path: str = None,
        model_folder: str = None,
        GetDP_path=None,
        fdm=None,
        fds=None,
        htcondor_jobid=None,
    ):
        """
        Main class for working with FiQuS simulations
        :param input_file_path: full path to input file yaml
        :type input_file_path: str
        :param model_folder: full path to the base output folder, called model folder
        :type model_folder: str
        :param GetDP_path: full path to GetDP executable
        :type GetDP_path: str
        :param fdm: FiQuS Data Model - object of fiqus DataFiQus; when not given it is read from input_file_path
        :type fdm: object
        :param fds: FiQuS Data Settings - object of DataSettings
        :type fds: object
        :param htcondor_jobid: HTCondor job id; when truthy, the status of this job in
            htcondor_run_log.csv (located in fds.base_path_model_files) is set to
            Running, then Failed or Finished
        :type htcondor_jobid: int
        :raises ValueError: if fdm.magnet.type is not one of the supported magnet types
        """
        self.time_stamp = time.strftime("%Y-%m-%d-%H-%M-%S")

        self.start_folder = os.getcwd()
        self.wrk_folder = model_folder
        self.file_name = None

        # Load yaml input file
        archive_error = None
        if not fdm:
            self.fdm: FDM = Util.read_data_from_yaml(input_file_path, FDM)
            # Keep a time-stamped copy of the input next to the logs. The logger
            # does not exist yet, so a failure is reported further below.
            archive_error = self._archive_input_file(input_file_path)
        else:
            self.fdm = fdm
        verbose = self.fdm.run.verbosity_FiQuS

        # Initialize logger
        self.logger = initialize_logger(
            verbose=verbose, time_stamp=self.time_stamp, work_folder=self.wrk_folder
        )
        if verbose:
            Util.print_welcome_graphics()
        if archive_error is not None:
            self.logger.warning(
                f"Input file could not be copied to the logs folder: {archive_error}"
            )

        # Create JSON schema
        create_json_schema(self.fdm)

        # Check for input errors
        Check.check_inputs(run=self.fdm.run)

        # Initialize Main object
        self.main_magnet = self._create_main_magnet(
            input_file_path=input_file_path, model_folder=model_folder, verbose=verbose
        )

        # Load user paths for executables and additional files
        self.logger.info(f'{getpass.getuser()} is running on {platform.platform()}')
        if not fds:
            fds = get_data_settings(GetDP_path=GetDP_path)
        else:
            fds = get_data_settings(GetDP_path=GetDP_path, settings=fds)
        self.main_magnet.GetDP_path = fds.GetDP_path
        self.logger.info(f"{self.main_magnet.GetDP_path} is going to be used for FE solving.")

        # update htcondor csv
        htcondor_csv_file = None
        if htcondor_jobid:
            htcondor_csv_file = os.path.join(
                fds.base_path_model_files, "htcondor_run_log.csv"
            )
            self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Running")

        # Save Model/Geometry/Mesh/Solution folder paths
        self.save_folders()

        # Build magnet
        # Every summary entry starts as None and is filled in by build_magnet.
        self.summary = dict.fromkeys(
            [
                "SJ",
                "SICN",
                "SIGE",
                "Gamma",
                "nodes",
                "solution_time",
                "overall_error",
                "minimum_diff",
                "maximum_diff",
            ]
        )

        try:
            self.build_magnet()
        except Exception as e:
            # update htcondor csv
            if htcondor_jobid:
                self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Failed")
            self.logger.error(f"Error: {e}")
            # Bare raise keeps the original traceback untouched.
            raise
        else:
            # update htcondor csv
            if htcondor_jobid:
                self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Finished")

    def _archive_input_file(self, input_file_path):
        """
        Copy the input yaml file to <model folder>/logs/INPUT_FILE_<time stamp>.FiQuS.yaml
        The copy is best effort: a failure never aborts the run.
        :param input_file_path: full path to input file yaml
        :type input_file_path: str
        :return: None when the copy succeeded, otherwise the OSError that prevented it
        :rtype: OSError or None
        """
        # Imported here so that this method does not depend on the module header.
        import shutil

        destination = os.path.join(
            self.wrk_folder, "logs", f"INPUT_FILE_{self.time_stamp}.FiQuS.yaml"
        )
        try:
            # NOTE(review): the logs folder is created here because the copy runs
            # before the logger is initialized - confirm that creating it this
            # early is acceptable.
            os.makedirs(os.path.dirname(destination), exist_ok=True)
            # A plain file copy works on every platform and with paths that
            # contain spaces, unlike a shell "copy" command line.
            shutil.copyfile(input_file_path, destination)
        except OSError as error:
            return error
        return None

    def _create_main_magnet(self, input_file_path, model_folder, verbose):
        """
        Create the magnet specific main object selected by self.fdm.magnet.type
        For the "multipole" type this also sets self.file_name and, when it is not
        given, self.fdm.magnet.geometry.geom_file_path (input file path with the
        extension replaced by .geom).
        :param input_file_path: full path to input file yaml
        :type input_file_path: str
        :param model_folder: full path to the base output folder, called model folder
        :type model_folder: str
        :param verbose: verbosity flag handed over to the main object
        :type verbose: bool
        :return: magnet specific main object
        :rtype: object
        :raises ValueError: if the magnet type is not supported
        """
        magnet_type = self.fdm.magnet.type
        # The inputs folder is derived inside each branch on purpose: magnet
        # types that do not need it keep working without an input file path.
        if magnet_type == "CCT_straight":
            return MainCCT(fdm=self.fdm, verbose=verbose)
        if magnet_type == "Pancake3D":
            return MainPancake3D(fdm=self.fdm, verbose=verbose)
        if magnet_type == "CACStrand":
            return MainConductorAC_Strand(
                fdm=self.fdm,
                inputs_folder_path=pathlib.Path(input_file_path).parent,
                outputs_folder_path=model_folder,
                verbose=verbose,
            )
        if magnet_type == "CACCC":
            return MainConductorAC_CC(
                fdm=self.fdm,
                inputs_folder_path=pathlib.Path(input_file_path).parent,
                verbose=verbose,
            )
        if magnet_type == "HomogenizedConductor":
            return MainHomogenizedConductor(
                fdm=self.fdm,
                inputs_folder_path=pathlib.Path(input_file_path).parent,
                outputs_folder_path=model_folder,
                verbose=verbose,
            )
        if magnet_type == "CACRutherford":
            return MainConductorAC_Rutherford(
                fdm=self.fdm,
                inputs_folder_path=pathlib.Path(input_file_path).parent,
                verbose=verbose,
            )
        if magnet_type == "multipole":
            # splitext drops whatever extension the input file has instead of
            # assuming that it is exactly five characters long (".yaml").
            self.file_name = os.path.splitext(os.path.basename(input_file_path))[0]
            if not self.fdm.magnet.geometry.geom_file_path:
                self.fdm.magnet.geometry.geom_file_path = (
                    f"{os.path.splitext(input_file_path)[0]}.geom"
                )
            return MainMultipole(
                fdm=self.fdm,
                rgd_path=self.fdm.magnet.geometry.geom_file_path,
                verbose=verbose,
                inputs_folder_path=pathlib.Path(input_file_path).parent,
            )
        raise ValueError(
            f"FiQuS does not support magnet type: {self.fdm.magnet.type}!"
        )

    def _get_required_folders(self):
        """
        Get the folder types that are flagged as required for the current run type
        :return: subset of ["Geometry", "Mesh", "Solution"], in that order
        :rtype: list
        """
        run = self.fdm.run
        if run.type == "start_from_yaml":
            return ["Geometry", "Mesh", "Solution"]
        if run.type == "geometry_and_mesh":
            return ["Geometry", "Mesh"]
        if run.type == "mesh_and_solve_with_post_process_python":
            return ["Mesh", "Solution"]
        if run.type in ["solve_with_post_process_python", "solve_only"]:
            return ["Solution"]
        if run.type == "geometry_only":
            return [] if run.geometry and not run.overwrite else ["Geometry"]
        if run.type == "mesh_only":
            return [] if run.mesh and not run.overwrite else ["Mesh"]
        # post_process_getdp_only or post_process_python_only or plot_python
        return []

    @staticmethod
    def _relative_to_test_outputs(folder):
        """
        Express a folder relative to tests/_outputs for the run log
        :param folder: folder path or None
        :type folder: str
        :return: relative path, or "-" when folder is None
        :rtype: str
        """
        if folder is None:
            return "-"
        return os.path.relpath(
            os.path.relpath(folder), os.path.join("tests", "_outputs")
        )

    def save_folders(self):
        """
        Method to resolve the Geometry, Mesh and Solution folders of FiQuS
        The folder paths are stored on self.main_magnet, the geometry, mesh, solve
        and postproc parts of the data model are written as yaml files into them
        (depending on the run type) and a row is appended to run_log.csv in the
        model folder. The folder operations themselves are delegated to
        Util.get_folder_path.
        :return: Nothing, only does file and folder operation
        :rtype: None
        """
        run = self.fdm.run
        magnet = self.main_magnet
        required_folders = self._get_required_folders()

        def _check_and_generate_path(folder_type: str = None):
            # Each folder type is resolved inside the folder of the previous stage.
            if folder_type == "Geometry":
                folder = self.wrk_folder
            elif folder_type == "Mesh":
                folder = magnet.geom_folder
            elif folder_type == "Solution":
                folder = magnet.mesh_folder
            else:
                raise Exception("Incompatible type.")

            folder_key = getattr(run, folder_type.lower())
            if folder_key is None:
                # folder_key is not given, so it is computed
                folder_key = Util.compute_folder_key(
                    folder_type=folder_type,
                    folder=folder,
                    overwrite=run.overwrite,
                )

            # Overwrite conditions are only checked for the first required folder.
            first_required = required_folders[0] if required_folders else None
            if run.overwrite and folder_type == first_required:
                Check.check_overwrite_conditions(
                    folder_type=folder_type, folder=folder, folder_key=folder_key
                )
            return Util.get_folder_path(
                folder_type=folder_type,
                folder=folder,
                folder_key=folder_key,
                overwrite=run.overwrite,
                required_folder=folder_type in required_folders,
            )

        magnet.geom_folder = _check_and_generate_path(folder_type="Geometry")
        if run.type != "geometry_only":
            magnet.mesh_folder = _check_and_generate_path(folder_type="Mesh")
            if run.type != "mesh_only":
                magnet.solution_folder = _check_and_generate_path(
                    folder_type="Solution"
                )

        if run.type in [
            "start_from_yaml",
            "geometry_and_mesh",
            "geometry_only",
        ]:
            Util.write_data_model_to_yaml(
                os.path.join(magnet.geom_folder, "geometry.yaml"),
                self.fdm.magnet.geometry,
                by_alias=True,
                with_comments=True,
            )
        if run.type in [
            "start_from_yaml",
            "geometry_and_mesh",
            "mesh_and_solve_with_post_process_python",
            "mesh_only",
        ]:
            Util.write_data_model_to_yaml(
                os.path.join(magnet.mesh_folder, "mesh.yaml"),
                self.fdm.magnet.mesh,
                by_alias=True,
                with_comments=True,
            )
        if run.type in [
            "start_from_yaml",
            "mesh_and_solve_with_post_process_python",
            "solve_with_post_process_python",
            "solve_only",
            "post_process",
            "plot_python",
            "postprocess_veusz",
        ]:
            solve_dump_data = SolveDumpDataModel(
                solve=self.fdm.magnet.solve,
                circuit=self.fdm.circuit,
                power_supply=self.fdm.power_supply,
                quench_protection=self.fdm.quench_protection,
                quench_detection=self.fdm.quench_detection,
                conductors=self.fdm.conductors,
            )
            Util.write_data_model_to_yaml(
                os.path.join(magnet.solution_folder, "solve.yaml"),
                solve_dump_data,
                by_alias=True,
                with_comments=True,
            )
        if run.type in [
            "start_from_yaml",
            "mesh_and_solve_with_post_process_python",
            "solve_with_post_process_python",
            "post_process_python_only",
            "post_process_getdp_only",
            "post_process",
            "postprocess_veusz",
            "plot_python",
        ]:
            Util.write_data_model_to_yaml(
                os.path.join(magnet.solution_folder, "postproc.yaml"),
                self.fdm.magnet.postproc,
                by_alias=True,
                with_comments=True,
            )

        # The run log is a convenience only: any problem is reported as a
        # warning instead of stopping the run.
        try:
            run_log_row = [
                self.time_stamp,
                run.type,
                run.comments,
                self._relative_to_test_outputs(magnet.geom_folder),
                self._relative_to_test_outputs(magnet.mesh_folder),
                self._relative_to_test_outputs(magnet.solution_folder),
            ]
            self.add_to_run_log(
                os.path.join(self.wrk_folder, "run_log.csv"), run_log_row
            )
        except Exception:
            self.logger.warning("Run log could not be completed.")

    def _update_summary(self, results):
        """
        Copy every key/value pair of results into self.summary
        :param results: mapping returned by a mesh or python post-processing step
        :type results: dict
        :return: none
        :rtype: none
        """
        self.summary.update(results.items())

    def build_magnet(self):
        """
        Main method to build magnets, i.e. to run various fiqus run types and magnet types
        Results returned by the individual steps are collected in self.summary.
        The working directory is set back to the start folder afterwards, also when
        a step fails. If self.file_name is set, the summary is written to
        <model folder>/<file name>.json.
        :return: none
        :rtype: none
        """
        run_type = self.fdm.run.type
        magnet = self.main_magnet
        try:
            if run_type == "start_from_yaml":
                magnet.generate_geometry()
                magnet.pre_process()
                magnet.load_geometry()
                self._update_summary(magnet.mesh())
                self.summary["solution_time"] = magnet.solve_and_postprocess_getdp()
                self._update_summary(
                    magnet.post_process_python(gui=magnet.fdm.run.launch_gui)
                )
            elif run_type == "pre_process_only":
                magnet.pre_process()
                # todo: DISABLE FOR ONE GROUP ONLY
                self._update_summary(
                    magnet.post_process_python(gui=magnet.fdm.run.launch_gui)
                )
            elif run_type == "geometry_only":
                magnet.generate_geometry(
                    gui=(
                        magnet.fdm.run.launch_gui
                        if self.fdm.magnet.type != "CCT_straight"
                        else False
                    )
                )
                if self.fdm.magnet.type in ["CCT_straight", "CWS"]:
                    magnet.pre_process(gui=magnet.fdm.run.launch_gui)
            elif run_type == "geometry_and_mesh":
                magnet.generate_geometry()
                magnet.pre_process()
                magnet.load_geometry()
                self._update_summary(magnet.mesh(gui=magnet.fdm.run.launch_gui))
            elif run_type == "mesh_and_solve_with_post_process_python":
                magnet.load_geometry()
                self._update_summary(magnet.mesh())
                self.summary["solution_time"] = magnet.solve_and_postprocess_getdp()
                self._update_summary(
                    magnet.post_process_python(gui=magnet.fdm.run.launch_gui)
                )
            elif run_type == "mesh_only":
                magnet.load_geometry()
                self._update_summary(magnet.mesh(gui=magnet.fdm.run.launch_gui))
            elif run_type == "solve_with_post_process_python":
                self.summary["solution_time"] = magnet.solve_and_postprocess_getdp(
                    gui=magnet.fdm.run.launch_gui
                )
                self._update_summary(
                    magnet.post_process_python(gui=magnet.fdm.run.launch_gui)
                )
            elif run_type == "solve_only":
                self.summary["solution_time"] = magnet.solve_and_postprocess_getdp(
                    gui=magnet.fdm.run.launch_gui
                )
            elif run_type == "post_process_getdp_only":
                magnet.post_process_getdp(gui=magnet.fdm.run.launch_gui)
            elif run_type == "post_process_python_only":
                self._update_summary(
                    magnet.post_process_python(gui=magnet.fdm.run.launch_gui)
                )
            elif run_type == "post_process":
                magnet.post_process_getdp(gui=magnet.fdm.run.launch_gui)
                self._update_summary(
                    magnet.post_process_python(gui=magnet.fdm.run.launch_gui)
                )
            elif run_type == "plot_python":
                magnet.plot_python()
            elif run_type == "batch_post_process_python":
                magnet.batch_post_process_python()
        finally:
            # Go back to the start folder even if a step raised, so that a failed
            # run does not leave the process in another working directory.
            os.chdir(self.start_folder)

        if self.file_name:
            file_path = os.path.join(self.wrk_folder, f"{self.file_name}.json")
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(self.summary, f, indent=2)

    @staticmethod
    def add_to_run_log(path_to_csv, run_log_row):
        """
        Append one row to the run log csv file, writing the header first if the file does not exist yet
        :param path_to_csv: full path to the run log csv file
        :type path_to_csv: str
        :param run_log_row: time stamp, run type, comments, geometry, mesh and solution directory
        :type run_log_row: list
        :return: none
        :rtype: none
        """
        header = [
            "Time Stamp",
            "Run Type",
            "Comments",
            "Geometry Directory",
            "Mesh Directory",
            "Solution Directory",
        ]
        # Must be evaluated before opening: append mode creates the file.
        write_header = not os.path.isfile(path_to_csv)
        with open(path_to_csv, "a", newline="") as csv_file:
            writer = csv.writer(csv_file)
            if write_header:
                writer.writerow(header)
            writer.writerow(run_log_row)

    def change_htcondor_run_log(self, htcondor_csv_file, htcondor_jobid, new_status="None"):
        """
        Set the 'Status' column of the rows with the given 'Job ID' in the HTCondor run log csv file
        Failures are only logged as a warning and never raised.
        :param htcondor_csv_file: full path to the HTCondor run log csv file
        :type htcondor_csv_file: str
        :param htcondor_jobid: value looked up in the 'Job ID' column
        :type htcondor_jobid: int
        :param new_status: new status, stored as a string
        :type new_status: str
        :return: none
        :rtype: none
        """
        try:
            df = pd.read_csv(htcondor_csv_file)
            df.loc[df['Job ID'] == htcondor_jobid, 'Status'] = str(new_status)
            df.to_csv(htcondor_csv_file, index=False)
        except Exception:
            self.logger.warning(f"Could not change status of JobID {htcondor_jobid} to {new_status} in {htcondor_csv_file}.")
        else:
            # Reported only once the file has really been rewritten.
            self.logger.info(f"Changed status of JobID {htcondor_jobid} to {new_status} in {htcondor_csv_file}.")
def _strip_fiqus_options(argv, options):
    """
    Remove FiQuS command line options and their values from an argument list, in place
    An option given as a separate token ("-o path", "--output path") is removed together
    with the token that follows it; a long option in the "--output=path" form is removed
    as a single token. Everything else is left untouched.
    :param argv: argument list to clean, e.g. sys.argv; modified in place
    :type argv: list
    :param options: option strings to remove
    :type options: tuple
    :return: none
    :rtype: none
    """
    i = 0
    while i < len(argv):
        token = argv[i]
        name, separator, _ = token.partition("=")
        if token in options:
            argv.pop(i)  # Remove the option
            if i < len(argv):
                argv.pop(i)  # Remove the associated value
        elif separator and name.startswith("--") and name in options:
            argv.pop(i)  # Option and value are one token
        else:
            i += 1


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="FiQuS",
        description="Finite Elements Quench Simulator",
        epilog="steam-team@cern.ch",
    )
    parser.add_argument(
        dest="full_path_input",
        type=str,
        help="Full path to FiQuS input yaml file",
    )
    parser.add_argument(
        "--output", '-o', dest="output_path", type=str, help="Full path to FiQuS output folder"
    )
    parser.add_argument(
        "--getdp", '-g', dest="GetDP_path", type=str, help="Full path to GetDP executable"
    )

    parser.add_argument("--htcondor_jobid", '-j', type=int, default=0,
                        help="HTCondor job ID (optional)", required=False)

    parser.add_argument("--fiqus_data_model", '-m', type=str,
                        help="Full path to FiQuS Data Model file (optional)", required=False)

    parser.add_argument("--fiqus_data_settings", '-s', type=str,
                        help="Full path to FiQuS Data Settings file (optional)", required=False)

    args, unknown = parser.parse_known_args()

    # remove these options from sys.argv, otherwise they are passed onto Gmsh
    # in Gmsh.initialize(). Both the short and the long spelling of every
    # option declared above are removed.
    _strip_fiqus_options(
        sys.argv,
        (
            "-o", "--output",
            "-g", "--getdp",
            "-j", "--htcondor_jobid",
            "-m", "--fiqus_data_model",
            "-s", "--fiqus_data_settings",
        ),
    )

    if args.fiqus_data_model is not None and args.fiqus_data_settings is not None:
        # read fdm and fds from a file (HTCondor case)
        input_fdm = Util.read_data_from_yaml(args.fiqus_data_model, FDM)
        input_fds = Util.read_data_from_yaml(args.fiqus_data_settings, DataSettings)

        MainFiQuS(
            input_file_path=args.full_path_input,
            model_folder=args.output_path,
            fdm=input_fdm,
            fds=input_fds,
            htcondor_jobid=args.htcondor_jobid
        )
    else:
        # fdm and fds from input (STEAM SDK case)
        MainFiQuS(
            input_file_path=args.full_path_input,
            model_folder=args.output_path,
            GetDP_path=args.GetDP_path,
        )
    print("FiQuS run completed")