Coverage for fiqus/MainFiQuS.py: 66%

247 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2026-01-08 01:36 +0000

1import argparse 

2import csv 

3import os 

4import pathlib 

5import sys 

6import time 

7import getpass 

8import platform 

9import subprocess 

10import json 

11 

12import pandas as pd 

13 

14FiQuS_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 

15sys.path.insert(0, FiQuS_path) 

16 

17from fiqus.utils.Utils import FilesAndFolders as Util 

18from fiqus.utils.Utils import CheckForExceptions as Check 

19from fiqus.utils.Utils import create_json_schema 

20from fiqus.utils.Utils import get_data_settings 

21from fiqus.utils.Utils import initialize_logger 

22from fiqus.data.DataFiQuS import FDM, SolveDumpDataModel 

23from fiqus.data.DataSettings import DataSettings 

24from fiqus.mains.MainCCT import MainCCT 

25from fiqus.mains.MainMultipole import MainMultipole 

26from fiqus.mains.MainPancake3D import MainPancake3D 

27from fiqus.mains.MainConductorAC_Strand import MainConductorAC_Strand 

28from fiqus.mains.MainHomogenizedConductor import MainHomogenizedConductor 

29from fiqus.mains.MainConductorAC_Rutherford import MainConductorAC_Rutherford 

30from fiqus.mains.MainConductorAC_CC import MainConductorAC_CC 

31 

32class MainFiQuS: 

33 """ 

34 This is the top level class of FiQuS. 

35 """ 

36 

37 def __init__( 

38 self, 

39 input_file_path: str = None, 

40 model_folder: str = None, 

41 GetDP_path=None, 

42 fdm=None, 

43 fds=None, 

44 htcondor_jobid=None 

45 ): 

46 """ 

47 Main class for working with FiQuS simulations 

48 :param input_file_path: full path to input file yaml 

49 :type input_file_path: str 

50 :param model_folder: full path to the base output folder, called model folder 

51 :type model_folder: str 

52 :param GetDP_path: full path to GetDP executable 

53 :type GetDP_path: str 

54 :param fdm: FiQuS Data Model - object of fiqus DataFiQus 

55 :type fdm: object 

56 :param fds: FiQuS Data Settings - object of DataSettings 

57 :type fds: object 

58 """ 

59 self.time_stamp = time.strftime("%Y-%m-%d-%H-%M-%S") 

60 

61 self.start_folder = os.getcwd() 

62 self.wrk_folder = model_folder 

63 self.file_name = None 

64 

65 # Load yaml input file 

66 if not fdm: 

67 self.fdm: FDM = Util.read_data_from_yaml(input_file_path, FDM) 

68 copyInputFile = ( 

69 "copy" 

70 f" {input_file_path} {os.path.join(self.wrk_folder, 'logs', f'INPUT_FILE_{self.time_stamp}.FiQuS.yaml')}" 

71 ) 

72 subprocess.run(copyInputFile, shell=True, stdout=subprocess.DEVNULL) 

73 else: 

74 self.fdm: FDM = fdm 

75 verbose = self.fdm.run.verbosity_FiQuS 

76 self.logger = initialize_logger( 

77 verbose=verbose, time_stamp=self.time_stamp, work_folder=self.wrk_folder 

78 ) 

79 if verbose: 

80 Util.print_welcome_graphics() 

81 # Intialize logger 

82 

83 # Create JSON schema 

84 create_json_schema(self.fdm) 

85 

86 # Check for input errors 

87 Check.check_inputs(run=self.fdm.run) 

88 

89 # Initialize Main object 

90 if self.fdm.magnet.type == "CCT_straight": 

91 self.main_magnet = MainCCT(fdm=self.fdm, verbose=verbose) 

92 elif self.fdm.magnet.type == "Pancake3D": 

93 self.main_magnet = MainPancake3D(fdm=self.fdm, verbose=verbose) 

94 elif self.fdm.magnet.type == "CACStrand": 

95 self.main_magnet = MainConductorAC_Strand(fdm=self.fdm, inputs_folder_path=pathlib.Path(input_file_path).parent, outputs_folder_path=model_folder, verbose=verbose) 

96 elif self.fdm.magnet.type == "CACCC": 

97 self.main_magnet = MainConductorAC_CC(fdm=self.fdm, inputs_folder_path=pathlib.Path(input_file_path).parent, verbose=verbose) 

98 elif self.fdm.magnet.type == "HomogenizedConductor": 

