Coverage for fiqus/MainFiQuS.py: 65%

241 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-11-29 01:35 +0000

1import argparse 

2import csv 

3import os 

4import pathlib 

5import sys 

6import time 

7import getpass 

8import platform 

9import subprocess 

10import json 

11 

12import pandas as pd 

13 

14FiQuS_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 

15sys.path.insert(0, FiQuS_path) 

16 

17from fiqus.utils.Utils import FilesAndFolders as Util 

18from fiqus.utils.Utils import CheckForExceptions as Check 

19from fiqus.utils.Utils import create_json_schema 

20from fiqus.utils.Utils import get_data_settings 

21from fiqus.utils.Utils import initialize_logger 

22from fiqus.data.DataFiQuS import FDM, SolveDumpDataModel 

23from fiqus.data.DataSettings import DataSettings 

24from fiqus.mains.MainCCT import MainCCT 

25from fiqus.mains.MainMultipole import MainMultipole 

26from fiqus.mains.MainPancake3D import MainPancake3D 

27from fiqus.mains.MainConductorAC_Strand import MainConductorAC_Strand 

28from fiqus.mains.MainHomogenizedConductor import MainHomogenizedConductor 

29 

30class MainFiQuS: 

31 """ 

32 This is the top level class of FiQuS. 

33 """ 

34 

35 def __init__( 

36 self, 

37 input_file_path: str = None, 

38 model_folder: str = None, 

39 GetDP_path=None, 

40 fdm=None, 

41 fds=None, 

42 htcondor_jobid=None 

43 ): 

44 """ 

45 Main class for working with FiQuS simulations 

46 :param input_file_path: full path to input file yaml 

47 :type input_file_path: str 

48 :param model_folder: full path to the base output folder, called model folder 

49 :type model_folder: str 

50 :param GetDP_path: full path to GetDP executable 

51 :type GetDP_path: str 

52 :param fdm: FiQuS Data Model - object of fiqus DataFiQus 

53 :type fdm: object 

54 :param fds: FiQuS Data Settings - object of DataSettings 

55 :type fds: object 

56 """ 

57 self.time_stamp = time.strftime("%Y-%m-%d-%H-%M-%S") 

58 

59 self.start_folder = os.getcwd() 

60 self.wrk_folder = model_folder 

61 self.file_name = None 

62 

63 # Load yaml input file 

64 if not fdm: 

65 self.fdm: FDM = Util.read_data_from_yaml(input_file_path, FDM) 

66 copyInputFile = ( 

67 "copy" 

68 f" {input_file_path} {os.path.join(self.wrk_folder, 'logs', f'INPUT_FILE_{self.time_stamp}.FiQuS.yaml')}" 

69 ) 

70 subprocess.run(copyInputFile, shell=True, stdout=subprocess.DEVNULL) 

71 else: 

72 self.fdm: FDM = fdm 

73 verbose = self.fdm.run.verbosity_FiQuS 

74 self.logger = initialize_logger( 

75 verbose=verbose, time_stamp=self.time_stamp, work_folder=self.wrk_folder 

76 ) 

77 if verbose: 

78 Util.print_welcome_graphics() 

79 # Intialize logger 

80 

81 # Create JSON schema 

82 create_json_schema(self.fdm) 

83 

84 # Check for input errors 

85 Check.check_inputs(run=self.fdm.run) 

86 

87 # Initialize Main object 

88 if self.fdm.magnet.type == "CCT_straight": 

89 self.main_magnet = MainCCT(fdm=self.fdm, verbose=verbose) 

90 elif self.fdm.magnet.type == "Pancake3D": 

91 self.main_magnet = MainPancake3D(fdm=self.fdm, verbose=verbose) 

92 elif self.fdm.magnet.type == "CACStrand": 

93 self.main_magnet = MainConductorAC_Strand(fdm=self.fdm, inputs_folder_path=pathlib.Path(input_file_path).parent, outputs_folder_path=model_folder, verbose=verbose) 

94 elif self.fdm.magnet.type == "HomogenizedConductor": 

95 self.main_magnet = MainHomogenizedConductor(fdm=self.fdm, inputs_folder_path=pathlib.Path(input_file_path).parent, outputs_folder_path=model_folder, verbose=verbose) 

96 elif self.fdm.magnet.type == "multipole": 

97 self.file_name = os.path.basename(input_file_path)[:-5] 

