Coverage for fiqus/MainFiQuS.py: 63%

241 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-01-14 02:37 +0100

1import argparse 

2import csv 

3import os 

4import pathlib 

5import sys 

6import time 

7import getpass 

8import platform 

9import subprocess 

10import json 

11 

12import pandas as pd 

13 

14FiQuS_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 

15sys.path.insert(0, FiQuS_path) 

16 

17from fiqus.utils.Utils import FilesAndFolders as Util 

18from fiqus.utils.Utils import CheckForExceptions as Check 

19from fiqus.utils.Utils import create_json_schema 

20from fiqus.utils.Utils import get_data_settings 

21from fiqus.utils.Utils import initialize_logger 

22from fiqus.data.DataFiQuS import FDM 

23from fiqus.data.DataSettings import DataSettings 

24from fiqus.mains.MainCCT import MainCCT 

25from fiqus.mains.MainMultipole import MainMultipole 

26from fiqus.mains.MainPancake3D import MainPancake3D 

27from fiqus.mains.MainConductorAC_Strand import MainConductorAC_Strand 

28 

29class MainFiQuS: 

30 """ 

31 This is the top level class of FiQuS. 

32 """ 

33 

def __init__(
    self,
    input_file_path: str = None,
    model_folder: str = None,
    GetDP_path=None,
    fdm=None,
    fds=None,
    htcondor_jobid=None
):
    """
    Main class for working with FiQuS simulations.

    Loads (or receives) the data model, sets up logging, instantiates the magnet-specific
    main object, prepares the output folders and finally runs the requested run type.

    :param input_file_path: full path to input file yaml
    :type input_file_path: str
    :param model_folder: full path to the base output folder, called model folder
    :type model_folder: str
    :param GetDP_path: full path to GetDP executable
    :type GetDP_path: str
    :param fdm: FiQuS Data Model - object of fiqus DataFiQus; when falsy the model is read from input_file_path
    :type fdm: object
    :param fds: FiQuS Data Settings - object of DataSettings
    :type fds: object
    :param htcondor_jobid: HTCondor job ID; when truthy the job status in htcondor_run_log.csv
        is updated to Running / Failed / Finished around the run
    :type htcondor_jobid: int
    :raises ValueError: if the magnet type of the data model is not supported
    """
    # Local import: only needed here, to archive the input file.
    import shutil

    self.time_stamp = time.strftime("%Y-%m-%d-%H-%M-%S")

    self.start_folder = os.getcwd()
    self.wrk_folder = model_folder
    self.file_name = None

    # Load yaml input file unless a ready-made data model was handed in
    loaded_from_file = not fdm
    if loaded_from_file:
        self.fdm: FDM = Util.read_data_from_yaml(input_file_path, FDM)
    else:
        self.fdm: FDM = fdm
    verbose = self.fdm.run.verbosity_FiQuS

    # Initialize logger
    self.logger = initialize_logger(
        verbose=verbose, time_stamp=self.time_stamp, work_folder=self.wrk_folder
    )
    if verbose:
        Util.print_welcome_graphics()

    # Keep a time-stamped copy of the input file next to the logs. This is done with
    # shutil instead of a shell "copy" command so it works on every platform and with
    # paths containing spaces; a failure is logged but does not stop the run.
    if loaded_from_file:
        archived_input = os.path.join(
            self.wrk_folder, 'logs', f'INPUT_FILE_{self.time_stamp}.FiQuS.yaml'
        )
        try:
            os.makedirs(os.path.dirname(archived_input), exist_ok=True)
            shutil.copyfile(input_file_path, archived_input)
        except OSError as copy_error:
            self.logger.warning(
                f"Could not copy input file {input_file_path} to {archived_input}: {copy_error}"
            )

    # Create JSON schema
    create_json_schema(self.fdm)

    # Check for input errors
    Check.check_inputs(run=self.fdm.run)

    # Initialize Main object
    magnet_type = self.fdm.magnet.type
    if magnet_type == "CCT_straight":
        self.main_magnet = MainCCT(fdm=self.fdm, verbose=verbose)
    elif magnet_type == "CWS":
        # NOTE(review): MainCWS is not among the imports visible in this file, so this
        # branch raises NameError unless it is imported elsewhere - confirm.
        self.main_magnet = MainCWS(
            fdm=self.fdm,
            inputs_folder_path=pathlib.Path(input_file_path).parent,
            verbose=verbose,
        )
    elif magnet_type == "Pancake3D":
        self.main_magnet = MainPancake3D(fdm=self.fdm, verbose=verbose)
    elif magnet_type == "CACStrand":
        self.main_magnet = MainConductorAC_Strand(
            fdm=self.fdm,
            inputs_folder_path=pathlib.Path(input_file_path).parent,
            outputs_folder_path=model_folder,
            verbose=verbose,
        )
    elif magnet_type == "CACRutherford":
        # NOTE(review): MainConductorAC_Rutherford is not among the visible imports - confirm.
        self.main_magnet = MainConductorAC_Rutherford(
            fdm=self.fdm,
            inputs_folder_path=pathlib.Path(input_file_path).parent,
            verbose=verbose,
        )
    elif magnet_type == "Racetrack":
        # NOTE(review): MainRacetrack is not among the visible imports - confirm.
        self.main_magnet = MainRacetrack(fdm=self.fdm, verbose=verbose)
    elif magnet_type == "Racetrack3D":
        # NOTE(review): MainRacetrack3D is not among the visible imports - confirm.
        self.main_magnet = MainRacetrack3D(
            fdm=self.fdm,
            inputs_folder_path=pathlib.Path(input_file_path).parent,
            verbose=verbose,
        )
    elif magnet_type == "multipole":
        # Strip the extension with splitext instead of a fixed [:-5] slice, so the
        # result is also correct for inputs that do not end in ".yaml".
        input_path_without_ext = os.path.splitext(input_file_path)[0]
        self.file_name = os.path.basename(input_path_without_ext)
        if not self.fdm.magnet.geometry.geom_file_path:
            self.fdm.magnet.geometry.geom_file_path = f"{input_path_without_ext}.geom"
        self.main_magnet = MainMultipole(
            fdm=self.fdm,
            rgd_path=self.fdm.magnet.geometry.geom_file_path,
            verbose=verbose,
        )
    else:
        raise ValueError(
            f"FiQuS does not support magnet type: {self.fdm.magnet.type}!"
        )

    # Load user paths for executables and additional files
    self.logger.info(f'{getpass.getuser()} is running on {platform.platform()}')
    if not fds:
        fds = get_data_settings(GetDP_path=GetDP_path)
    else:
        fds = get_data_settings(GetDP_path=GetDP_path, settings=fds)
    self.main_magnet.GetDP_path = fds.GetDP_path
    self.logger.info(f"{self.main_magnet.GetDP_path} is going to be used for FE solving.")

    # update htcondor csv
    htcondor_csv_file = None
    if htcondor_jobid:
        htcondor_csv_file = os.path.join(fds.base_path_model_files, "htcondor_run_log.csv")
        self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Running")

    # Save Model/Geometry/Mesh/Solution folder paths
    self.save_folders()

    # Summary of the run; every entry stays None until the corresponding step fills it in
    self.summary = dict.fromkeys(
        [
            "SJ",
            "SICN",
            "SIGE",
            "Gamma",
            "nodes",
            "solution_time",
            "overall_error",
            "minimum_diff",
            "maximum_diff",
        ]
    )

    # Build magnet
    try:
        self.build_magnet()
    except Exception as e:
        # update htcondor csv
        if htcondor_jobid:
            self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Failed")

        self.logger.error(f"Error: {e}")
        # bare raise keeps the original traceback intact
        raise
    else:
        # update htcondor csv
        if htcondor_jobid:
            self.change_htcondor_run_log(htcondor_csv_file, htcondor_jobid, "Finished")