99 self.main_magnet = MainHomogenizedConductor(fdm=self.fdm, inputs_folder_path=pathlib.Path(input_file_path).parent, outputs_folder_path=model_folder, verbose=verbose) 

100 elif self.fdm.magnet.type == "CACRutherford": 

101 self.main_magnet = MainConductorAC_Rutherford(fdm=self.fdm, inputs_folder_path=pathlib.Path(input_file_path).parent, verbose=verbose) 

102 elif self.fdm.magnet.type == "multipole": 

103 self.file_name = os.path.basename(input_file_path)[:-5] 

104 if not self.fdm.magnet.geometry.geom_file_path: 

105 self.fdm.magnet.geometry.geom_file_path = f"{input_file_path[:-5]}.geom" 

106 self.main_magnet = MainMultipole( 

107 fdm=self.fdm, 

108 rgd_path=self.fdm.magnet.geometry.geom_file_path, 

109 verbose=verbose, 

110 ) 

111 

112 else: 

113 raise ValueError( 

114 f"FiQuS does not support magnet type: {self.fdm.magnet.type}!" 

115 ) 

116 

117 # Load user paths for executables and additional files 

118 self.logger.info(f'{getpass.getuser()} is running on {platform.platform()}') 

119 if not fds: 

120 fds = get_data_settings(GetDP_path=GetDP_path) 

121 else: 

122 fds = get_data_settings(GetDP_path=GetDP_path, settings=fds) 

123 self.main_magnet.GetDP_path = fds.GetDP_path 

124 self.logger.info(f"{self.main_magnet.GetDP_path} is going to be used for FE solving.") 

125 

126 # self.logger.info(gmsh.onelab.run(self.fdm.general.magnet_name, f"{self.main_magnet.settings['GetDP_path']} -info")) 

127 

128 # update htcondor csv 

129 if htcondor_jobid: 

130 base_path_model_files = fds.base_path_model_files 

131 htcondor_csv_file = os.path.join(base_path_model_files, "htcondor_run_log.csv") 

132 

133 self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Running") 

134 

135 # Save Model/Geometry/Mesh/Solution folder paths 

136 self.save_folders() 

137 

138 # Build magnet 

139 self.summary = dict.fromkeys( 

140 [ 

141 "SJ", 

142 "SICN", 

143 "SIGE", 

144 "Gamma", 

145 "nodes", 

146 "solution_time", 

147 "overall_error", 

148 "minimum_diff", 

149 "maximum_diff", 

150 ] 

151 ) 

152 

153 try: 

154 self.build_magnet() 

155 except Exception as e: 

156 # update htcondor csv 

157 if htcondor_jobid: 

158 self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Failed") 

159 

160 self.logger.error(f"Error: {e}") 

161 raise e 

162 else: 

163 # update htcondor csv 

164 if htcondor_jobid: 

165 self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Finished") 

166 

167 def save_folders(self): 

168 """ 

169 Method to make or delete folders of FiQuS 

170 :return: Nothing, only does file and folder operation 

171 :rtype: None 

172 """ 

173 def _check_and_generate_path(folder_type: str = None): 

174 if folder_type == "Geometry": 

175 folder = self.wrk_folder 

176 elif folder_type == "Mesh": 

177 folder = self.main_magnet.geom_folder 

178 elif folder_type == "Solution": 

179 folder = self.main_magnet.mesh_folder 

180 else: 

181 raise Exception("Incompatible type.") 

182 

183 if getattr(self.fdm.run, folder_type.lower()) is None: 

184 # folder_key is not given, so it is computed 

185 folder_key = Util.compute_folder_key( 

186 folder_type=folder_type, 

187 folder=folder, 

188 overwrite=self.fdm.run.overwrite, 

189 ) 

190 else: 

191 # folder_key is given 

192 folder_key = getattr(self.fdm.run, folder_type.lower()) 

193 

194 required_folder = folder_type in required_folders 

195 if self.fdm.run.overwrite and folder_type == ( 

196 required_folders[0] if required_folders else None 

197 ): 

198 Check.check_overwrite_conditions( 

199 folder_type=folder_type, folder=folder, folder_key=folder_key 

200 ) 

201 return Util.get_folder_path( 

202 folder_type=folder_type, 

203 folder=folder, 

204 folder_key=folder_key, 

205 overwrite=self.fdm.run.overwrite, 

206 required_folder=required_folder, 

207 ) 

208 

209 if self.fdm.run.type == "start_from_yaml": 

210 required_folders = ["Geometry", "Mesh", "Solution"] 

