Coverage for fiqus/MainFiQuS.py: 66%

245 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2026-02-01 01:38 +0000

1import argparse 

2import csv 

3import os 

4import pathlib 

5import sys 

6import time 

7import getpass 

8import platform 

9import subprocess 

10import json 

11 

12import pandas as pd 

13 

14FiQuS_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 

15sys.path.insert(0, FiQuS_path) 

16 

17from fiqus.utils.Utils import FilesAndFolders as Util 

18from fiqus.utils.Utils import CheckForExceptions as Check 

19from fiqus.utils.Utils import create_json_schema 

20from fiqus.utils.Utils import get_data_settings 

21from fiqus.utils.Utils import initialize_logger 

22from fiqus.data.DataFiQuS import FDM, SolveDumpDataModel 

23from fiqus.data.DataSettings import DataSettings 

24from fiqus.mains.MainCCT import MainCCT 

25from fiqus.mains.MainMultipole import MainMultipole 

26from fiqus.mains.MainPancake3D import MainPancake3D 

27from fiqus.mains.MainConductorAC_Strand import MainConductorAC_Strand 

28from fiqus.mains.MainHomogenizedConductor import MainHomogenizedConductor 

29from fiqus.mains.MainConductorAC_Rutherford import MainConductorAC_Rutherford 

30from fiqus.mains.MainConductorAC_CC import MainConductorAC_CC 

31 

32class MainFiQuS: 

33 """ 

34 This is the top level class of FiQuS. 

35 """ 

36 

37 def __init__( 

38 self, 

39 input_file_path: str = None, 

40 model_folder: str = None, 

41 GetDP_path=None, 

42 fdm=None, 

43 fds=None, 

44 htcondor_jobid=None 

45 ): 

46 """ 

47 Main class for working with FiQuS simulations 

48 :param input_file_path: full path to input file yaml 

49 :type input_file_path: str 

50 :param model_folder: full path to the base output folder, called model folder 

51 :type model_folder: str 

52 :param GetDP_path: full path to GetDP executable 

53 :type GetDP_path: str 

54 :param fdm: FiQuS Data Model - object of fiqus DataFiQus 

55 :type fdm: object 

56 :param fds: FiQuS Data Settings - object of DataSettings 

57 :type fds: object 

58 """ 

59 self.time_stamp = time.strftime("%Y-%m-%d-%H-%M-%S") 

60 

61 self.start_folder = os.getcwd() 

62 self.wrk_folder = model_folder 

63 self.file_name = None 

64 

65 # Load yaml input file 

66 if not fdm: 

67 self.fdm: FDM = Util.read_data_from_yaml(input_file_path, FDM) 

68 copyInputFile = ( 

69 "copy" 

70 f" {input_file_path} {os.path.join(self.wrk_folder, 'logs', f'INPUT_FILE_{self.time_stamp}.FiQuS.yaml')}" 

71 ) 

72 subprocess.run(copyInputFile, shell=True, stdout=subprocess.DEVNULL) 

73 else: 

74 self.fdm: FDM = fdm 

75 verbose = self.fdm.run.verbosity_FiQuS 

76 self.logger = initialize_logger( 

77 verbose=verbose, time_stamp=self.time_stamp, work_folder=self.wrk_folder 

78 ) 

79 if verbose: 

80 Util.print_welcome_graphics() 

81 # Intialize logger 

82 

83 # Create JSON schema 

84 create_json_schema(self.fdm) 

85 

86 # Check for input errors 

87 Check.check_inputs(run=self.fdm.run) 

88 

89 # Initialize Main object 

90 if self.fdm.magnet.type == "CCT_straight": 

91 self.main_magnet = MainCCT(fdm=self.fdm, verbose=verbose) 

92 elif self.fdm.magnet.type == "Pancake3D": 

93 self.main_magnet = MainPancake3D(fdm=self.fdm, verbose=verbose) 

94 elif self.fdm.magnet.type == "CACStrand": 

95 self.main_magnet = MainConductorAC_Strand(fdm=self.fdm, inputs_folder_path=pathlib.Path(input_file_path).parent, outputs_folder_path=model_folder, verbose=verbose) 

96 elif self.fdm.magnet.type == "CACCC": 

97 self.main_magnet = MainConductorAC_CC(fdm=self.fdm, inputs_folder_path=pathlib.Path(input_file_path).parent, verbose=verbose) 

98 elif self.fdm.magnet.type == "HomogenizedConductor": 