169 

def save_folders(self):
    """
    Method to make or delete folders of FiQuS.

    Resolves the Geometry / Mesh / Solution folders needed for the current run type,
    stores them on the main magnet object, writes the relevant parts of the data model
    as yaml files into those folders and appends one row to run_log.csv in the
    working folder.

    :return: Nothing, only does file and folder operation
    :rtype: None
    """
    run = self.fdm.run

    # Folders that are required for the requested run type
    if run.type == "start_from_yaml":
        required_folders = ["Geometry", "Mesh", "Solution"]
    elif run.type == "geometry_and_mesh":
        required_folders = ["Geometry", "Mesh"]
    elif run.type == "mesh_and_solve_with_post_process_python":
        required_folders = ["Mesh", "Solution"]
    elif run.type in ["solve_with_post_process_python", "solve_only"]:
        required_folders = ["Solution"]
    elif run.type == "geometry_only":
        required_folders = [] if run.geometry and not run.overwrite else ["Geometry"]
    elif run.type == "mesh_only":
        required_folders = [] if run.mesh and not run.overwrite else ["Mesh"]
    else:  # post_process_getdp_only or post_process_python_only or plot_python
        required_folders = []

    def _check_and_generate_path(folder_type: str = None):
        """Return the path of the folder of the given type, nested in its parent folder."""
        # Each level is nested in the previous one: working folder > geometry > mesh > solution
        if folder_type == "Geometry":
            folder = self.wrk_folder
        elif folder_type == "Mesh":
            folder = self.main_magnet.geom_folder
        elif folder_type == "Solution":
            folder = self.main_magnet.mesh_folder
        else:
            raise Exception("Incompatible type.")

        # Use the folder key given in the run settings, otherwise compute one
        folder_key = getattr(run, folder_type.lower())
        if folder_key is None:
            folder_key = Util.compute_folder_key(
                folder_type=folder_type,
                folder=folder,
                overwrite=run.overwrite,
            )

        # Overwrite conditions are only checked for the first required folder
        first_required = required_folders[0] if required_folders else None
        if run.overwrite and folder_type == first_required:
            Check.check_overwrite_conditions(
                folder_type=folder_type, folder=folder, folder_key=folder_key
            )
        return Util.get_folder_path(
            folder_type=folder_type,
            folder=folder,
            folder_key=folder_key,
            overwrite=run.overwrite,
            required_folder=folder_type in required_folders,
        )

    magnet_data = self.main_magnet.fdm.magnet

    self.main_magnet.geom_folder = _check_and_generate_path(folder_type="Geometry")
    if run.type not in ["geometry_only"]:
        self.main_magnet.mesh_folder = _check_and_generate_path(folder_type="Mesh")
    if run.type not in ["geometry_only", "mesh_only"]:
        self.main_magnet.solution_folder = _check_and_generate_path(folder_type="Solution")

    # (folder attribute on main_magnet, yaml file name, data model section, run types that write it)
    yaml_outputs = [
        (
            "geom_folder", "geometry.yaml", "geometry",
            ["start_from_yaml", "geometry_and_mesh", "geometry_only"],
        ),
        (
            "mesh_folder", "mesh.yaml", "mesh",
            ["start_from_yaml", "geometry_and_mesh", "mesh_and_solve_with_post_process_python", "mesh_only"],
        ),
        (
            "solution_folder", "solve.yaml", "solve",
            [
                "start_from_yaml",
                "mesh_and_solve_with_post_process_python",
                "solve_with_post_process_python",
                "solve_only",
                "post_process",
                "plot_python",
            ],
        ),
        (
            "solution_folder", "postproc.yaml", "postproc",
            [
                "start_from_yaml",
                "mesh_and_solve_with_post_process_python",
                "solve_with_post_process_python",
                "post_process_python_only",
                "post_process_getdp_only",
                "post_process",
                "plot_python",
            ],
        ),
    ]
    for folder_attribute, yaml_name, section, run_types in yaml_outputs:
        if run.type in run_types:
            Util.write_data_model_to_yaml(
                os.path.join(getattr(self.main_magnet, folder_attribute), yaml_name),
                getattr(magnet_data, section),
                by_alias=True,
                with_comments=True,
            )

    def _run_log_entry(folder):
        """Return the folder relative to tests/_outputs, or "-" when there is no folder."""
        if folder is None:
            return "-"
        return os.path.relpath(os.path.relpath(folder), os.path.join("tests", "_outputs"))

    # The run log is best effort: a failure here must not stop the simulation
    try:
        # getattr with a default: this method does not set mesh_folder / solution_folder
        # for every run type (e.g. geometry_only), so they may be missing on main_magnet.
        run_log_row = [
            self.time_stamp,
            run.type,
            run.comments,
            _run_log_entry(getattr(self.main_magnet, "geom_folder", None)),
            _run_log_entry(getattr(self.main_magnet, "mesh_folder", None)),
            _run_log_entry(getattr(self.main_magnet, "solution_folder", None)),
        ]
        self.add_to_run_log(os.path.join(self.wrk_folder, "run_log.csv"), run_log_row)
    except Exception:
        self.logger.warning("Run log could not be completed.")