98 if not self.fdm.magnet.geometry.geom_file_path: 

99 self.fdm.magnet.geometry.geom_file_path = f"{input_file_path[:-5]}.geom" 

100 self.main_magnet = MainMultipole( 

101 fdm=self.fdm, 

102 rgd_path=self.fdm.magnet.geometry.geom_file_path, 

103 verbose=verbose, 

104 ) 

105 

106 else: 

107 raise ValueError( 

108 f"FiQuS does not support magnet type: {self.fdm.magnet.type}!" 

109 ) 

110 

111 # Load user paths for executables and additional files 

112 self.logger.info(f'{getpass.getuser()} is running on {platform.platform()}') 

113 if not fds: 

114 fds = get_data_settings(GetDP_path=GetDP_path) 

115 else: 

116 fds = get_data_settings(GetDP_path=GetDP_path, settings=fds) 

117 self.main_magnet.GetDP_path = fds.GetDP_path 

118 self.logger.info(f"{self.main_magnet.GetDP_path} is going to be used for FE solving.") 

119 

120 # self.logger.info(gmsh.onelab.run(self.fdm.general.magnet_name, f"{self.main_magnet.settings['GetDP_path']} -info")) 

121 

122 # update htcondor csv 

123 if htcondor_jobid: 

124 base_path_model_files = fds.base_path_model_files 

125 htcondor_csv_file = os.path.join(base_path_model_files, "htcondor_run_log.csv") 

126 

127 self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Running") 

128 

129 # Save Model/Geometry/Mesh/Solution folder paths 

130 self.save_folders() 

131 

132 # Build magnet 

133 self.summary = dict.fromkeys( 

134 [ 

135 "SJ", 

136 "SICN", 

137 "SIGE", 

138 "Gamma", 

139 "nodes", 

140 "solution_time", 

141 "overall_error", 

142 "minimum_diff", 

143 "maximum_diff", 

144 ] 

145 ) 

146 

147 try: 

148 self.build_magnet() 

149 except Exception as e: 

150 # update htcondor csv 

151 if htcondor_jobid: 

152 self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Failed") 

153 

154 self.logger.error(f"Error: {e}") 

155 raise e 

156 else: 

157 # update htcondor csv 

158 if htcondor_jobid: 

159 self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Finished") 

160 

161 def save_folders(self): 

162 """ 

163 Method to make or delete folders of FiQuS 

164 :return: Nothing, only does file and folder operation 

165 :rtype: None 

166 """ 

167 def _check_and_generate_path(folder_type: str = None): 

168 if folder_type == "Geometry": 

169 folder = self.wrk_folder 

170 elif folder_type == "Mesh": 

171 folder = self.main_magnet.geom_folder 

172 elif folder_type == "Solution": 

173 folder = self.main_magnet.mesh_folder 

174 else: 

175 raise Exception("Incompatible type.") 

176 

177 if getattr(self.fdm.run, folder_type.lower()) is None: 

178 # folder_key is not given, so it is computed 

179 folder_key = Util.compute_folder_key( 

180 folder_type=folder_type, 

181 folder=folder, 

182 overwrite=self.fdm.run.overwrite, 

183 ) 

184 else: 

185 # folder_key is given 

186 folder_key = getattr(self.fdm.run, folder_type.lower()) 

187 

188 required_folder = folder_type in required_folders 

189 if self.fdm.run.overwrite and folder_type == ( 

190 required_folders[0] if required_folders else None 

191 ): 

192 Check.check_overwrite_conditions( 

193 folder_type=folder_type, folder=folder, folder_key=folder_key 

194 ) 

195 return Util.get_folder_path( 

196 folder_type=folder_type, 

197 folder=folder, 

198 folder_key=folder_key, 

199 overwrite=self.fdm.run.overwrite, 

200 required_folder=required_folder, 

201 ) 

202 

203 if self.fdm.run.type == "start_from_yaml": 

204 required_folders = ["Geometry", "Mesh", "Solution"] 

205 elif self.fdm.run.type == "geometry_and_mesh": 

206 required_folders = ["Geometry", "Mesh"] 

207 elif self.fdm.run.type == "mesh_and_solve_with_post_process_python": 

208 required_folders = ["Mesh", "Solution"] 

209 elif self.fdm.run.type in ["solve_with_post_process_python", "solve_only"]: 