99 self.main_magnet = MainHomogenizedConductor(fdm=self.fdm, inputs_folder_path=pathlib.Path(input_file_path).parent, outputs_folder_path=model_folder, verbose=verbose) 

100 elif self.fdm.magnet.type == "CACRutherford": 

101 self.main_magnet = MainConductorAC_Rutherford(fdm=self.fdm, inputs_folder_path=pathlib.Path(input_file_path).parent, verbose=verbose) 

102 elif self.fdm.magnet.type == "multipole": 

103 self.file_name = os.path.basename(input_file_path)[:-5] 

104 if not self.fdm.magnet.geometry.geom_file_path: 

105 self.fdm.magnet.geometry.geom_file_path = f"{input_file_path[:-5]}.geom" 

106 self.main_magnet = MainMultipole(fdm=self.fdm,rgd_path=self.fdm.magnet.geometry.geom_file_path,verbose=verbose, inputs_folder_path=pathlib.Path(input_file_path).parent) 

107 else: 

108 raise ValueError( 

109 f"FiQuS does not support magnet type: {self.fdm.magnet.type}!" 

110 ) 

111 

112 # Load user paths for executables and additional files 

113 self.logger.info(f'{getpass.getuser()} is running on {platform.platform()}') 

114 if not fds: 

115 fds = get_data_settings(GetDP_path=GetDP_path) 

116 else: 

117 fds = get_data_settings(GetDP_path=GetDP_path, settings=fds) 

118 self.main_magnet.GetDP_path = fds.GetDP_path 

119 self.logger.info(f"{self.main_magnet.GetDP_path} is going to be used for FE solving.") 

120 

121 # self.logger.info(gmsh.onelab.run(self.fdm.general.magnet_name, f"{self.main_magnet.settings['GetDP_path']} -info")) 

122 

123 # update htcondor csv 

124 if htcondor_jobid: 

125 base_path_model_files = fds.base_path_model_files 

126 htcondor_csv_file = os.path.join(base_path_model_files, "htcondor_run_log.csv") 

127 

128 self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Running") 

129 

130 # Save Model/Geometry/Mesh/Solution folder paths 

131 self.save_folders() 

132 

133 # Build magnet 

134 self.summary = dict.fromkeys( 

135 [ 

136 "SJ", 

137 "SICN", 

138 "SIGE", 

139 "Gamma", 

140 "nodes", 

141 "solution_time", 

142 "overall_error", 

143 "minimum_diff", 

144 "maximum_diff", 

145 ] 

146 ) 

147 

148 try: 

149 self.build_magnet() 

150 except Exception as e: 

151 # update htcondor csv 

152 if htcondor_jobid: 

153 self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Failed") 

154 

155 self.logger.error(f"Error: {e}") 

156 raise e 

157 else: 

158 # update htcondor csv 

159 if htcondor_jobid: 

160 self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Finished") 

161 

162 def save_folders(self): 

163 """ 

164 Method to make or delete folders of FiQuS 

165 :return: Nothing, only does file and folder operation 

166 :rtype: None 

167 """ 

168 def _check_and_generate_path(folder_type: str = None): 

169 if folder_type == "Geometry": 

170 folder = self.wrk_folder 

171 elif folder_type == "Mesh": 

172 folder = self.main_magnet.geom_folder 

173 elif folder_type == "Solution": 

174 folder = self.main_magnet.mesh_folder 

175 else: 

176 raise Exception("Incompatible type.") 

177 

178 if getattr(self.fdm.run, folder_type.lower()) is None: 

179 # folder_key is not given, so it is computed 

180 folder_key = Util.compute_folder_key( 

181 folder_type=folder_type, 

182 folder=folder, 

183 overwrite=self.fdm.run.overwrite, 

184 ) 

185 else: 

186 # folder_key is given 

187 folder_key = getattr(self.fdm.run, folder_type.lower()) 

188 

189 required_folder = folder_type in required_folders 

190 if self.fdm.run.overwrite and folder_type == ( 

191 required_folders[0] if required_folders else None 

192 ): 

193 Check.check_overwrite_conditions( 

194 folder_type=folder_type, folder=folder, folder_key=folder_key 

195 ) 

196 return Util.get_folder_path( 

197 folder_type=folder_type, 

198 folder=folder, 

199 folder_key=folder_key, 

200 overwrite=self.fdm.run.overwrite, 

201 required_folder=required_folder, 

202 ) 

