Coverage for fiqus/MainFiQuS.py: 65%
244 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-12-19 01:48 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-12-19 01:48 +0000
1import argparse
2import csv
3import os
4import pathlib
5import sys
6import time
7import getpass
8import platform
9import subprocess
10import json
12import pandas as pd
14FiQuS_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
15sys.path.insert(0, FiQuS_path)
17from fiqus.utils.Utils import FilesAndFolders as Util
18from fiqus.utils.Utils import CheckForExceptions as Check
19from fiqus.utils.Utils import create_json_schema
20from fiqus.utils.Utils import get_data_settings
21from fiqus.utils.Utils import initialize_logger
22from fiqus.data.DataFiQuS import FDM, SolveDumpDataModel
23from fiqus.data.DataSettings import DataSettings
24from fiqus.mains.MainCCT import MainCCT
25from fiqus.mains.MainMultipole import MainMultipole
26from fiqus.mains.MainPancake3D import MainPancake3D
27from fiqus.mains.MainConductorAC_Strand import MainConductorAC_Strand
28from fiqus.mains.MainHomogenizedConductor import MainHomogenizedConductor
29from fiqus.mains.MainConductorAC_Rutherford import MainConductorAC_Rutherford
class MainFiQuS:
    """
    This is the top level class of FiQuS.

    Constructing an instance runs a complete FiQuS job: the data model is
    loaded, the magnet-specific main object is created, the output folders are
    prepared and the steps selected by ``fdm.run.type`` are executed.
    """

    def __init__(
        self,
        input_file_path: str = None,
        model_folder: str = None,
        GetDP_path=None,
        fdm=None,
        fds=None,
        htcondor_jobid=None,
    ):
        """
        Main class for working with FiQuS simulations
        :param input_file_path: full path to input file yaml
        :type input_file_path: str
        :param model_folder: full path to the base output folder, called model folder
        :type model_folder: str
        :param GetDP_path: full path to GetDP executable
        :type GetDP_path: str
        :param fdm: FiQuS Data Model - object of fiqus DataFiQus
        :type fdm: object
        :param fds: FiQuS Data Settings - object of DataSettings
        :type fds: object
        :param htcondor_jobid: HTCondor job ID; when truthy, the status of this job is
            updated in ``htcondor_run_log.csv`` located in ``fds.base_path_model_files``
        :type htcondor_jobid: int
        """
        # Function-scope import: keeps the dependency local to the only place it is used.
        import shutil

        self.time_stamp = time.strftime("%Y-%m-%d-%H-%M-%S")

        self.start_folder = os.getcwd()
        self.wrk_folder = model_folder
        self.file_name = None

        # Load yaml input file
        input_copy_error = None
        if not fdm:
            self.fdm: FDM = Util.read_data_from_yaml(input_file_path, FDM)
            # Keep a time-stamped copy of the input file next to the logs.
            # shutil is used instead of a shell "copy" command so that this works
            # on every platform and with paths that contain spaces.
            input_copy_path = os.path.join(
                self.wrk_folder, "logs", f"INPUT_FILE_{self.time_stamp}.FiQuS.yaml"
            )
            try:
                shutil.copyfile(input_file_path, input_copy_path)
            except OSError as error:
                # Best effort only: the logger does not exist yet, report it below.
                input_copy_error = error
        else:
            self.fdm: FDM = fdm
        verbose = self.fdm.run.verbosity_FiQuS

        # Initialize logger
        self.logger = initialize_logger(
            verbose=verbose, time_stamp=self.time_stamp, work_folder=self.wrk_folder
        )
        if verbose:
            Util.print_welcome_graphics()
        if input_copy_error is not None:
            self.logger.warning(
                f"Input file could not be copied to the logs folder: {input_copy_error}"
            )

        # Create JSON schema
        create_json_schema(self.fdm)

        # Check for input errors
        Check.check_inputs(run=self.fdm.run)

        # Initialize Main object
        magnet_type = self.fdm.magnet.type
        if magnet_type == "CCT_straight":
            self.main_magnet = MainCCT(fdm=self.fdm, verbose=verbose)
        elif magnet_type == "Pancake3D":
            self.main_magnet = MainPancake3D(fdm=self.fdm, verbose=verbose)
        elif magnet_type == "CACStrand":
            self.main_magnet = MainConductorAC_Strand(
                fdm=self.fdm,
                inputs_folder_path=pathlib.Path(input_file_path).parent,
                outputs_folder_path=model_folder,
                verbose=verbose,
            )
        elif magnet_type == "HomogenizedConductor":
            self.main_magnet = MainHomogenizedConductor(
                fdm=self.fdm,
                inputs_folder_path=pathlib.Path(input_file_path).parent,
                outputs_folder_path=model_folder,
                verbose=verbose,
            )
        elif magnet_type == "CACRutherford":
            self.main_magnet = MainConductorAC_Rutherford(
                fdm=self.fdm,
                inputs_folder_path=pathlib.Path(input_file_path).parent,
                verbose=verbose,
            )
        elif magnet_type == "multipole":
            # Strip the 5-character extension (".yaml") from the input file name.
            self.file_name = os.path.basename(input_file_path)[:-5]
            if not self.fdm.magnet.geometry.geom_file_path:
                self.fdm.magnet.geometry.geom_file_path = f"{input_file_path[:-5]}.geom"
            self.main_magnet = MainMultipole(
                fdm=self.fdm,
                rgd_path=self.fdm.magnet.geometry.geom_file_path,
                verbose=verbose,
            )
        else:
            raise ValueError(
                f"FiQuS does not support magnet type: {self.fdm.magnet.type}!"
            )

        # Load user paths for executables and additional files
        self.logger.info(f'{getpass.getuser()} is running on {platform.platform()}')
        if not fds:
            fds = get_data_settings(GetDP_path=GetDP_path)
        else:
            fds = get_data_settings(GetDP_path=GetDP_path, settings=fds)
        self.main_magnet.GetDP_path = fds.GetDP_path
        self.logger.info(f"{self.main_magnet.GetDP_path} is going to be used for FE solving.")

        # update htcondor csv
        if htcondor_jobid:
            base_path_model_files = fds.base_path_model_files
            htcondor_csv_file = os.path.join(base_path_model_files, "htcondor_run_log.csv")
            self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Running")

        # Save Model/Geometry/Mesh/Solution folder paths
        self.save_folders()

        # Build magnet
        self.summary = dict.fromkeys(
            [
                "SJ",
                "SICN",
                "SIGE",
                "Gamma",
                "nodes",
                "solution_time",
                "overall_error",
                "minimum_diff",
                "maximum_diff",
            ]
        )

        try:
            self.build_magnet()
        except Exception as e:
            # update htcondor csv
            if htcondor_jobid:
                self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Failed")
            self.logger.error(f"Error: {e}")
            # Bare raise keeps the original traceback intact.
            raise
        else:
            # update htcondor csv
            if htcondor_jobid:
                self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Finished")

    def save_folders(self):
        """
        Method to make or delete folders of FiQuS
        :return: Nothing, only does file and folder operation
        :rtype: None
        """

        def _check_and_generate_path(folder_type: str = None):
            # Folders are nested: Geometry lives in the work folder, Mesh in the
            # geometry folder and Solution in the mesh folder.
            if folder_type == "Geometry":
                folder = self.wrk_folder
            elif folder_type == "Mesh":
                folder = self.main_magnet.geom_folder
            elif folder_type == "Solution":
                folder = self.main_magnet.mesh_folder
            else:
                raise Exception("Incompatible type.")

            folder_key = getattr(self.fdm.run, folder_type.lower())
            if folder_key is None:
                # folder_key is not given, so it is computed
                folder_key = Util.compute_folder_key(
                    folder_type=folder_type,
                    folder=folder,
                    overwrite=self.fdm.run.overwrite,
                )

            required_folder = folder_type in required_folders
            # Overwrite conditions are only checked for the first required folder.
            first_required = required_folders[0] if required_folders else None
            if self.fdm.run.overwrite and folder_type == first_required:
                Check.check_overwrite_conditions(
                    folder_type=folder_type, folder=folder, folder_key=folder_key
                )
            return Util.get_folder_path(
                folder_type=folder_type,
                folder=folder,
                folder_key=folder_key,
                overwrite=self.fdm.run.overwrite,
                required_folder=required_folder,
            )

        def _relative_to_test_outputs(folder):
            # Run-log entry for a folder: "-" when the folder is not part of this
            # run, otherwise its path relative to tests/_outputs.
            if folder is None:
                return "-"
            return os.path.relpath(
                os.path.relpath(folder), os.path.join("tests", "_outputs")
            )

        if self.fdm.run.type == "start_from_yaml":
            required_folders = ["Geometry", "Mesh", "Solution"]
        elif self.fdm.run.type == "geometry_and_mesh":
            required_folders = ["Geometry", "Mesh"]
        elif self.fdm.run.type == "mesh_and_solve_with_post_process_python":
            required_folders = ["Mesh", "Solution"]
        elif self.fdm.run.type in ["solve_with_post_process_python", "solve_only"]:
            required_folders = ["Solution"]
        elif self.fdm.run.type == "geometry_only":
            required_folders = (
                []
                if self.fdm.run.geometry and not self.fdm.run.overwrite
                else ["Geometry"]
            )
        elif self.fdm.run.type == "mesh_only":
            required_folders = (
                [] if self.fdm.run.mesh and not self.fdm.run.overwrite else ["Mesh"]
            )
        else:  # post_process_getdp_only or post_process_python_only or plot_python
            required_folders = []

        self.main_magnet.geom_folder = _check_and_generate_path(folder_type="Geometry")
        if self.fdm.run.type not in ["geometry_only"]:
            self.main_magnet.mesh_folder = _check_and_generate_path(folder_type="Mesh")
        if not (
            self.fdm.run.type == "geometry_only"
            or self.fdm.run.type == "mesh_only"
        ):
            self.main_magnet.solution_folder = _check_and_generate_path(
                folder_type="Solution"
            )

        # Dump the parts of the data model that belong to each folder.
        if self.fdm.run.type in [
            "start_from_yaml",
            "geometry_and_mesh",
            "geometry_only",
        ]:
            Util.write_data_model_to_yaml(
                os.path.join(self.main_magnet.geom_folder, "geometry.yaml"),
                self.fdm.magnet.geometry,
                by_alias=True,
                with_comments=True,
            )
        if self.fdm.run.type in [
            "start_from_yaml",
            "geometry_and_mesh",
            "mesh_and_solve_with_post_process_python",
            "mesh_only",
        ]:
            Util.write_data_model_to_yaml(
                os.path.join(self.main_magnet.mesh_folder, "mesh.yaml"),
                self.fdm.magnet.mesh,
                by_alias=True,
                with_comments=True,
            )
        if self.fdm.run.type in [
            "start_from_yaml",
            "mesh_and_solve_with_post_process_python",
            "solve_with_post_process_python",
            "solve_only",
            "post_process",
            "plot_python",
            "postprocess_veusz",
        ]:
            solve_dump_data = SolveDumpDataModel(
                solve=self.fdm.magnet.solve,
                circuit=self.fdm.circuit,
                power_supply=self.fdm.power_supply,
                quench_protection=self.fdm.quench_protection,
                quench_detection=self.fdm.quench_detection,
                conductors=self.fdm.conductors,
            )
            Util.write_data_model_to_yaml(
                os.path.join(self.main_magnet.solution_folder, "solve.yaml"),
                solve_dump_data,
                by_alias=True,
                with_comments=True,
            )
        if self.fdm.run.type in [
            "start_from_yaml",
            "mesh_and_solve_with_post_process_python",
            "solve_with_post_process_python",
            "post_process_python_only",
            "post_process_getdp_only",
            "post_process",
            "postprocess_veusz",
            "plot_python",
        ]:
            Util.write_data_model_to_yaml(
                os.path.join(self.main_magnet.solution_folder, "postproc.yaml"),
                self.fdm.magnet.postproc,
                by_alias=True,
                with_comments=True,
            )

        # Append this run to run_log.csv. This is best effort: a failure here
        # must never stop the simulation.
        try:
            # mesh_folder/solution_folder are not assigned above for every run
            # type, hence the None default.
            run_log_row = [
                self.time_stamp,
                self.fdm.run.type,
                self.fdm.run.comments,
                _relative_to_test_outputs(getattr(self.main_magnet, "geom_folder", None)),
                _relative_to_test_outputs(getattr(self.main_magnet, "mesh_folder", None)),
                _relative_to_test_outputs(getattr(self.main_magnet, "solution_folder", None)),
            ]
            self.add_to_run_log(
                os.path.join(self.wrk_folder, "run_log.csv"), run_log_row
            )
        except Exception:
            self.logger.warning("Run log could not be completed.")

    def _update_summary(self, results):
        """
        Copy every key/value pair of a step result into the run summary
        :param results: mapping returned by a mesh or post-processing step
        :type results: dict
        :return: none
        :rtype: none
        """
        for key, value in results.items():
            self.summary[key] = value

    def build_magnet(self):
        """
        Main method to build magnets, i.e. to run various fiqus run types and magnet types.
        The working directory is reset to the start folder afterwards, also when a step fails.
        For multipole magnets (file_name is set) the summary is written to a json file
        in the work folder.
        :return: none
        :rtype: none
        """
        magnet = self.main_magnet
        run_type = self.fdm.run.type
        try:
            if run_type == "start_from_yaml":
                magnet.generate_geometry()
                magnet.pre_process()
                magnet.load_geometry()
                self._update_summary(magnet.mesh())
                self.summary["solution_time"] = magnet.solve_and_postprocess_getdp()
                self._update_summary(magnet.post_process_python(gui=magnet.fdm.run.launch_gui))
            elif run_type == "pre_process_only":
                magnet.pre_process()
                # todo: DISABLE FOR ONE GROUP ONLY
                self._update_summary(magnet.post_process_python(gui=magnet.fdm.run.launch_gui))
            elif run_type == "geometry_only":
                magnet.generate_geometry(
                    gui=(magnet.fdm.run.launch_gui if self.fdm.magnet.type != "CCT_straight" else False)
                )
                if self.fdm.magnet.type in ["CCT_straight", "CWS"]:
                    magnet.pre_process(gui=magnet.fdm.run.launch_gui)
            elif run_type == "geometry_and_mesh":
                magnet.generate_geometry()
                magnet.pre_process()
                magnet.load_geometry()
                self._update_summary(magnet.mesh(gui=magnet.fdm.run.launch_gui))
            elif run_type == "mesh_and_solve_with_post_process_python":
                magnet.load_geometry()
                self._update_summary(magnet.mesh())
                self.summary["solution_time"] = magnet.solve_and_postprocess_getdp()
                self._update_summary(magnet.post_process_python(gui=magnet.fdm.run.launch_gui))
            elif run_type == "mesh_only":
                magnet.load_geometry()
                self._update_summary(magnet.mesh(gui=magnet.fdm.run.launch_gui))
            elif run_type == "solve_with_post_process_python":
                self.summary["solution_time"] = (
                    magnet.solve_and_postprocess_getdp(gui=magnet.fdm.run.launch_gui)
                )
                self._update_summary(magnet.post_process_python(gui=magnet.fdm.run.launch_gui))
            elif run_type == "solve_only":
                self.summary["solution_time"] = (
                    magnet.solve_and_postprocess_getdp(gui=magnet.fdm.run.launch_gui)
                )
            elif run_type == "postprocess_veusz":
                magnet.post_process_veusz(gui=magnet.fdm.run.launch_gui)
            elif run_type == "post_process_getdp_only":
                magnet.post_process_getdp(gui=magnet.fdm.run.launch_gui)
            elif run_type == "post_process_python_only":
                self._update_summary(magnet.post_process_python(gui=magnet.fdm.run.launch_gui))
            elif run_type == "post_process":
                magnet.post_process_getdp(gui=magnet.fdm.run.launch_gui)
                self._update_summary(magnet.post_process_python(gui=magnet.fdm.run.launch_gui))
            elif run_type == "plot_python":
                magnet.plot_python()
            elif run_type == "batch_post_process_python":
                magnet.batch_post_process_python()
        finally:
            # Steps may change the working directory; always go back to where
            # the run was started, even if a step raised.
            os.chdir(self.start_folder)

        if self.file_name:
            file_path = os.path.join(self.wrk_folder, f"{self.file_name}.json")
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(self.summary, f, indent=2)

    @staticmethod
    def add_to_run_log(path_to_csv, run_log_row):
        """
        Append one row to the run log csv file, writing the header first if the file
        does not exist yet
        :param path_to_csv: full path to the run log csv file
        :type path_to_csv: str
        :param run_log_row: values for time stamp, run type, comments, geometry, mesh
            and solution directory
        :type run_log_row: list
        :return: Nothing, only writes to the file
        :rtype: None
        """
        header = [
            "Time Stamp",
            "Run Type",
            "Comments",
            "Geometry Directory",
            "Mesh Directory",
            "Solution Directory",
        ]
        # Decide before opening: opening in append mode creates the file.
        write_header = not os.path.isfile(path_to_csv)
        with open(path_to_csv, "a", newline="") as csv_file:
            writer = csv.writer(csv_file)
            if write_header:
                writer.writerow(header)
            writer.writerow(run_log_row)

    def change_htcondor_run_log(self, htcondor_csv_file, htcondor_jobid, new_status="None"):
        """
        Set the 'Status' column of the row(s) with the given 'Job ID' in the HTCondor
        run log csv file. Failures are logged as a warning and never raised.
        :param htcondor_csv_file: full path to the HTCondor run log csv file
        :type htcondor_csv_file: str
        :param htcondor_jobid: job ID to look up in the 'Job ID' column
        :type htcondor_jobid: int
        :param new_status: status to write, converted to str
        :type new_status: str
        :return: Nothing, only rewrites the file
        :rtype: None
        """
        try:
            df = pd.read_csv(htcondor_csv_file)
            df.loc[df['Job ID'] == htcondor_jobid, 'Status'] = str(new_status)
            df.to_csv(htcondor_csv_file, index=False)
            # Report success only after the file has actually been written.
            self.logger.info(f"Changed status of JobID {htcondor_jobid} to {new_status} in {htcondor_csv_file}.")
        except Exception:
            self.logger.warning(f"Could not change status of JobID {htcondor_jobid} to {new_status} in {htcondor_csv_file}.")