210 required_folders = ["Solution"] 

211 elif self.fdm.run.type == "geometry_only": 

212 required_folders = ( 

213 [] 

214 if self.fdm.run.geometry and not self.fdm.run.overwrite 

215 else ["Geometry"] 

216 ) 

217 elif self.fdm.run.type == "mesh_only": 

218 required_folders = ( 

219 [] if self.fdm.run.mesh and not self.fdm.run.overwrite else ["Mesh"] 

220 ) 

221 else: # post_process_getdp_only or post_process_python_only or plot_python 

222 required_folders = [] 

223 

224 

225 

226 self.main_magnet.geom_folder = _check_and_generate_path(folder_type="Geometry") 

227 if not self.fdm.run.type in ["geometry_only"]: 

228 self.main_magnet.mesh_folder = _check_and_generate_path(folder_type="Mesh") 

229 if not ( 

230 self.fdm.run.type == "geometry_only" 

231 or self.fdm.run.type == "mesh_only" 

232 ): 

233 self.main_magnet.solution_folder = _check_and_generate_path( 

234 folder_type="Solution" 

235 ) 

236 

237 if self.fdm.run.type in [ 

238 "start_from_yaml", 

239 "geometry_and_mesh", 

240 "geometry_only", 

241 ]: 

242 Util.write_data_model_to_yaml( 

243 os.path.join(self.main_magnet.geom_folder, "geometry.yaml"), 

244 self.fdm.magnet.geometry, 

245 by_alias=True, 

246 with_comments=True, 

247 ) 

248 if self.fdm.run.type in [ 

249 "start_from_yaml", 

250 "geometry_and_mesh", 

251 "mesh_and_solve_with_post_process_python", 

252 "mesh_only", 

253 ]: 

254 Util.write_data_model_to_yaml( 

255 os.path.join(self.main_magnet.mesh_folder, "mesh.yaml"), 

256 self.fdm.magnet.mesh, 

257 by_alias=True, 

258 with_comments=True, 

259 ) 

260 if self.fdm.run.type in [ 

261 "start_from_yaml", 

262 "mesh_and_solve_with_post_process_python", 

263 "solve_with_post_process_python", 

264 "solve_only", 

265 "post_process", 

266 "plot_python", 

267 "postprocess_veusz" 

268 ]: 

269 solve_dump_data = SolveDumpDataModel( 

270 solve=self.fdm.magnet.solve, 

271 circuit=self.fdm.circuit, 

272 power_supply=self.fdm.power_supply, 

273 quench_protection=self.fdm.quench_protection, 

274 quench_detection=self.fdm.quench_detection, 

275 conductors=self.fdm.conductors 

276 ) 

277 Util.write_data_model_to_yaml( 

278 os.path.join(self.main_magnet.solution_folder, "solve.yaml"), 

279 solve_dump_data, 

280 by_alias=True, 

281 with_comments=True, 

282 ) 

283 if self.fdm.run.type in [ 

284 "start_from_yaml", 

285 "mesh_and_solve_with_post_process_python", 

286 "solve_with_post_process_python", 

287 "post_process_python_only", 

288 "post_process_getdp_only", 

289 "post_process", 

290 "postprocess_veusz", 

291 "plot_python" 

292 ]: 

293 Util.write_data_model_to_yaml( 

294 os.path.join(self.main_magnet.solution_folder, "postproc.yaml"), 

295 self.fdm.magnet.postproc, 

296 by_alias=True, 

297 with_comments=True, 

298 ) 

299 

300 try: 

301 run_type = self.fdm.run.type 

302 comments = self.fdm.run.comments 

303 if self.main_magnet.geom_folder is not None: 

304 geo_folder = os.path.relpath(self.main_magnet.geom_folder) 

305 geo_folder = os.path.relpath( 

306 geo_folder, os.path.join("tests", "_outputs") 

307 ) 

308 else: 

309 geo_folder = "-" 

310 

311 if self.main_magnet.mesh_folder is not None: 

312 mesh_folder = os.path.relpath(self.main_magnet.mesh_folder) 

313 mesh_folder = os.path.relpath( 

314 mesh_folder, os.path.join("tests", "_outputs") 

315 ) 

316 else: 

317 mesh_folder = "-" 

318 

319 if self.main_magnet.solution_folder is not None: 