203 

204 if self.fdm.run.type == "start_from_yaml": 

205 required_folders = ["Geometry", "Mesh", "Solution"] 

206 elif self.fdm.run.type == "geometry_and_mesh": 

207 required_folders = ["Geometry", "Mesh"] 

208 elif self.fdm.run.type == "mesh_and_solve_with_post_process_python": 

209 required_folders = ["Mesh", "Solution"] 

210 elif self.fdm.run.type in ["solve_with_post_process_python", "solve_only"]: 

211 required_folders = ["Solution"] 

212 elif self.fdm.run.type == "geometry_only": 

213 required_folders = ( 

214 [] 

215 if self.fdm.run.geometry and not self.fdm.run.overwrite 

216 else ["Geometry"] 

217 ) 

218 elif self.fdm.run.type == "mesh_only": 

219 required_folders = ( 

220 [] if self.fdm.run.mesh and not self.fdm.run.overwrite else ["Mesh"] 

221 ) 

222 else: # post_process_getdp_only or post_process_python_only or plot_python 

223 required_folders = [] 

224 

225 

226 

227 self.main_magnet.geom_folder = _check_and_generate_path(folder_type="Geometry") 

228 if not self.fdm.run.type in ["geometry_only"]: 

229 self.main_magnet.mesh_folder = _check_and_generate_path(folder_type="Mesh") 

230 if not ( 

231 self.fdm.run.type == "geometry_only" 

232 or self.fdm.run.type == "mesh_only" 

233 ): 

234 self.main_magnet.solution_folder = _check_and_generate_path( 

235 folder_type="Solution" 

236 ) 

237 

238 if self.fdm.run.type in [ 

239 "start_from_yaml", 

240 "geometry_and_mesh", 

241 "geometry_only", 

242 ]: 

243 Util.write_data_model_to_yaml( 

244 os.path.join(self.main_magnet.geom_folder, "geometry.yaml"), 

245 self.fdm.magnet.geometry, 

246 by_alias=True, 

247 with_comments=True, 

248 ) 

249 if self.fdm.run.type in [ 

250 "start_from_yaml", 

251 "geometry_and_mesh", 

252 "mesh_and_solve_with_post_process_python", 

253 "mesh_only", 

254 ]: 

255 Util.write_data_model_to_yaml( 

256 os.path.join(self.main_magnet.mesh_folder, "mesh.yaml"), 

257 self.fdm.magnet.mesh, 

258 by_alias=True, 

259 with_comments=True, 

260 ) 

261 if self.fdm.run.type in [ 

262 "start_from_yaml", 

263 "mesh_and_solve_with_post_process_python", 

264 "solve_with_post_process_python", 

265 "solve_only", 

266 "post_process", 

267 "plot_python", 

268 "postprocess_veusz" 

269 ]: 

270 solve_dump_data = SolveDumpDataModel( 

271 solve=self.fdm.magnet.solve, 

272 circuit=self.fdm.circuit, 

273 power_supply=self.fdm.power_supply, 

274 quench_protection=self.fdm.quench_protection, 

275 quench_detection=self.fdm.quench_detection, 

276 conductors=self.fdm.conductors 

277 ) 

278 Util.write_data_model_to_yaml( 

279 os.path.join(self.main_magnet.solution_folder, "solve.yaml"), 

280 solve_dump_data, 

281 by_alias=True, 

282 with_comments=True, 

283 ) 

284 if self.fdm.run.type in [ 

285 "start_from_yaml", 

286 "mesh_and_solve_with_post_process_python", 

287 "solve_with_post_process_python", 

288 "post_process_python_only", 

289 "post_process_getdp_only", 

290 "post_process", 

291 "postprocess_veusz", 

292 "plot_python" 

293 ]: 

294 Util.write_data_model_to_yaml( 

295 os.path.join(self.main_magnet.solution_folder, "postproc.yaml"), 

296 self.fdm.magnet.postproc, 

297 by_alias=True, 

298 with_comments=True, 

299 ) 

300 

301 try: 

302 run_type = self.fdm.run.type 

303 comments = self.fdm.run.comments 

304 if self.main_magnet.geom_folder is not None: 

305 geo_folder = os.path.relpath(self.main_magnet.geom_folder) 

306 geo_folder = os.path.relpath( 

307 geo_folder, os.path.join("tests", "_outputs") 

308 ) 

309 else: 

310 geo_folder = "-" 

311 

