Coverage for fiqus/MainFiQuS.py: 65%

244 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-12-19 01:48 +0000

1import argparse 

2import csv 

3import os 

4import pathlib 

5import sys 

6import time 

7import getpass 

8import platform 

9import subprocess 

10import json 

11 

12import pandas as pd 

13 

14FiQuS_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 

15sys.path.insert(0, FiQuS_path) 

16 

17from fiqus.utils.Utils import FilesAndFolders as Util 

18from fiqus.utils.Utils import CheckForExceptions as Check 

19from fiqus.utils.Utils import create_json_schema 

20from fiqus.utils.Utils import get_data_settings 

21from fiqus.utils.Utils import initialize_logger 

22from fiqus.data.DataFiQuS import FDM, SolveDumpDataModel 

23from fiqus.data.DataSettings import DataSettings 

24from fiqus.mains.MainCCT import MainCCT 

25from fiqus.mains.MainMultipole import MainMultipole 

26from fiqus.mains.MainPancake3D import MainPancake3D 

27from fiqus.mains.MainConductorAC_Strand import MainConductorAC_Strand 

28from fiqus.mains.MainHomogenizedConductor import MainHomogenizedConductor 

29from fiqus.mains.MainConductorAC_Rutherford import MainConductorAC_Rutherford 

30 