320 solution_folder = os.path.relpath(self.main_magnet.solution_folder) 

321 solution_folder = os.path.relpath( 

322 solution_folder, os.path.join("tests", "_outputs") 

323 ) 

324 else: 

325 solution_folder = "-" 

326 

327 run_log_row = [ 

328 self.time_stamp, 

329 run_type, 

330 comments, 

331 geo_folder, 

332 mesh_folder, 

333 solution_folder, 

334 ] 

335 self.add_to_run_log( 

336 os.path.join(self.wrk_folder, "run_log.csv"), run_log_row 

337 ) 

338 except: 

339 self.logger.warning("Run log could not be completed.") 

340 

341 

342 def build_magnet(self): 

343 """ 

344 Main method to build magnets, i.e. to run various fiqus run types and magnet types 

345 :return: none 

346 :rtype: none 

347 """ 

348 if self.fdm.run.type == "start_from_yaml": 

349 self.main_magnet.generate_geometry() 

350 self.main_magnet.pre_process() 

351 self.main_magnet.load_geometry() 

352 for key, value in self.main_magnet.mesh().items(): 

353 self.summary[key] = value 

354 self.summary["solution_time"] = self.main_magnet.solve_and_postprocess_getdp() 

355 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items(): 

356 self.summary[key] = value 

357 elif self.fdm.run.type == "pre_process_only": 

358 self.main_magnet.pre_process() 

359 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items(): 

360 self.summary[key] = value # todo: DISABLE FOR ONE GROUP ONLY 

361 elif self.fdm.run.type == "geometry_only": 

362 self.main_magnet.generate_geometry( 

363 gui=(self.main_magnet.fdm.run.launch_gui if self.fdm.magnet.type != "CCT_straight" else False) 

364 ) 

365 if self.fdm.magnet.type in ["CCT_straight", "CWS"]: 

366 self.main_magnet.pre_process(gui=self.main_magnet.fdm.run.launch_gui) 

367 elif self.fdm.run.type == "geometry_and_mesh": 

368 self.main_magnet.generate_geometry() 

369 self.main_magnet.pre_process() 

370 self.main_magnet.load_geometry() 

371 for key, value in self.main_magnet.mesh(gui=self.main_magnet.fdm.run.launch_gui).items(): 

372 self.summary[key] = value 

373 elif self.fdm.run.type == "mesh_and_solve_with_post_process_python": 

374 self.main_magnet.load_geometry() 

375 for key, value in self.main_magnet.mesh().items(): 

376 self.summary[key] = value 

377 self.summary["solution_time"] = self.main_magnet.solve_and_postprocess_getdp() 

378 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items(): 

379 self.summary[key] = value 

380 elif self.fdm.run.type == "mesh_only": 

381 self.main_magnet.load_geometry() 

382 for key, value in self.main_magnet.mesh(gui=self.main_magnet.fdm.run.launch_gui).items(): 

383 self.summary[key] = value 

384 elif self.fdm.run.type == "solve_with_post_process_python": 

385 self.summary["solution_time"] = ( 

386 self.main_magnet.solve_and_postprocess_getdp(gui=self.main_magnet.fdm.run.launch_gui) 

387 ) 

388 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items(): 

389 self.summary[key] = value 

390 elif self.fdm.run.type == "solve_only": 

391 self.summary["solution_time"] = ( 

392 self.main_magnet.solve_and_postprocess_getdp(gui=self.main_magnet.fdm.run.launch_gui) 

393 ) 

394 elif self.fdm.run.type == "postprocess_veusz": 

395 self.main_magnet.post_process_veusz(gui=self.main_magnet.fdm.run.launch_gui) 

396 elif self.fdm.run.type == "post_process_getdp_only": 

397 self.main_magnet.post_process_getdp(gui=self.main_magnet.fdm.run.launch_gui) 

398 elif self.fdm.run.type == "post_process_python_only": 

399 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items(): 

400 self.summary[key] = value 

401 elif self.fdm.run.type == "post_process": 

402 self.main_magnet.post_process_getdp(gui=self.main_magnet.fdm.run.launch_gui) 

403 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items(): 

404 self.summary[key] = value 

405 elif self.fdm.run.type == "plot_python": 

406 self.main_magnet.plot_python() 

407 

408 elif self.fdm.run.type == "batch_post_process_python": 

409 self.main_magnet.batch_post_process_python() 