312 if self.main_magnet.mesh_folder is not None: 

313 mesh_folder = os.path.relpath(self.main_magnet.mesh_folder) 

314 mesh_folder = os.path.relpath( 

315 mesh_folder, os.path.join("tests", "_outputs") 

316 ) 

317 else: 

318 mesh_folder = "-" 

319 

320 if self.main_magnet.solution_folder is not None: 

321 solution_folder = os.path.relpath(self.main_magnet.solution_folder) 

322 solution_folder = os.path.relpath( 

323 solution_folder, os.path.join("tests", "_outputs") 

324 ) 

325 else: 

326 solution_folder = "-" 

327 

328 run_log_row = [ 

329 self.time_stamp, 

330 run_type, 

331 comments, 

332 geo_folder, 

333 mesh_folder, 

334 solution_folder, 

335 ] 

336 self.add_to_run_log( 

337 os.path.join(self.wrk_folder, "run_log.csv"), run_log_row 

338 ) 

339 except: 

340 self.logger.warning("Run log could not be completed.") 

341 

342 

343 def build_magnet(self): 

344 """ 

345 Main method to build magnets, i.e. to run various fiqus run types and magnet types 

346 :return: none 

347 :rtype: none 

348 """ 

349 if self.fdm.run.type == "start_from_yaml": 

350 self.main_magnet.generate_geometry() 

351 self.main_magnet.pre_process() 

352 self.main_magnet.load_geometry() 

353 for key, value in self.main_magnet.mesh().items(): 

354 self.summary[key] = value 

355 self.summary["solution_time"] = self.main_magnet.solve_and_postprocess_getdp() 

356 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items(): 

357 self.summary[key] = value 

358 elif self.fdm.run.type == "pre_process_only": 

359 self.main_magnet.pre_process() 

360 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items(): 

361 self.summary[key] = value # todo: DISABLE FOR ONE GROUP ONLY 

362 elif self.fdm.run.type == "geometry_only": 

363 self.main_magnet.generate_geometry( 

364 gui=(self.main_magnet.fdm.run.launch_gui if self.fdm.magnet.type != "CCT_straight" else False) 

365 ) 

366 if self.fdm.magnet.type in ["CCT_straight", "CWS"]: 

367 self.main_magnet.pre_process(gui=self.main_magnet.fdm.run.launch_gui) 

368 elif self.fdm.run.type == "geometry_and_mesh": 

369 self.main_magnet.generate_geometry() 

370 self.main_magnet.pre_process() 

371 self.main_magnet.load_geometry() 

372 for key, value in self.main_magnet.mesh(gui=self.main_magnet.fdm.run.launch_gui).items(): 

373 self.summary[key] = value 

374 elif self.fdm.run.type == "mesh_and_solve_with_post_process_python": 

375 self.main_magnet.load_geometry() 

376 for key, value in self.main_magnet.mesh().items(): 

377 self.summary[key] = value 

378 self.summary["solution_time"] = self.main_magnet.solve_and_postprocess_getdp() 

379 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items(): 

380 self.summary[key] = value 

381 elif self.fdm.run.type == "mesh_only": 

382 self.main_magnet.load_geometry() 

383 for key, value in self.main_magnet.mesh(gui=self.main_magnet.fdm.run.launch_gui).items(): 

384 self.summary[key] = value 

385 elif self.fdm.run.type == "solve_with_post_process_python": 

386 self.summary["solution_time"] = ( 

387 self.main_magnet.solve_and_postprocess_getdp(gui=self.main_magnet.fdm.run.launch_gui) 

388 ) 

389 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items(): 

390 self.summary[key] = value 

391 elif self.fdm.run.type == "solve_only": 

392 self.summary["solution_time"] = ( 

393 self.main_magnet.solve_and_postprocess_getdp(gui=self.main_magnet.fdm.run.launch_gui) 

394 ) 

395 elif self.fdm.run.type == "post_process_getdp_only": 

396 self.main_magnet.post_process_getdp(gui=self.main_magnet.fdm.run.launch_gui) 

397 elif self.fdm.run.type == "post_process_python_only": 

398 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items(): 

399 self.summary[key] = value 

400 elif self.fdm.run.type == "post_process": 

401 self.main_magnet.post_process_getdp(gui=self.main_magnet.fdm.run.launch_gui) 

402 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items(): 

403 self.summary[key] = value 

404 elif self.fdm.run.type == "plot_python": 

405 self.main_magnet.plot_python() 