211 elif self.fdm.run.type == "geometry_and_mesh": 

212 required_folders = ["Geometry", "Mesh"] 

213 elif self.fdm.run.type == "mesh_and_solve_with_post_process_python": 

214 required_folders = ["Mesh", "Solution"] 

215 elif self.fdm.run.type in ["solve_with_post_process_python", "solve_only"]: 

216 required_folders = ["Solution"] 

217 elif self.fdm.run.type == "geometry_only": 

218 required_folders = ( 

219 [] 

220 if self.fdm.run.geometry and not self.fdm.run.overwrite 

221 else ["Geometry"] 

222 ) 

223 elif self.fdm.run.type == "mesh_only": 

224 required_folders = ( 

225 [] if self.fdm.run.mesh and not self.fdm.run.overwrite else ["Mesh"] 

226 ) 

227 else: # post_process_getdp_only or post_process_python_only or plot_python 

228 required_folders = [] 

229 

230 

231 

232 self.main_magnet.geom_folder = _check_and_generate_path(folder_type="Geometry") 

233 if not self.fdm.run.type in ["geometry_only"]: 

234 self.main_magnet.mesh_folder = _check_and_generate_path(folder_type="Mesh") 

235 if not ( 

236 self.fdm.run.type == "geometry_only" 

237 or self.fdm.run.type == "mesh_only" 

238 ): 

239 self.main_magnet.solution_folder = _check_and_generate_path( 

240 folder_type="Solution" 

241 ) 

242 

243 if self.fdm.run.type in [ 

244 "start_from_yaml", 

245 "geometry_and_mesh", 

246 "geometry_only", 

247 ]: 

248 Util.write_data_model_to_yaml( 

249 os.path.join(self.main_magnet.geom_folder, "geometry.yaml"), 

250 self.fdm.magnet.geometry, 

251 by_alias=True, 

252 with_comments=True, 

253 ) 

254 if self.fdm.run.type in [ 

255 "start_from_yaml", 

256 "geometry_and_mesh", 

257 "mesh_and_solve_with_post_process_python", 

258 "mesh_only", 

259 ]: 

260 Util.write_data_model_to_yaml( 

261 os.path.join(self.main_magnet.mesh_folder, "mesh.yaml"), 

262 self.fdm.magnet.mesh, 

263 by_alias=True, 

264 with_comments=True, 

265 ) 

266 if self.fdm.run.type in [ 

267 "start_from_yaml", 

268 "mesh_and_solve_with_post_process_python", 

269 "solve_with_post_process_python", 

270 "solve_only", 

271 "post_process", 

272 "plot_python", 

273 "postprocess_veusz" 

274 ]: 

275 solve_dump_data = SolveDumpDataModel( 

276 solve=self.fdm.magnet.solve, 

277 circuit=self.fdm.circuit, 

278 power_supply=self.fdm.power_supply, 

279 quench_protection=self.fdm.quench_protection, 

280 quench_detection=self.fdm.quench_detection, 

281 conductors=self.fdm.conductors 

282 ) 

283 Util.write_data_model_to_yaml( 

284 os.path.join(self.main_magnet.solution_folder, "solve.yaml"), 

285 solve_dump_data, 

286 by_alias=True, 

287 with_comments=True, 

288 ) 

289 if self.fdm.run.type in [ 

290 "start_from_yaml", 

291 "mesh_and_solve_with_post_process_python", 

292 "solve_with_post_process_python", 

293 "post_process_python_only", 

294 "post_process_getdp_only", 

295 "post_process", 

296 "postprocess_veusz", 

297 "plot_python" 

298 ]: 

299 Util.write_data_model_to_yaml( 

300 os.path.join(self.main_magnet.solution_folder, "postproc.yaml"), 

301 self.fdm.magnet.postproc, 

302 by_alias=True, 

303 with_comments=True, 

304 ) 

305 

306 try: 

307 run_type = self.fdm.run.type 

308 comments = self.fdm.run.comments 

309 if self.main_magnet.geom_folder is not None: 

310 geo_folder = os.path.relpath(self.main_magnet.geom_folder) 

311 geo_folder = os.path.relpath( 

312 geo_folder, os.path.join("tests", "_outputs") 

313 ) 

314 else: 

315 geo_folder = "-" 

316 

317 if self.main_magnet.mesh_folder is not None: 

318 mesh_folder = os.path.relpath(self.main_magnet.mesh_folder) 