class MainFiQuS:
    """
    This is the top level class of FiQuS.

    Constructing an instance runs a complete FiQuS job: the data model is
    loaded, the magnet-specific main object is created, the output folders
    are prepared and the steps selected by ``fdm.run.type`` are executed.
    """

    def __init__(
        self,
        input_file_path: str = None,
        model_folder: str = None,
        GetDP_path=None,
        fdm=None,
        fds=None,
        htcondor_jobid=None,
    ):
        """
        Main class for working with FiQuS simulations
        :param input_file_path: full path to input file yaml
        :type input_file_path: str
        :param model_folder: full path to the base output folder, called model folder
        :type model_folder: str
        :param GetDP_path: full path to GetDP executable
        :type GetDP_path: str
        :param fdm: FiQuS Data Model - object of fiqus DataFiQus
        :type fdm: object
        :param fds: FiQuS Data Settings - object of DataSettings
        :type fds: object
        :param htcondor_jobid: HTCondor job ID; when truthy, the status of this job is updated in htcondor_run_log.csv
        :type htcondor_jobid: int
        :raises ValueError: if the magnet type of the data model is not supported
        """
        # Imported at function scope so this class does not depend on a change
        # to the file-level import list.
        import shutil

        self.time_stamp = time.strftime("%Y-%m-%d-%H-%M-%S")

        self.start_folder = os.getcwd()
        self.wrk_folder = model_folder
        self.file_name = None

        # Load yaml input file
        input_copy_error = None
        if not fdm:
            self.fdm: FDM = Util.read_data_from_yaml(input_file_path, FDM)
            input_copy_path = os.path.join(
                self.wrk_folder, "logs", f"INPUT_FILE_{self.time_stamp}.FiQuS.yaml"
            )
            # Best-effort backup of the input file. shutil.copy is used instead
            # of the shell "copy" command so that it also works outside Windows
            # and with paths containing spaces. A failure must not stop the run;
            # it is reported below, once the logger exists.
            try:
                shutil.copy(input_file_path, input_copy_path)
            except OSError as copy_error:
                input_copy_error = (
                    f"Could not copy input file {input_file_path} to {input_copy_path}: {copy_error}"
                )
        else:
            self.fdm: FDM = fdm

        # Initialize logger
        verbose = self.fdm.run.verbosity_FiQuS
        self.logger = initialize_logger(
            verbose=verbose, time_stamp=self.time_stamp, work_folder=self.wrk_folder
        )
        if verbose:
            Util.print_welcome_graphics()
        if input_copy_error:
            self.logger.warning(input_copy_error)

        # Create JSON schema
        create_json_schema(self.fdm)

        # Check for input errors
        Check.check_inputs(run=self.fdm.run)

        # Initialize Main object
        magnet_type = self.fdm.magnet.type
        if magnet_type == "CCT_straight":
            self.main_magnet = MainCCT(fdm=self.fdm, verbose=verbose)
        elif magnet_type == "Pancake3D":
            self.main_magnet = MainPancake3D(fdm=self.fdm, verbose=verbose)
        elif magnet_type == "CACStrand":
            self.main_magnet = MainConductorAC_Strand(
                fdm=self.fdm,
                inputs_folder_path=pathlib.Path(input_file_path).parent,
                outputs_folder_path=model_folder,
                verbose=verbose,
            )
        elif magnet_type == "HomogenizedConductor":
            self.main_magnet = MainHomogenizedConductor(
                fdm=self.fdm,
                inputs_folder_path=pathlib.Path(input_file_path).parent,
                outputs_folder_path=model_folder,
                verbose=verbose,
            )
        elif magnet_type == "CACRutherford":
            self.main_magnet = MainConductorAC_Rutherford(
                fdm=self.fdm,
                inputs_folder_path=pathlib.Path(input_file_path).parent,
                verbose=verbose,
            )
        elif magnet_type == "multipole":
            # The last five characters of the input path are dropped
            # (presumably the ".yaml" extension) to get the base name.
            self.file_name = os.path.basename(input_file_path)[:-5]
            if not self.fdm.magnet.geometry.geom_file_path:
                self.fdm.magnet.geometry.geom_file_path = f"{input_file_path[:-5]}.geom"
            self.main_magnet = MainMultipole(
                fdm=self.fdm,
                rgd_path=self.fdm.magnet.geometry.geom_file_path,
                verbose=verbose,
            )
        else:
            raise ValueError(
                f"FiQuS does not support magnet type: {self.fdm.magnet.type}!"
            )

        # Load user paths for executables and additional files
        self.logger.info(f'{getpass.getuser()} is running on {platform.platform()}')
        if not fds:
            fds = get_data_settings(GetDP_path=GetDP_path)
        else:
            fds = get_data_settings(GetDP_path=GetDP_path, settings=fds)
        self.main_magnet.GetDP_path = fds.GetDP_path
        self.logger.info(f"{self.main_magnet.GetDP_path} is going to be used for FE solving.")

        # update htcondor csv
        htcondor_csv_file = None
        if htcondor_jobid:
            base_path_model_files = fds.base_path_model_files
            htcondor_csv_file = os.path.join(base_path_model_files, "htcondor_run_log.csv")
            self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Running")

        # Save Model/Geometry/Mesh/Solution folder paths
        self.save_folders()

        # Build magnet
        self.summary = dict.fromkeys(
            [
                "SJ",
                "SICN",
                "SIGE",
                "Gamma",
                "nodes",
                "solution_time",
                "overall_error",
                "minimum_diff",
                "maximum_diff",
            ]
        )

        try:
            self.build_magnet()
        except Exception as e:
            # update htcondor csv
            if htcondor_jobid:
                self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Failed")

            self.logger.error(f"Error: {e}")
            # Bare raise keeps the original traceback intact.
            raise
        else:
            # update htcondor csv
            if htcondor_jobid:
                self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Finished")

    def _get_required_folders(self):
        """
        Return the list of folder types that the current run type has to produce
        :return: subset of ["Geometry", "Mesh", "Solution"], in this order
        :rtype: list
        """
        run = self.fdm.run
        if run.type == "start_from_yaml":
            return ["Geometry", "Mesh", "Solution"]
        if run.type == "geometry_and_mesh":
            return ["Geometry", "Mesh"]
        if run.type == "mesh_and_solve_with_post_process_python":
            return ["Mesh", "Solution"]
        if run.type in ("solve_with_post_process_python", "solve_only"):
            return ["Solution"]
        if run.type == "geometry_only":
            return [] if run.geometry and not run.overwrite else ["Geometry"]
        if run.type == "mesh_only":
            return [] if run.mesh and not run.overwrite else ["Mesh"]
        # post_process_getdp_only or post_process_python_only or plot_python
        return []

    def _check_and_generate_path(self, folder_type: str = None, required_folders=()):
        """
        Resolve the folder of the given type through the Utils helpers
        :param folder_type: "Geometry", "Mesh" or "Solution"
        :type folder_type: str
        :param required_folders: folder types required by the current run type
        :type required_folders: list
        :return: value returned by Util.get_folder_path for this folder type
        """
        # Each folder type lives inside the folder of the previous stage.
        if folder_type == "Geometry":
            folder = self.wrk_folder
        elif folder_type == "Mesh":
            folder = self.main_magnet.geom_folder
        elif folder_type == "Solution":
            folder = self.main_magnet.mesh_folder
        else:
            raise Exception("Incompatible type.")

        folder_key = getattr(self.fdm.run, folder_type.lower())
        if folder_key is None:
            # folder_key is not given, so it is computed
            folder_key = Util.compute_folder_key(
                folder_type=folder_type,
                folder=folder,
                overwrite=self.fdm.run.overwrite,
            )

        # Overwrite conditions are only checked for the first required folder.
        first_required = required_folders[0] if required_folders else None
        if self.fdm.run.overwrite and folder_type == first_required:
            Check.check_overwrite_conditions(
                folder_type=folder_type, folder=folder, folder_key=folder_key
            )
        return Util.get_folder_path(
            folder_type=folder_type,
            folder=folder,
            folder_key=folder_key,
            overwrite=self.fdm.run.overwrite,
            required_folder=folder_type in required_folders,
        )

    @staticmethod
    def _relative_output_folder(folder):
        """
        Express a folder relative to tests/_outputs (itself relative to the current directory)
        :param folder: folder path or None
        :return: relative path, or "-" when folder is None
        :rtype: str
        """
        if folder is None:
            return "-"
        return os.path.relpath(
            os.path.relpath(folder), os.path.join("tests", "_outputs")
        )

    def save_folders(self):
        """
        Method to make or delete folders of FiQuS
        The geometry, mesh and solution folders needed by the run type are
        resolved and stored on self.main_magnet, the matching parts of the data
        model are dumped as yaml files into them and one row is appended to
        run_log.csv in the model folder.
        :return: Nothing, only does file and folder operation
        :rtype: None
        """
        run_type = self.fdm.run.type
        required_folders = self._get_required_folders()

        self.main_magnet.geom_folder = self._check_and_generate_path(
            folder_type="Geometry", required_folders=required_folders
        )
        if run_type != "geometry_only":
            self.main_magnet.mesh_folder = self._check_and_generate_path(
                folder_type="Mesh", required_folders=required_folders
            )
        if run_type not in ("geometry_only", "mesh_only"):
            self.main_magnet.solution_folder = self._check_and_generate_path(
                folder_type="Solution", required_folders=required_folders
            )

        if run_type in (
            "start_from_yaml",
            "geometry_and_mesh",
            "geometry_only",
        ):
            Util.write_data_model_to_yaml(
                os.path.join(self.main_magnet.geom_folder, "geometry.yaml"),
                self.fdm.magnet.geometry,
                by_alias=True,
                with_comments=True,
            )
        if run_type in (
            "start_from_yaml",
            "geometry_and_mesh",
            "mesh_and_solve_with_post_process_python",
            "mesh_only",
        ):
            Util.write_data_model_to_yaml(
                os.path.join(self.main_magnet.mesh_folder, "mesh.yaml"),
                self.fdm.magnet.mesh,
                by_alias=True,
                with_comments=True,
            )
        if run_type in (
            "start_from_yaml",
            "mesh_and_solve_with_post_process_python",
            "solve_with_post_process_python",
            "solve_only",
            "post_process",
            "plot_python",
            "postprocess_veusz",
        ):
            solve_dump_data = SolveDumpDataModel(
                solve=self.fdm.magnet.solve,
                circuit=self.fdm.circuit,
                power_supply=self.fdm.power_supply,
                quench_protection=self.fdm.quench_protection,
                quench_detection=self.fdm.quench_detection,
                conductors=self.fdm.conductors,
            )
            Util.write_data_model_to_yaml(
                os.path.join(self.main_magnet.solution_folder, "solve.yaml"),
                solve_dump_data,
                by_alias=True,
                with_comments=True,
            )
        if run_type in (
            "start_from_yaml",
            "mesh_and_solve_with_post_process_python",
            "solve_with_post_process_python",
            "post_process_python_only",
            "post_process_getdp_only",
            "post_process",
            "postprocess_veusz",
            "plot_python",
        ):
            Util.write_data_model_to_yaml(
                os.path.join(self.main_magnet.solution_folder, "postproc.yaml"),
                self.fdm.magnet.postproc,
                by_alias=True,
                with_comments=True,
            )

        # The run log is best effort: a failure here must not stop the run.
        # Only Exception is caught so that KeyboardInterrupt/SystemExit still
        # propagate.
        try:
            run_log_row = [
                self.time_stamp,
                self.fdm.run.type,
                self.fdm.run.comments,
                self._relative_output_folder(self.main_magnet.geom_folder),
                self._relative_output_folder(self.main_magnet.mesh_folder),
                self._relative_output_folder(self.main_magnet.solution_folder),
            ]
            self.add_to_run_log(
                os.path.join(self.wrk_folder, "run_log.csv"), run_log_row
            )
        except Exception:
            self.logger.warning("Run log could not be completed.")

    def _launch_gui(self):
        """
        Read the launch_gui flag from the data model held by the magnet object
        :return: value of self.main_magnet.fdm.run.launch_gui
        """
        return self.main_magnet.fdm.run.launch_gui

    def _update_summary(self, results):
        """
        Copy every key/value pair of a step result into self.summary
        :param results: mapping returned by a mesh or post-processing step
        :type results: dict
        :return: none
        :rtype: none
        """
        self.summary.update(results.items())

    def build_magnet(self):
        """
        Main method to build magnets, i.e. to run various fiqus run types and magnet types
        After the steps of the run type are done, the working directory is set
        back to the start folder and, if self.file_name is set, self.summary is
        written to <file_name>.json in the model folder.
        :return: none
        :rtype: none
        """
        run_type = self.fdm.run.type
        magnet = self.main_magnet

        if run_type == "start_from_yaml":
            magnet.generate_geometry()
            magnet.pre_process()
            magnet.load_geometry()
            self._update_summary(magnet.mesh())
            self.summary["solution_time"] = magnet.solve_and_postprocess_getdp()
            self._update_summary(magnet.post_process_python(gui=self._launch_gui()))
        elif run_type == "pre_process_only":
            magnet.pre_process()
            # todo: DISABLE FOR ONE GROUP ONLY
            self._update_summary(magnet.post_process_python(gui=self._launch_gui()))
        elif run_type == "geometry_only":
            magnet.generate_geometry(
                gui=(self._launch_gui() if self.fdm.magnet.type != "CCT_straight" else False)
            )
            if self.fdm.magnet.type in ["CCT_straight", "CWS"]:
                magnet.pre_process(gui=self._launch_gui())
        elif run_type == "geometry_and_mesh":
            magnet.generate_geometry()
            magnet.pre_process()
            magnet.load_geometry()
            self._update_summary(magnet.mesh(gui=self._launch_gui()))
        elif run_type == "mesh_and_solve_with_post_process_python":
            magnet.load_geometry()
            self._update_summary(magnet.mesh())
            self.summary["solution_time"] = magnet.solve_and_postprocess_getdp()
            self._update_summary(magnet.post_process_python(gui=self._launch_gui()))
        elif run_type == "mesh_only":
            magnet.load_geometry()
            self._update_summary(magnet.mesh(gui=self._launch_gui()))
        elif run_type == "solve_with_post_process_python":
            self.summary["solution_time"] = magnet.solve_and_postprocess_getdp(
                gui=self._launch_gui()
            )
            self._update_summary(magnet.post_process_python(gui=self._launch_gui()))
        elif run_type == "solve_only":
            self.summary["solution_time"] = magnet.solve_and_postprocess_getdp(
                gui=self._launch_gui()
            )
        elif run_type == "postprocess_veusz":
            magnet.post_process_veusz(gui=self._launch_gui())
        elif run_type == "post_process_getdp_only":
            magnet.post_process_getdp(gui=self._launch_gui())
        elif run_type == "post_process_python_only":
            self._update_summary(magnet.post_process_python(gui=self._launch_gui()))
        elif run_type == "post_process":
            magnet.post_process_getdp(gui=self._launch_gui())
            self._update_summary(magnet.post_process_python(gui=self._launch_gui()))
        elif run_type == "plot_python":
            magnet.plot_python()
        elif run_type == "batch_post_process_python":
            magnet.batch_post_process_python()

        # Go back to the directory the run was started from.
        os.chdir(self.start_folder)

        if self.file_name:
            file_path = os.path.join(self.wrk_folder, f"{self.file_name}.json")
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(self.summary, f, indent=2)

    @staticmethod
    def add_to_run_log(path_to_csv, run_log_row):
        """
        Append one row to the run log csv file, writing the header first if the file does not exist yet
        :param path_to_csv: full path to the run log csv file
        :type path_to_csv: str
        :param run_log_row: values for the columns Time Stamp, Run Type, Comments, Geometry Directory, Mesh Directory, Solution Directory
        :type run_log_row: list
        :return: Nothing, only writes to the file
        :rtype: None
        """
        # Must be evaluated before open(), which creates the file.
        write_header = not os.path.isfile(path_to_csv)

        # Open the CSV file in append mode
        with open(path_to_csv, "a", newline="") as csv_file:
            writer = csv.writer(csv_file)
            if write_header:
                writer.writerow(
                    [
                        "Time Stamp",
                        "Run Type",
                        "Comments",
                        "Geometry Directory",
                        "Mesh Directory",
                        "Solution Directory",
                    ]
                )
            writer.writerow(run_log_row)

    def change_htcondor_run_log(self, htcondor_csv_file, htcondor_jobid, new_status="None"):
        """
        Set the 'Status' column of the rows with the given 'Job ID' in the HTCondor run log csv
        Failures are logged as a warning and never raised.
        :param htcondor_csv_file: full path to the HTCondor run log csv file
        :type htcondor_csv_file: str
        :param htcondor_jobid: value looked up in the 'Job ID' column
        :type htcondor_jobid: int
        :param new_status: status to write, converted with str()
        :type new_status: str
        :return: Nothing, only updates the file
        :rtype: None
        """
        try:
            df = pd.read_csv(htcondor_csv_file)
            df.loc[df['Job ID'] == htcondor_jobid, 'Status'] = str(new_status)
            df.to_csv(htcondor_csv_file, index=False)
            # Logged only after the file has actually been written.
            self.logger.info(f"Changed status of JobID {htcondor_jobid} to {new_status} in {htcondor_csv_file}.")
        except Exception:
            self.logger.warning(f"Could not change status of JobID {htcondor_jobid} to {new_status} in {htcondor_csv_file}.")