339 

def build_magnet(self):
    """
    Main method to build magnets, i.e. to run various fiqus run types and magnet types.

    Dispatches on self.fdm.run.type, stores the results returned by the individual steps
    in self.summary, changes back to the folder FiQuS was started from and, when
    self.file_name is set, writes the summary to <wrk_folder>/<file_name>.json.

    :return: none
    :rtype: none
    """
    magnet = self.main_magnet
    run_type = self.fdm.run.type

    if run_type == "start_from_yaml":
        magnet.generate_geometry()
        magnet.pre_process()
        magnet.load_geometry()
        self.summary.update(magnet.mesh())
        self.summary["solution_time"] = magnet.solve_and_postprocess_getdp()
        self.summary.update(magnet.post_process_python(gui=magnet.fdm.run.launch_gui))
    elif run_type == "pre_process_only":
        magnet.pre_process()
        # todo: DISABLE FOR ONE GROUP ONLY
        self.summary.update(magnet.post_process_python(gui=magnet.fdm.run.launch_gui))
    elif run_type == "geometry_only":
        # The gui is never launched by generate_geometry for CCT_straight
        magnet.generate_geometry(
            gui=(magnet.fdm.run.launch_gui if self.fdm.magnet.type != "CCT_straight" else False)
        )
        if self.fdm.magnet.type in ["CCT_straight", "CWS"]:
            magnet.pre_process(gui=magnet.fdm.run.launch_gui)
    elif run_type == "geometry_and_mesh":
        magnet.generate_geometry()
        magnet.pre_process()
        magnet.load_geometry()
        self.summary.update(magnet.mesh(gui=magnet.fdm.run.launch_gui))
    elif run_type == "mesh_and_solve_with_post_process_python":
        magnet.load_geometry()
        self.summary.update(magnet.mesh())
        self.summary["solution_time"] = magnet.solve_and_postprocess_getdp()
        self.summary.update(magnet.post_process_python(gui=magnet.fdm.run.launch_gui))
    elif run_type == "mesh_only":
        magnet.load_geometry()
        self.summary.update(magnet.mesh(gui=magnet.fdm.run.launch_gui))
    elif run_type == "solve_with_post_process_python":
        self.summary["solution_time"] = magnet.solve_and_postprocess_getdp(
            gui=magnet.fdm.run.launch_gui
        )
        self.summary.update(magnet.post_process_python(gui=magnet.fdm.run.launch_gui))
    elif run_type == "solve_only":
        self.summary["solution_time"] = magnet.solve_and_postprocess_getdp(
            gui=magnet.fdm.run.launch_gui
        )
    elif run_type == "post_process_getdp_only":
        magnet.post_process_getdp(gui=magnet.fdm.run.launch_gui)
    elif run_type == "post_process_python_only":
        self.summary.update(magnet.post_process_python(gui=magnet.fdm.run.launch_gui))
    elif run_type == "post_process":
        magnet.post_process_getdp(gui=magnet.fdm.run.launch_gui)
        self.summary.update(magnet.post_process_python(gui=magnet.fdm.run.launch_gui))
    elif run_type == "plot_python":
        magnet.plot_python()
    elif run_type == "batch_post_process_python":
        magnet.batch_post_process_python()

    # Go back to the folder FiQuS was started from
    os.chdir(self.start_folder)

    # Write the summary next to the outputs; the context manager makes sure the file is
    # flushed and closed instead of relying on garbage collection of the file object.
    if self.file_name:
        summary_path = f"{os.path.join(self.wrk_folder, self.file_name)}.json"
        with open(summary_path, 'w') as summary_file:
            json.dump(self.summary, summary_file)