319 mesh_folder = os.path.relpath( 

320 mesh_folder, os.path.join("tests", "_outputs") 

321 ) 

322 else: 

323 mesh_folder = "-" 

324 

325 if self.main_magnet.solution_folder is not None: 

326 solution_folder = os.path.relpath(self.main_magnet.solution_folder) 

327 solution_folder = os.path.relpath( 

328 solution_folder, os.path.join("tests", "_outputs") 

329 ) 

330 else: 

331 solution_folder = "-" 

332 

333 run_log_row = [ 

334 self.time_stamp, 

335 run_type, 

336 comments, 

337 geo_folder, 

338 mesh_folder, 

339 solution_folder, 

340 ] 

341 self.add_to_run_log( 

342 os.path.join(self.wrk_folder, "run_log.csv"), run_log_row 

343 ) 

344 except: 

345 self.logger.warning("Run log could not be completed.") 

346 

347 

348 def build_magnet(self): 

349 """ 

350 Main method to build magnets, i.e. to run various fiqus run types and magnet types 

351 :return: none 

352 :rtype: none 

353 """ 

354 if self.fdm.run.type == "start_from_yaml": 

355 self.main_magnet.generate_geometry() 

356 self.main_magnet.pre_process() 

357 self.main_magnet.load_geometry() 

358 for key, value in self.main_magnet.mesh().items(): 

359 self.summary[key] = value 

360 self.summary["solution_time"] = self.main_magnet.solve_and_postprocess_getdp() 

361 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items(): 

362 self.summary[key] = value 

363 elif self.fdm.run.type == "pre_process_only": 

364 self.main_magnet.pre_process() 

365 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items(): 

366 self.summary[key] = value # todo: DISABLE FOR ONE GROUP ONLY 

367 elif self.fdm.run.type == "geometry_only": 

368 self.main_magnet.generate_geometry( 

369 gui=(self.main_magnet.fdm.run.launch_gui if self.fdm.magnet.type != "CCT_straight" else False) 

370 ) 

371 if self.fdm.magnet.type in ["CCT_straight", "CWS"]: 

372 self.main_magnet.pre_process(gui=self.main_magnet.fdm.run.launch_gui) 

373 elif self.fdm.run.type == "geometry_and_mesh": 

374 self.main_magnet.generate_geometry() 

375 self.main_magnet.pre_process() 

376 self.main_magnet.load_geometry() 

377 for key, value in self.main_magnet.mesh(gui=self.main_magnet.fdm.run.launch_gui).items(): 

378 self.summary[key] = value 

379 elif self.fdm.run.type == "mesh_and_solve_with_post_process_python": 

380 self.main_magnet.load_geometry() 

381 for key, value in self.main_magnet.mesh().items(): 

382 self.summary[key] = value 

383 self.summary["solution_time"] = self.main_magnet.solve_and_postprocess_getdp() 

384 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items(): 

385 self.summary[key] = value 

386 elif self.fdm.run.type == "mesh_only": 

387 self.main_magnet.load_geometry() 

388 for key, value in self.main_magnet.mesh(gui=self.main_magnet.fdm.run.launch_gui).items(): 

389 self.summary[key] = value 

390 elif self.fdm.run.type == "solve_with_post_process_python": 

391 self.summary["solution_time"] = ( 

392 self.main_magnet.solve_and_postprocess_getdp(gui=self.main_magnet.fdm.run.launch_gui) 

393 ) 

394 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items(): 

395 self.summary[key] = value 

396 elif self.fdm.run.type == "solve_only": 

397 self.summary["solution_time"] = ( 

398 self.main_magnet.solve_and_postprocess_getdp(gui=self.main_magnet.fdm.run.launch_gui) 

399 ) 

400 elif self.fdm.run.type == "postprocess_veusz": 

401 self.main_magnet.post_process_veusz(gui=self.main_magnet.fdm.run.launch_gui) 

402 elif self.fdm.run.type == "post_process_getdp_only": 

403 self.main_magnet.post_process_getdp(gui=self.main_magnet.fdm.run.launch_gui) 

404 elif self.fdm.run.type == "post_process_python_only": 

405 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items(): 

406 self.summary[key] = value 

407 elif self.fdm.run.type == "post_process": 

408 self.main_magnet.post_process_getdp(gui=self.main_magnet.fdm.run.launch_gui) 

409 for key, value in self.main_magnet.post_process_python(gui=self.main_magnet.fdm.run.launch_gui).items(): 

410 self.summary[key] = value 

411 elif self.fdm.run.type == "plot_python": 