449 

def remove_fiqus_cli_options(argv):
    """
    Return a copy of argv without the FiQuS-specific options and their values
    Both the short and the long spelling of each option are removed, in the
    form "-o value" / "--output value" (two tokens) as well as
    "--output=value" (one token). All other tokens are kept in order.
    :param argv: command line tokens, e.g. sys.argv
    :type argv: list
    :return: new list with the FiQuS options stripped
    :rtype: list
    """
    fiqus_options = {
        "-o", "--output",
        "-g", "--getdp",
        "-j", "--htcondor_jobid",
        "-m", "--fiqus_data_model",
        "-s", "--fiqus_data_settings",
    }
    kept = []
    skip_next = False
    for token in argv:
        if skip_next:
            # This token is the value of the option dropped just before it.
            skip_next = False
            continue
        if token in fiqus_options:
            skip_next = True
            continue
        if token.startswith("--") and "=" in token and token.split("=", 1)[0] in fiqus_options:
            # "--option=value": option and value are one single token.
            continue
        kept.append(token)
    return kept


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="FiQuS",
        description="Finite Elements Quench Simulator",
        epilog="steam-team@cern.ch",
    )
    parser.add_argument(
        dest="full_path_input",
        type=str,
        help="Full path to FiQuS input yaml file",
    )
    parser.add_argument(
        "--output", '-o', dest="output_path", type=str, help="Full path to FiQuS output folder"
    )
    parser.add_argument(
        "--getdp", '-g', dest="GetDP_path", type=str, help="Full path to GetDP executable"
    )

    parser.add_argument("--htcondor_jobid", '-j', type=int, default=0,
                        help="HTCondor job ID (optional)", required=False)

    parser.add_argument("--fiqus_data_model", '-m', type=str,
                        help="Full path to FiQuS Data Model file (optional)", required=False)

    parser.add_argument("--fiqus_data_settings", '-s', type=str,
                        help="Full path to FiQuS Data Settings file (optional)", required=False)

    # Unknown arguments are tolerated: they stay in sys.argv for Gmsh.
    args, _unknown = parser.parse_known_args()

    # remove these options from sys.argv, otherwise they are passed onto Gmsh
    # in Gmsh.initialize(); slice assignment keeps the same list object
    sys.argv[:] = remove_fiqus_cli_options(sys.argv)

    if args.fiqus_data_model is not None and args.fiqus_data_settings is not None:
        # read fdm and fds from a file (HTCondor case)
        input_fdm = Util.read_data_from_yaml(args.fiqus_data_model, FDM)
        input_fds = Util.read_data_from_yaml(args.fiqus_data_settings, DataSettings)

        MainFiQuS(
            input_file_path=args.full_path_input,
            model_folder=args.output_path,
            fdm=input_fdm,
            fds=input_fds,
            htcondor_jobid=args.htcondor_jobid,
        )
    else:
        # fdm and fds from input (STEAM SDK case)
        MainFiQuS(
            input_file_path=args.full_path_input,
            model_folder=args.output_path,
            GetDP_path=args.GetDP_path,
        )
    print("FiQuS run completed")