410 os.chdir(self.start_folder) 

411 

412 if self.file_name: 

413 file_path = os.path.join(self.wrk_folder, f"{self.file_name}.json") 

414 with open(file_path, 'w', encoding='utf-8') as f: 

415 json.dump(self.summary, f, indent=2) 

416 

417 @staticmethod 

418 def add_to_run_log(path_to_csv, run_log_row): 

419 # If file does not exist, write the header 

420 if not os.path.isfile(path_to_csv): 

421 header = [ 

422 "Time Stamp", 

423 "Run Type", 

424 "Comments", 

425 "Geometry Directory", 

426 "Mesh Directory", 

427 "Solution Directory", 

428 ] 

429 with open(path_to_csv, "a", newline="") as csv_file: 

430 writer = csv.writer(csv_file) 

431 writer.writerow(header) 

432 

433 # Open the CSV file in append mode 

434 with open(path_to_csv, "a+", newline="") as csv_file: 

435 writer = csv.writer(csv_file) 

436 writer.writerow(run_log_row) 

437 

438 def change_htcondor_run_log(self, htcondor_csv_file, htcondor_jobid, new_status="None"): 

439 try: 

440 df = pd.read_csv(htcondor_csv_file) 

441 df.loc[df['Job ID'] == htcondor_jobid, 'Status'] = str(new_status) 

442 self.logger.info(f"Changed status of JobID {htcondor_jobid} to {new_status} in {htcondor_csv_file}.") 

443 df.to_csv(htcondor_csv_file, index=False) 

444 except: 

445 self.logger.warning(f"Could not change status of JobID {htcondor_jobid} to {new_status} in {htcondor_csv_file}.") 

446 

447if __name__ == "__main__": 

448 parser = argparse.ArgumentParser( 

449 prog="FiQuS", 

450 description="Finite Elements Quench Simulator", 

451 epilog="steam-team@cern.ch", 

452 ) 

453 parser.add_argument( 

454 dest="full_path_input", 

455 type=str, 

456 help="Full path to FiQuS input yaml file", 

457 ) 

458 parser.add_argument( 

459 "--output", '-o', dest="output_path", type=str, help="Full path to FiQuS output folder" 

460 ) 

461 parser.add_argument( 

462 "--getdp", '-g', dest="GetDP_path", type=str, help="Full path to GetDP executable" 

463 ) 

464 

465 parser.add_argument("--htcondor_jobid", '-j', type=int, default=0, 

466 help="HTCondor job ID (optional)", required=False) 

467 

468 parser.add_argument("--fiqus_data_model", '-m', type=str, 

469 help="Full path to FiQuS Data Model file (optional)", required=False) 

470 

471 parser.add_argument("--fiqus_data_settings", '-s', type=str, 

472 help="Full path to FiQuS Data Settings file (optional)", required=False) 

473 

474 args, unknown = parser.parse_known_args() 

475 

476 # remove these options from sys.argv, otherwise they are passed onto Gmsh 

477 # in Gmsh.initialize() 

478 options_to_remove = ["-o", "-g", "-j", "-m", "-s"] 

479 # Loop through and remove each option and its value 

480 i = 0 

481 while i < len(sys.argv): 

482 if sys.argv[i] in options_to_remove: 

483 sys.argv.pop(i) # Remove the option 

484 if i < len(sys.argv): 

485 sys.argv.pop(i) # Remove the associated value 

486 else: 

487 i += 1 

488 

489 if args.fiqus_data_model != None and args.fiqus_data_settings != None: 

490 # read fdm and fds from a file (HTCondor case) 

491 input_fdm = Util.read_data_from_yaml(args.fiqus_data_model, FDM) 

492 input_fds = Util.read_data_from_yaml(args.fiqus_data_settings, DataSettings) 

493 

494 MainFiQuS( 

495 input_file_path=args.full_path_input, 

496 model_folder=args.output_path, 

497 fdm=input_fdm, 

498 fds=input_fds, 

499 htcondor_jobid=args.htcondor_jobid 

500 ) 

501 else: 

502 # fdm and fds from input (STEAM SDK case) 

503 MainFiQuS( 

504 input_file_path=args.full_path_input, 

505 model_folder=args.output_path, 

506 GetDP_path=args.GetDP_path, 

507 ) 

508 print("FiQuS run completed")