412 self.main_magnet.plot_python() 

413 

414 elif self.fdm.run.type == "batch_post_process_python": 

415 self.main_magnet.batch_post_process_python() 

416 os.chdir(self.start_folder) 

417 

418 if self.file_name: 

419 file_path = os.path.join(self.wrk_folder, f"{self.file_name}.json") 

420 with open(file_path, 'w', encoding='utf-8') as f: 

421 json.dump(self.summary, f, indent=2) 

422 

423 @staticmethod 

424 def add_to_run_log(path_to_csv, run_log_row): 

425 # If file does not exist, write the header 

426 if not os.path.isfile(path_to_csv): 

427 header = [ 

428 "Time Stamp", 

429 "Run Type", 

430 "Comments", 

431 "Geometry Directory", 

432 "Mesh Directory", 

433 "Solution Directory", 

434 ] 

435 with open(path_to_csv, "a", newline="") as csv_file: 

436 writer = csv.writer(csv_file) 

437 writer.writerow(header) 

438 

439 # Open the CSV file in append mode 

440 with open(path_to_csv, "a+", newline="") as csv_file: 

441 writer = csv.writer(csv_file) 

442 writer.writerow(run_log_row) 

443 

444 def change_htcondor_run_log(self, htcondor_csv_file, htcondor_jobid, new_status="None"): 

445 try: 

446 df = pd.read_csv(htcondor_csv_file) 

447 df.loc[df['Job ID'] == htcondor_jobid, 'Status'] = str(new_status) 

448 self.logger.info(f"Changed status of JobID {htcondor_jobid} to {new_status} in {htcondor_csv_file}.") 

449 df.to_csv(htcondor_csv_file, index=False) 

450 except: 

451 self.logger.warning(f"Could not change status of JobID {htcondor_jobid} to {new_status} in {htcondor_csv_file}.") 

452 

453if __name__ == "__main__": 

454 parser = argparse.ArgumentParser( 

455 prog="FiQuS", 

456 description="Finite Elements Quench Simulator", 

457 epilog="steam-team@cern.ch", 

458 ) 

459 parser.add_argument( 

460 dest="full_path_input", 

461 type=str, 

462 help="Full path to FiQuS input yaml file", 

463 ) 

464 parser.add_argument( 

465 "--output", '-o', dest="output_path", type=str, help="Full path to FiQuS output folder" 

466 ) 

467 parser.add_argument( 

468 "--getdp", '-g', dest="GetDP_path", type=str, help="Full path to GetDP executable" 

469 ) 

470 

471 parser.add_argument("--htcondor_jobid", '-j', type=int, default=0, 

472 help="HTCondor job ID (optional)", required=False) 

473 

474 parser.add_argument("--fiqus_data_model", '-m', type=str, 

475 help="Full path to FiQuS Data Model file (optional)", required=False) 

476 

477 parser.add_argument("--fiqus_data_settings", '-s', type=str, 

478 help="Full path to FiQuS Data Settings file (optional)", required=False) 

479 

480 args, unknown = parser.parse_known_args() 

481 

482 # remove these options from sys.argv, otherwise they are passed onto Gmsh 

483 # in Gmsh.initialize() 

484 options_to_remove = ["-o", "-g", "-j", "-m", "-s"] 

485 # Loop through and remove each option and its value 

486 i = 0 

487 while i < len(sys.argv): 

488 if sys.argv[i] in options_to_remove: 

489 sys.argv.pop(i) # Remove the option 

490 if i < len(sys.argv): 

491 sys.argv.pop(i) # Remove the associated value 

492 else: 

493 i += 1 

494 

495 if args.fiqus_data_model != None and args.fiqus_data_settings != None: 

496 # read fdm and fds from a file (HTCondor case) 

497 input_fdm = Util.read_data_from_yaml(args.fiqus_data_model, FDM) 

498 input_fds = Util.read_data_from_yaml(args.fiqus_data_settings, DataSettings) 

499 

500 MainFiQuS( 

501 input_file_path=args.full_path_input, 

502 model_folder=args.output_path, 

503 fdm=input_fdm, 

504 fds=input_fds, 

505 htcondor_jobid=args.htcondor_jobid 

506 ) 

507 else: 

508 # fdm and fds from input (STEAM SDK case) 

509 MainFiQuS( 

510 input_file_path=args.full_path_input, 

511 model_folder=args.output_path, 

512 GetDP_path=args.GetDP_path, 

513 ) 

514 print("FiQuS run completed")