406 

407 elif self.fdm.run.type == "batch_post_process_python": 

408 self.main_magnet.batch_post_process_python() 

409 os.chdir(self.start_folder) 

410 

411 if self.file_name: 

412 file_path = os.path.join(self.wrk_folder, f"{self.file_name}.json") 

413 with open(file_path, 'w', encoding='utf-8') as f: 

414 json.dump(self.summary, f, indent=2) 

415 

416 @staticmethod 

417 def add_to_run_log(path_to_csv, run_log_row): 

418 # If file does not exist, write the header 

419 if not os.path.isfile(path_to_csv): 

420 header = [ 

421 "Time Stamp", 

422 "Run Type", 

423 "Comments", 

424 "Geometry Directory", 

425 "Mesh Directory", 

426 "Solution Directory", 

427 ] 

428 with open(path_to_csv, "a", newline="") as csv_file: 

429 writer = csv.writer(csv_file) 

430 writer.writerow(header) 

431 

432 # Open the CSV file in append mode 

433 with open(path_to_csv, "a+", newline="") as csv_file: 

434 writer = csv.writer(csv_file) 

435 writer.writerow(run_log_row) 

436 

437 def change_htcondor_run_log(self, htcondor_csv_file, htcondor_jobid, new_status="None"): 

438 try: 

439 df = pd.read_csv(htcondor_csv_file) 

440 df.loc[df['Job ID'] == htcondor_jobid, 'Status'] = str(new_status) 

441 self.logger.info(f"Changed status of JobID {htcondor_jobid} to {new_status} in {htcondor_csv_file}.") 

442 df.to_csv(htcondor_csv_file, index=False) 

443 except: 

444 self.logger.warning(f"Could not change status of JobID {htcondor_jobid} to {new_status} in {htcondor_csv_file}.") 

445 

446if __name__ == "__main__": 

447 parser = argparse.ArgumentParser( 

448 prog="FiQuS", 

449 description="Finite Elements Quench Simulator", 

450 epilog="steam-team@cern.ch", 

451 ) 

452 parser.add_argument( 

453 dest="full_path_input", 

454 type=str, 

455 help="Full path to FiQuS input yaml file", 

456 ) 

457 parser.add_argument( 

458 "--output", '-o', dest="output_path", type=str, help="Full path to FiQuS output folder" 

459 ) 

460 parser.add_argument( 

461 "--getdp", '-g', dest="GetDP_path", type=str, help="Full path to GetDP executable" 

462 ) 

463 

464 parser.add_argument("--htcondor_jobid", '-j', type=int, default=0, 

465 help="HTCondor job ID (optional)", required=False) 

466 

467 parser.add_argument("--fiqus_data_model", '-m', type=str, 

468 help="Full path to FiQuS Data Model file (optional)", required=False) 

469 

470 parser.add_argument("--fiqus_data_settings", '-s', type=str, 

471 help="Full path to FiQuS Data Settings file (optional)", required=False) 

472 

473 args, unknown = parser.parse_known_args() 

474 

475 # remove these options from sys.argv, otherwise they are passed onto Gmsh 

476 # in Gmsh.initialize() 

477 options_to_remove = ["-o", "-g", "-j", "-m", "-s"] 

478 # Loop through and remove each option and its value 

479 i = 0 

480 while i < len(sys.argv): 

481 if sys.argv[i] in options_to_remove: 

482 sys.argv.pop(i) # Remove the option 

483 if i < len(sys.argv): 

484 sys.argv.pop(i) # Remove the associated value 

485 else: 

486 i += 1 

487 

488 if args.fiqus_data_model != None and args.fiqus_data_settings != None: 

489 # read fdm and fds from a file (HTCondor case) 

490 input_fdm = Util.read_data_from_yaml(args.fiqus_data_model, FDM) 

491 input_fds = Util.read_data_from_yaml(args.fiqus_data_settings, DataSettings) 

492 

493 MainFiQuS( 

494 input_file_path=args.full_path_input, 

495 model_folder=args.output_path, 

496 fdm=input_fdm, 

497 fds=input_fds, 

498 htcondor_jobid=args.htcondor_jobid 

499 ) 

500 else: 

501 # fdm and fds from input (STEAM SDK case) 

502 MainFiQuS( 

503 input_file_path=args.full_path_input, 

504 model_folder=args.output_path, 

505 GetDP_path=args.GetDP_path, 

506 ) 

507 print("FiQuS run completed")