if __name__ == "__main__":
    # Command line entry point: parse the arguments, hide the FiQuS-specific
    # options from Gmsh and run FiQuS once.
    parser = argparse.ArgumentParser(
        prog="FiQuS",
        description="Finite Elements Quench Simulator",
        epilog="steam-team@cern.ch",
    )
    parser.add_argument(
        dest="full_path_input",
        type=str,
        help="Full path to FiQuS input yaml file",
    )
    parser.add_argument(
        "--output", '-o', dest="output_path", type=str, help="Full path to FiQuS output folder"
    )
    parser.add_argument(
        "--getdp", '-g', dest="GetDP_path", type=str, help="Full path to GetDP executable"
    )

    parser.add_argument("--htcondor_jobid", '-j', type=int, default=0,
                        help="HTCondor job ID (optional)", required=False)

    parser.add_argument("--fiqus_data_model", '-m', type=str,
                        help="Full path to FiQuS Data Model file (optional)", required=False)

    parser.add_argument("--fiqus_data_settings", '-s', type=str,
                        help="Full path to FiQuS Data Settings file (optional)", required=False)

    args, unknown = parser.parse_known_args()

    # remove these options from sys.argv, otherwise they are passed onto Gmsh
    # in Gmsh.initialize()
    long_options = {
        "--output",
        "--getdp",
        "--htcondor_jobid",
        "--fiqus_data_model",
        "--fiqus_data_settings",
    }
    options_to_remove = {"-o", "-g", "-j", "-m", "-s"} | long_options
    # Loop through and remove each option and its value
    i = 0
    while i < len(sys.argv):
        option_name, has_inline_value, _ = sys.argv[i].partition("=")
        if sys.argv[i] in options_to_remove:
            sys.argv.pop(i)  # Remove the option
            if i < len(sys.argv):
                sys.argv.pop(i)  # Remove the associated value
        elif has_inline_value and option_name in long_options:
            # "--option=value" form: option and value are a single argument
            sys.argv.pop(i)
        else:
            i += 1

    if args.fiqus_data_model is not None and args.fiqus_data_settings is not None:
        # read fdm and fds from a file (HTCondor case)
        input_fdm = Util.read_data_from_yaml(args.fiqus_data_model, FDM)
        input_fds = Util.read_data_from_yaml(args.fiqus_data_settings, DataSettings)

        MainFiQuS(
            input_file_path=args.full_path_input,
            model_folder=args.output_path,
            fdm=input_fdm,
            fds=input_fds,
            htcondor_jobid=args.htcondor_jobid
        )
    else:
        # fdm and fds from input (STEAM SDK case)
        MainFiQuS(
            input_file_path=args.full_path_input,
            model_folder=args.output_path,
            GetDP_path=args.GetDP_path,
        )
    print("FiQuS run completed")