409 # mesh_par = self.fdm.magnet.mesh 

410 # if self.summary['solution_time']: 

411 # with open(r"C:\Users\avitrano\PycharmProjects\steam_sdk\tests\parsims\FiQuS_run\summary.dat", 'a') as f: 

412 # content = (f"{mesh_par.mesh_coil.SizeMin} {mesh_par.mesh_coil.SizeMax} {mesh_par.mesh_iron.SizeMin} {mesh_par.mesh_iron.SizeMax} " 

413 # f"{self.summary['solution_time']} {self.summary['overall_error']} {self.summary['overall_error'] * 0.999 + self.summary['solution_time'] * 0.001} " 

414 # f"{self.summary['SJ']} {self.summary['SICN']} {self.summary['SIGE']} {self.summary['Gamma']} " 

415 # f"{self.summary['nodes']} {self.summary['minimum_diff']} {self.summary['maximum_diff']}\n") 

416 # f.writelines(content) 

417 

418 @staticmethod 

419 def add_to_run_log(path_to_csv, run_log_row): 

420 # If file does not exist, write the header 

421 if not os.path.isfile(path_to_csv): 

422 header = [ 

423 "Time Stamp", 

424 "Run Type", 

425 "Comments", 

426 "Geometry Directory", 

427 "Mesh Directory", 

428 "Solution Directory", 

429 ] 

