Coverage for fiqus/MainFiQuS.py: 63%
241 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-14 02:37 +0100
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-14 02:37 +0100
1import argparse
2import csv
3import os
4import pathlib
5import sys
6import time
7import getpass
8import platform
9import subprocess
10import json
12import pandas as pd
14FiQuS_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
15sys.path.insert(0, FiQuS_path)
17from fiqus.utils.Utils import FilesAndFolders as Util
18from fiqus.utils.Utils import CheckForExceptions as Check
19from fiqus.utils.Utils import create_json_schema
20from fiqus.utils.Utils import get_data_settings
21from fiqus.utils.Utils import initialize_logger
22from fiqus.data.DataFiQuS import FDM
23from fiqus.data.DataSettings import DataSettings
24from fiqus.mains.MainCCT import MainCCT
25from fiqus.mains.MainMultipole import MainMultipole
26from fiqus.mains.MainPancake3D import MainPancake3D
27from fiqus.mains.MainConductorAC_Strand import MainConductorAC_Strand
29class MainFiQuS:
30 """
31 This is the top level class of FiQuS.
32 """
34 def __init__(
35 self,
36 input_file_path: str = None,
37 model_folder: str = None,
38 GetDP_path=None,
39 fdm=None,
40 fds=None,
41 htcondor_jobid=None
42 ):
43 """
44 Main class for working with FiQuS simulations
45 :param input_file_path: full path to input file yaml
46 :type input_file_path: str
47 :param model_folder: full path to the base output folder, called model folder
48 :type model_folder: str
49 :param GetDP_path: full path to GetDP executable
50 :type GetDP_path: str
51 :param fdm: FiQuS Data Model - object of fiqus DataFiQus
52 :type fdm: object
53 :param fds: FiQuS Data Settings - object of DataSettings
54 :type fds: object
55 """
56 self.time_stamp = time.strftime("%Y-%m-%d-%H-%M-%S")
58 self.start_folder = os.getcwd()
59 self.wrk_folder = model_folder
60 self.file_name = None
62 # Load yaml input file
63 if not fdm:
64 self.fdm: FDM = Util.read_data_from_yaml(input_file_path, FDM)
65 copyInputFile = (
66 "copy"
67 f" {input_file_path} {os.path.join(self.wrk_folder, 'logs', f'INPUT_FILE_{self.time_stamp}.FiQuS.yaml')}"
68 )
69 subprocess.run(copyInputFile, shell=True, stdout=subprocess.DEVNULL)
70 else:
71 self.fdm: FDM = fdm
72 verbose = self.fdm.run.verbosity_FiQuS
73 self.logger = initialize_logger(
74 verbose=verbose, time_stamp=self.time_stamp, work_folder=self.wrk_folder
75 )
76 if verbose:
77 Util.print_welcome_graphics()
78 # Intialize logger
80 # Create JSON schema
81 create_json_schema(self.fdm)
83 # Check for input errors
84 Check.check_inputs(run=self.fdm.run)
86 # Initialize Main object
87 if self.fdm.magnet.type == "CCT_straight":
88 self.main_magnet = MainCCT(fdm=self.fdm, verbose=verbose)
89 elif self.fdm.magnet.type == "CWS":
90 self.main_magnet = MainCWS(
91 fdm=self.fdm,
92 inputs_folder_path=pathlib.Path(input_file_path).parent,
93 verbose=self.fdm.run.verbosity_FiQuS,
94 )
95 elif self.fdm.magnet.type == "Pancake3D":
96 self.main_magnet = MainPancake3D(fdm=self.fdm, verbose=verbose)
97 elif self.fdm.magnet.type == "CACStrand":
98 self.main_magnet = MainConductorAC_Strand(fdm=self.fdm, inputs_folder_path=pathlib.Path(input_file_path).parent, outputs_folder_path=model_folder, verbose=verbose)
99 elif self.fdm.magnet.type == "CACRutherford":
100 self.main_magnet = MainConductorAC_Rutherford(fdm=self.fdm, inputs_folder_path=pathlib.Path(input_file_path).parent, verbose=verbose)
101 elif self.fdm.magnet.type == "Racetrack":
102 self.main_magnet = MainRacetrack(fdm=self.fdm, verbose=verbose)
103 elif self.fdm.magnet.type == "Racetrack3D":
104 self.main_magnet = MainRacetrack3D(fdm=self.fdm, inputs_folder_path=pathlib.Path(input_file_path).parent, verbose=verbose)
105 elif self.fdm.magnet.type == "multipole":
106 self.file_name = os.path.basename(input_file_path)[:-5]
107 if not self.fdm.magnet.geometry.geom_file_path:
108 self.fdm.magnet.geometry.geom_file_path = f"{input_file_path[:-5]}.geom"
109 self.main_magnet = MainMultipole(
110 fdm=self.fdm,
111 rgd_path=self.fdm.magnet.geometry.geom_file_path,
112 verbose=verbose,
113 )
115 else:
116 raise ValueError(
117 f"FiQuS does not support magnet type: {self.fdm.magnet.type}!"
118 )
120 # Load user paths for executables and additional files
121 self.logger.info(f'{getpass.getuser()} is running on {platform.platform()}')
122 if not fds:
123 fds = get_data_settings(GetDP_path=GetDP_path)
124 else:
125 fds = get_data_settings(GetDP_path=GetDP_path, settings=fds)
126 self.main_magnet.GetDP_path = fds.GetDP_path
127 self.logger.info(f"{self.main_magnet.GetDP_path} is going to be used for FE solving.")
129 # self.logger.info(gmsh.onelab.run(self.fdm.general.magnet_name, f"{self.main_magnet.settings['GetDP_path']} -info"))
131 # update htcondor csv
132 if htcondor_jobid:
133 base_path_model_files = fds.base_path_model_files
134 htcondor_csv_file = os.path.join(base_path_model_files, "htcondor_run_log.csv")
136 self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Running")
138 # Save Model/Geometry/Mesh/Solution folder paths
139 self.save_folders()
141 # Build magnet
142 self.summary = dict.fromkeys(
143 [
144 "SJ",
145 "SICN",
146 "SIGE",
147 "Gamma",
148 "nodes",
149 "solution_time",
150 "overall_error",
151 "minimum_diff",
152 "maximum_diff",
153 ]
154 )
156 try:
157 self.build_magnet()
158 except Exception as e:
159 # update htcondor csv
160 if htcondor_jobid:
161 self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Failed")
163 self.logger.error(f"Error: {e}")
164 raise e
165 else:
166 # update htcondor csv
167 if htcondor_jobid:
168 self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Finished")
170 def save_folders(self):
171 """
172 Method to make or delete folders of FiQuS
173 :return: Nothing, only does file and folder operation
174 :rtype: None
175 """
176 def _check_and_generate_path(folder_type: str = None):
177 if folder_type == "Geometry":
178 folder = self.wrk_folder
179 elif folder_type == "Mesh":
180 folder = self.main_magnet.geom_folder
181 elif folder_type == "Solution":
182 folder = self.main_magnet.mesh_folder
183 else:
184 raise Exception("Incompatible type.")
186 if getattr(self.fdm.run, folder_type.lower()) is None:
187 # folder_key is not given, so it is computed
188 folder_key = Util.compute_folder_key(
189 folder_type=folder_type,
190 folder=folder,
191 overwrite=self.fdm.run.overwrite,
192 )
193 else:
194 # folder_key is given
195 folder_key = getattr(self.fdm.run, folder_type.lower())
197 required_folder = folder_type in required_folders
198 if self.fdm.run.overwrite and folder_type == (
199 required_folders[0] if required_folders else None
200 ):
201 Check.check_overwrite_conditions(
202 folder_type=folder_type, folder=folder, folder_key=folder_key
203 )
204 return Util.get_folder_path(
205 folder_type=folder_type,
206 folder=folder,
207 folder_key=folder_key,
208 overwrite=self.fdm.run.overwrite,
209 required_folder=required_folder,
210 )
212 if self.fdm.run.type == "start_from_yaml":
213 required_folders = ["Geometry", "Mesh", "Solution"]
214 elif self.fdm.run.type == "geometry_and_mesh":
215 required_folders = ["Geometry", "Mesh"]
216 elif self.fdm.run.type == "mesh_and_solve_with_post_process_python":
217 required_folders = ["Mesh", "Solution"]
218 elif self.fdm.run.type in ["solve_with_post_process_python", "solve_only"]:
219 required_folders = ["Solution"]
220 elif self.fdm.run.type == "geometry_only":
221 required_folders = (
222 []
223 if self.fdm.run.geometry and not self.fdm.run.overwrite
224 else ["Geometry"]
225 )
226 elif self.fdm.run.type == "mesh_only":
227 required_folders = (
228 [] if self.fdm.run.mesh and not self.fdm.run.overwrite else ["Mesh"]
229 )
230 else: # post_process_getdp_only or post_process_python_only or plot_python
231 required_folders = []
233 fdm = self.main_magnet.fdm.magnet
235 self.main_magnet.geom_folder = _check_and_generate_path(folder_type="Geometry")
236 if not self.fdm.run.type in ["geometry_only"]:
237 self.main_magnet.mesh_folder = _check_and_generate_path(folder_type="Mesh")
238 if not (
239 self.fdm.run.type == "geometry_only"
240 or self.fdm.run.type == "mesh_only"
241 ):
242 self.main_magnet.solution_folder = _check_and_generate_path(
243 folder_type="Solution"
244 )
246 if self.fdm.run.type in [
247 "start_from_yaml",
248 "geometry_and_mesh",
249 "geometry_only",
250 ]:
251 Util.write_data_model_to_yaml(
252 os.path.join(self.main_magnet.geom_folder, "geometry.yaml"),
253 fdm.geometry,
254 by_alias=True,
255 with_comments=True,
256 )
257 if self.fdm.run.type in [
258 "start_from_yaml",
259 "geometry_and_mesh",
260 "mesh_and_solve_with_post_process_python",
261 "mesh_only",
262 ]:
263 Util.write_data_model_to_yaml(
264 os.path.join(self.main_magnet.mesh_folder, "mesh.yaml"),
265 fdm.mesh,
266 by_alias=True,
267 with_comments=True,
268 )
269 if self.fdm.run.type in [
270 "start_from_yaml",
271 "mesh_and_solve_with_post_process_python",
272 "solve_with_post_process_python",
273 "solve_only",
274 "post_process",
275 "plot_python"
276 ]:
277 Util.write_data_model_to_yaml(
278 os.path.join(self.main_magnet.solution_folder, "solve.yaml"),
279 fdm.solve,
280 by_alias=True,
281 with_comments=True,
282 )
283 if self.fdm.run.type in [
284 "start_from_yaml",
285 "mesh_and_solve_with_post_process_python",
286 "solve_with_post_process_python",
287 "post_process_python_only",
288 "post_process_getdp_only",
289 "post_process",
290 "plot_python"
291 ]:
292 Util.write_data_model_to_yaml(
293 os.path.join(self.main_magnet.solution_folder, "postproc.yaml"),
294 fdm.postproc,
295 by_alias=True,
296 with_comments=True,
297 )
299 try:
300 run_type = self.fdm.run.type
301 comments = self.fdm.run.comments
302 if self.main_magnet.geom_folder is not None:
303 geo_folder = os.path.relpath(self.main_magnet.geom_folder)
304 geo_folder = os.path.relpath(
305 geo_folder, os.path.join("tests", "_outputs")
306 )
307 else:
308 geo_folder = "-"
310 if self.main_magnet.mesh_folder is not None:
311 mesh_folder = os.path.relpath(self.main_magnet.mesh_folder)
312 mesh_folder = os.path.relpath(
313 mesh_folder, os.path.join("tests", "_outputs")
314 )
315 else:
316 mesh_folder = "-"
318 if self.main_magnet.solution_folder is not None:
319 solution_folder = os.path.relpath(self.main_magnet.solution_folder)
320 solution_folder = os.path.relpath(
321 solution_folder, os.path.join("tests", "_outputs")
322 )
323 else:
324 solution_folder = "-"
326 run_log_row = [
327 self.time_stamp,
328 run_type,
329 comments,
330 geo_folder,
331 mesh_folder,
332 solution_folder,
333 ]
334 self.add_to_run_log(
335 os.path.join(self.wrk_folder, "run_log.csv"), run_log_row
336 )
337 except:
338 self.logger.warning("Run log could not be completed.")
340 def build_magnet(self):
341 """
342 Main method to build magnets, i.e. to run various fiqus run types and magnet types
343 :return: none
344 :rtype: none
345 """
346 if self.fdm.run.type == "start_from_yaml":
347 self.main_magnet.generate_geometry()
348 self.main_magnet.pre_process()
349 self.main_magnet.load_geometry()
350 for key, value in self.main_magnet.mesh().items():
351 self.summary[key] = value
352 self.summary["solution_time"] = self.main_magnet.solve_and_postprocess_getdp()
353 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items():
354 self.summary[key] = value
355 elif self.fdm.run.type == "pre_process_only":
356 self.main_magnet.pre_process()
357 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items():
358 self.summary[key] = value # todo: DISABLE FOR ONE GROUP ONLY
359 elif self.fdm.run.type == "geometry_only":
360 self.main_magnet.generate_geometry(
361 gui=(self.main_magnet.fdm.run.launch_gui if self.fdm.magnet.type != "CCT_straight" else False)
362 )
363 if self.fdm.magnet.type in ["CCT_straight", "CWS"]:
364 self.main_magnet.pre_process(gui=self.main_magnet.fdm.run.launch_gui)
365 elif self.fdm.run.type == "geometry_and_mesh":
366 self.main_magnet.generate_geometry()
367 self.main_magnet.pre_process()
368 self.main_magnet.load_geometry()
369 for key, value in self.main_magnet.mesh(gui=self.main_magnet.fdm.run.launch_gui).items():
370 self.summary[key] = value
371 elif self.fdm.run.type == "mesh_and_solve_with_post_process_python":
372 self.main_magnet.load_geometry()
373 for key, value in self.main_magnet.mesh().items():
374 self.summary[key] = value
375 self.summary["solution_time"] = self.main_magnet.solve_and_postprocess_getdp()
376 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items():
377 self.summary[key] = value
378 elif self.fdm.run.type == "mesh_only":
379 self.main_magnet.load_geometry()
380 for key, value in self.main_magnet.mesh(gui=self.main_magnet.fdm.run.launch_gui).items():
381 self.summary[key] = value
382 elif self.fdm.run.type == "solve_with_post_process_python":
383 self.summary["solution_time"] = (
384 self.main_magnet.solve_and_postprocess_getdp(gui=self.main_magnet.fdm.run.launch_gui)
385 )
386 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items():
387 self.summary[key] = value
388 elif self.fdm.run.type == "solve_only":
389 self.summary["solution_time"] = (
390 self.main_magnet.solve_and_postprocess_getdp(gui=self.main_magnet.fdm.run.launch_gui)
391 )
392 elif self.fdm.run.type == "post_process_getdp_only":
393 self.main_magnet.post_process_getdp(gui=self.main_magnet.fdm.run.launch_gui)
394 elif self.fdm.run.type == "post_process_python_only":
395 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items():
396 self.summary[key] = value
397 elif self.fdm.run.type == "post_process":
398 self.main_magnet.post_process_getdp(gui=self.main_magnet.fdm.run.launch_gui)
399 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items():
400 self.summary[key] = value
401 elif self.fdm.run.type == "plot_python":
402 self.main_magnet.plot_python()
404 elif self.fdm.run.type == "batch_post_process_python":
405 self.main_magnet.batch_post_process_python()
406 os.chdir(self.start_folder)
408 if self.file_name: json.dump(self.summary, open(f"{os.path.join(self.wrk_folder, self.file_name)}.json", 'w'))
409 # mesh_par = self.fdm.magnet.mesh
410 # if self.summary['solution_time']:
411 # with open(r"C:\Users\avitrano\PycharmProjects\steam_sdk\tests\parsims\FiQuS_run\summary.dat", 'a') as f:
412 # content = (f"{mesh_par.mesh_coil.SizeMin} {mesh_par.mesh_coil.SizeMax} {mesh_par.mesh_iron.SizeMin} {mesh_par.mesh_iron.SizeMax} "
413 # f"{self.summary['solution_time']} {self.summary['overall_error']} {self.summary['overall_error'] * 0.999 + self.summary['solution_time'] * 0.001} "
414 # f"{self.summary['SJ']} {self.summary['SICN']} {self.summary['SIGE']} {self.summary['Gamma']} "
415 # f"{self.summary['nodes']} {self.summary['minimum_diff']} {self.summary['maximum_diff']}\n")
416 # f.writelines(content)
418 @staticmethod
419 def add_to_run_log(path_to_csv, run_log_row):
420 # If file does not exist, write the header
421 if not os.path.isfile(path_to_csv):
422 header = [
423 "Time Stamp",
424 "Run Type",
425 "Comments",
426 "Geometry Directory",
427 "Mesh Directory",
428 "Solution Directory",
429 ]
430 with open(path_to_csv, "a", newline="") as csv_file:
431 writer = csv.writer(csv_file)
432 writer.writerow(header)
434 # Open the CSV file in append mode
435 with open(path_to_csv, "a+", newline="") as csv_file:
436 writer = csv.writer(csv_file)
437 writer.writerow(run_log_row)
439 def change_htcondor_run_log(self, htcondor_csv_file, htcondor_jobid, new_status="None"):
440 try:
441 df = pd.read_csv(htcondor_csv_file)
442 df.loc[df['Job ID'] == htcondor_jobid, 'Status'] = str(new_status)
443 self.logger.info(f"Changed status of JobID {htcondor_jobid} to {new_status} in {htcondor_csv_file}.")
444 df.to_csv(htcondor_csv_file, index=False)
445 except:
446 self.logger.warning(f"Could not change status of JobID {htcondor_jobid} to {new_status} in {htcondor_csv_file}.")
448if __name__ == "__main__":
449 parser = argparse.ArgumentParser(
450 prog="FiQuS",
451 description="Finite Elements Quench Simulator",
452 epilog="steam-team@cern.ch",
453 )
454 parser.add_argument(
455 dest="full_path_input",
456 type=str,
457 help="Full path to FiQuS input yaml file",
458 )
459 parser.add_argument(
460 "--output", '-o', dest="output_path", type=str, help="Full path to FiQuS output folder"
461 )
462 parser.add_argument(
463 "--getdp", '-g', dest="GetDP_path", type=str, help="Full path to GetDP executable"
464 )
466 parser.add_argument("--htcondor_jobid", '-j', type=int, default=0,
467 help="HTCondor job ID (optional)", required=False)
469 parser.add_argument("--fiqus_data_model", '-m', type=str,
470 help="Full path to FiQuS Data Model file (optional)", required=False)
472 parser.add_argument("--fiqus_data_settings", '-s', type=str,
473 help="Full path to FiQuS Data Settings file (optional)", required=False)
475 args, unknown = parser.parse_known_args()
477 # remove these options from sys.argv, otherwise they are passed onto Gmsh
478 # in Gmsh.initialize()
479 options_to_remove = ["-o", "-g", "-j", "-m", "-s"]
480 # Loop through and remove each option and its value
481 i = 0
482 while i < len(sys.argv):
483 if sys.argv[i] in options_to_remove:
484 sys.argv.pop(i) # Remove the option
485 if i < len(sys.argv):
486 sys.argv.pop(i) # Remove the associated value
487 else:
488 i += 1
490 if args.fiqus_data_model != None and args.fiqus_data_settings != None:
491 # read fdm and fds from a file (HTCondor case)
492 input_fdm = Util.read_data_from_yaml(args.fiqus_data_model, FDM)
493 input_fds = Util.read_data_from_yaml(args.fiqus_data_settings, DataSettings)
495 MainFiQuS(
496 input_file_path=args.full_path_input,
497 model_folder=args.output_path,
498 fdm=input_fdm,
499 fds=input_fds,
500 htcondor_jobid=args.htcondor_jobid
501 )
502 else:
503 # fdm and fds from input (STEAM SDK case)
504 MainFiQuS(
505 input_file_path=args.full_path_input,
506 model_folder=args.output_path,
507 GetDP_path=args.GetDP_path,
508 )
509 print("FiQuS run completed")