430 with open(path_to_csv, "a", newline="") as csv_file: 

431 writer = csv.writer(csv_file) 

432 writer.writerow(header) 

433 

434 # Open the CSV file in append mode 

435 with open(path_to_csv, "a+", newline="") as csv_file: 

436 writer = csv.writer(csv_file) 

437 writer.writerow(run_log_row) 

438 

def change_htcondor_run_log(self, htcondor_csv_file, htcondor_jobid, new_status="None"):
    """
    Set the status of a job in the HTCondor run log csv file.

    The update is best effort: if the file cannot be read, updated or written, a warning
    is logged and the method returns normally.

    :param htcondor_csv_file: full path to the HTCondor run log csv file; it is expected
        to have the columns 'Job ID' and 'Status'
    :type htcondor_csv_file: str
    :param htcondor_jobid: job ID whose row(s) should be updated
    :type htcondor_jobid: int
    :param new_status: status to write, it is converted to a string
    :type new_status: str
    :return: Nothing, only rewrites the csv file
    :rtype: None
    """
    try:
        df = pd.read_csv(htcondor_csv_file)
        df.loc[df['Job ID'] == htcondor_jobid, 'Status'] = str(new_status)
        df.to_csv(htcondor_csv_file, index=False)
    except Exception:
        # Exception instead of a bare except, so KeyboardInterrupt / SystemExit still propagate
        self.logger.warning(f"Could not change status of JobID {htcondor_jobid} to {new_status} in {htcondor_csv_file}.")
    else:
        # Only report success once the file has actually been written
        self.logger.info(f"Changed status of JobID {htcondor_jobid} to {new_status} in {htcondor_csv_file}.")

447 

# Command line entry point: parse the arguments, hide the FiQuS-only options from Gmsh
# and run FiQuS either from data model / settings files (HTCondor) or from the input yaml.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="FiQuS",
        description="Finite Elements Quench Simulator",
        epilog="steam-team@cern.ch",
    )
    parser.add_argument(
        dest="full_path_input",
        type=str,
        help="Full path to FiQuS input yaml file",
    )
    parser.add_argument(
        "--output", '-o', dest="output_path", type=str, help="Full path to FiQuS output folder"
    )
    parser.add_argument(
        "--getdp", '-g', dest="GetDP_path", type=str, help="Full path to GetDP executable"
    )

    parser.add_argument("--htcondor_jobid", '-j', type=int, default=0,
                        help="HTCondor job ID (optional)", required=False)

    parser.add_argument("--fiqus_data_model", '-m', type=str,
                        help="Full path to FiQuS Data Model file (optional)", required=False)

    parser.add_argument("--fiqus_data_settings", '-s', type=str,
                        help="Full path to FiQuS Data Settings file (optional)", required=False)

    args, unknown = parser.parse_known_args()

    # remove these options from sys.argv, otherwise they are passed onto Gmsh
    # in Gmsh.initialize(). Both the short and the long spelling of every option
    # declared above are removed.
    options_to_remove = {
        "-o", "--output",
        "-g", "--getdp",
        "-j", "--htcondor_jobid",
        "-m", "--fiqus_data_model",
        "-s", "--fiqus_data_settings",
    }
    remaining_argv = []
    skip_value = False
    for token in sys.argv:
        if skip_value:
            # value that belongs to the option removed just before
            skip_value = False
            continue
        if token in options_to_remove:
            # option given as two tokens: "--output path"
            skip_value = True
            continue
        if token.startswith("--") and token.split("=", 1)[0] in options_to_remove:
            # option and value given as one token: "--output=path"
            continue
        remaining_argv.append(token)
    # update in place, so every reference to sys.argv sees the cleaned list
    sys.argv[:] = remaining_argv

    if args.fiqus_data_model is not None and args.fiqus_data_settings is not None:
        # read fdm and fds from a file (HTCondor case)
        input_fdm = Util.read_data_from_yaml(args.fiqus_data_model, FDM)
        input_fds = Util.read_data_from_yaml(args.fiqus_data_settings, DataSettings)

        MainFiQuS(
            input_file_path=args.full_path_input,
            model_folder=args.output_path,
            fdm=input_fdm,
            fds=input_fds,
            htcondor_jobid=args.htcondor_jobid
        )
    else:
        # fdm and fds from input (STEAM SDK case)
        MainFiQuS(
            input_file_path=args.full_path_input,
            model_folder=args.output_path,
            GetDP_path=args.GetDP_path,
        )
    print("FiQuS run completed")