Coverage for fiqus/utils/Utils.py: 78%

438 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2026-02-01 01:38 +0000

1import sys 

2import os 

3import getpass 

4import platform 

5import shutil 

6import logging 

7import re 

8 

9import numpy as np 

10from pathlib import Path 

11from time import sleep 

12import multiprocessing 

13 

14import pandas as pd 

15import ruamel.yaml 

16import json 

17import gmsh 

18from pydantic import BaseModel 

19 

20from fiqus.data.DataSettings import DataSettings 

21from fiqus.data.DataFiQuS import FDM 

22 

# Module-wide handle on the shared 'FiQuS' logger; handlers are attached by initialize_logger().
logger = logging.getLogger('FiQuS')

24 

25 

class LoggingFormatter(logging.Formatter):
    """
    Logging formatter that wraps every record in an ANSI colour sequence selected by the record level.

    Records whose level is not listed in FORMATS are rendered with the default layout of
    logging.Formatter (message only), without colour.
    """
    grey = "\x1b[38;20m"  # debug level
    white = "\x1b[37;20m"  # info level
    yellow = "\x1b[33;20m"  # warning level
    red = "\x1b[31;20m"  # error level
    bold_red = "\x1b[31;1m"  # critical level

    reset = "\x1b[0m"
    # Kept under a private name so it neither shadows the builtin nor collides with the format() method below.
    _layout = '%(asctime)s | %(levelname)s | %(message)s'

    FORMATS = {
        logging.DEBUG: grey + _layout + reset,
        logging.INFO: white + _layout + reset,
        logging.WARNING: yellow + _layout + reset,
        logging.ERROR: red + _layout + reset,
        logging.CRITICAL: bold_red + _layout + reset
    }

    # Cache of logging.Formatter objects keyed by format string, so that a formatter is built once
    # per layout instead of once per record. Keyed by the string (not the level) so that changes to
    # FORMATS are still honoured.
    _formatter_cache = {}

    def format(self, record):
        """
        Format a record with the coloured layout matching its level
        :param record: record to format
        :type record: logging.LogRecord
        :return: formatted text
        :rtype: str
        """
        log_fmt = self.FORMATS.get(record.levelno)  # None for levels not listed in FORMATS
        try:
            formatter = self._formatter_cache[log_fmt]
        except KeyError:
            formatter = self._formatter_cache[log_fmt] = logging.Formatter(log_fmt)
        return formatter.format(record)

51 

52 

class FilesAndFolders:
    """
    Static helpers for reading/writing YAML files and for preparing and numbering output folders.
    """

    @staticmethod
    def read_data_from_yaml(full_file_path, data_class):
        """
        Load a YAML file with the ruamel 'safe' loader and build data_class from its top-level mapping.
        If the mapping has a "magnet" entry, the path of the file is stored in magnet -> input_file_path.
        :param full_file_path: path of the YAML file to read
        :param data_class: callable invoked as data_class(**loaded_mapping)
        :return: the object returned by data_class
        """
        with open(full_file_path, 'r') as stream:
            yaml = ruamel.yaml.YAML(typ='safe', pure=True)
            yaml_str = yaml.load(stream)
        if "magnet" in yaml_str:
            yaml_str["magnet"]["input_file_path"] = str(full_file_path)

        return data_class(**yaml_str)

    @staticmethod
    def write_data_to_yaml(full_file_path, dict_of_data_class, list_exceptions=None):
        """
        Write a dictionary to a YAML file, emitting lists and numpy arrays on a single row.
        Note that dict_of_data_class is edited in place (its lists are replaced by flow-style sequences).
        :param full_file_path: path of the YAML file to write
        :param dict_of_data_class: dictionary to write
        :param list_exceptions: keys whose list values are NOT written on a single row (default: none)
        """
        # None instead of a shared mutable [] default
        exceptions_to_skip = [] if list_exceptions is None else list_exceptions

        def my_represent_none(self, data):
            """
            Change data representation from empty string to "null" string
            """
            return self.represent_scalar('tag:yaml.org,2002:null', 'null')

        def flist(x):
            """
            Define a commented sequence to allow writing a list in a single row
            """
            retval = ruamel.yaml.comments.CommentedSeq(x)
            retval.fa.set_flow_style()  # fa -> format attribute
            return retval

        def list_single_row_recursively(data_dict: dict, exceptions: list):
            """
            Write lists in a single row
            :param data_dict: Dictionary to edit
            :param exceptions: List of strings defining keys that will not be written
            in a single row
            :return: the same dictionary, edited in place
            """
            # Only values of existing keys are reassigned, so iterating over items() stays valid.
            for key, value in data_dict.items():
                if isinstance(value, list) and (key not in exceptions):
                    data_dict[key] = flist(value)
                elif isinstance(value, np.ndarray):
                    data_dict[key] = flist(value.tolist())
                elif isinstance(value, dict):
                    data_dict[key] = list_single_row_recursively(value, exceptions)

            return data_dict

        yaml = ruamel.yaml.YAML()
        yaml.default_flow_style = False
        yaml.emitter.alt_null = 'Null'
        yaml.representer.add_representer(type(None), my_represent_none)
        dict_of_data_class = list_single_row_recursively(dict_of_data_class, exceptions=exceptions_to_skip)
        with open(full_file_path, 'w') as yaml_file:
            yaml.dump(dict_of_data_class, yaml_file)

    @staticmethod
    def write_data_model_to_yaml(full_file_path, data_model, with_comments=True, by_alias=True):
        """
        Write a pydantic model to a YAML file, optionally adding field descriptions as end-of-line comments.
        Nothing is written when data_model is not a pydantic BaseModel instance.
        :param full_file_path: path of the YAML file to write
        :param data_model: pydantic model instance to dump
        :param with_comments: if True, the description of each field is added as a comment next to its key
        :param by_alias: if True, field aliases are used as keys where an alias is defined
        """
        if not isinstance(data_model, BaseModel):
            return

        import io  # local import: only needed for the in-memory round trip below

        # Set up YAML instance settings:
        yamlInstance = ruamel.yaml.YAML()

        def dictionary_key(pydantic_key, field_info):
            """Key under which a field appears in the dumped mapping (alias or field name)."""
            if field_info.alias and by_alias:
                return field_info.alias
            return pydantic_key

        def iterate_fields(model, ruamel_yaml_object):
            """Attach field descriptions as comments, descending into nested models and lists of models."""
            for currentPydanticKey, value in type(model).model_fields.items():
                currentDictionaryKey = dictionary_key(currentPydanticKey, value)

                if value.description:
                    ruamel_yaml_object.yaml_add_eol_comment(
                        value.description,
                        currentDictionaryKey,
                    )

                field_value = getattr(model, currentPydanticKey)
                if isinstance(field_value, BaseModel):
                    ruamel_yaml_object[currentDictionaryKey] = iterate_fields(
                        field_value,
                        ruamel_yaml_object[currentDictionaryKey],
                    )
                elif isinstance(field_value, list):
                    for i, item in enumerate(field_value):
                        if isinstance(item, BaseModel):
                            ruamel_yaml_object[currentDictionaryKey][i] = iterate_fields(
                                item,
                                ruamel_yaml_object[currentDictionaryKey][i],
                            )

            return ruamel_yaml_object

        if with_comments:
            # Dump and re-load the data to obtain ruamel's comment-capable containers.
            # The round trip is done in memory: no temporary "dummy.yaml" is created next to the
            # output file, so nothing can be overwritten and no locale-dependent encoding is involved.
            buffer = io.StringIO()
            yamlInstance.dump(data_model.model_dump(by_alias=by_alias), buffer)
            # ruamel_yaml_object stores both the data and the comments; the same object is
            # dumped below, which is how the comments end up in the file.
            ruamel_yaml_object = yamlInstance.load(buffer.getvalue())

            iterate_fields(data_model, ruamel_yaml_object)
            # NOTE(review): this second pass over the nested models looks redundant with the
            # recursion above; kept as in the original so the written file is unchanged - confirm.
            for currentPydanticKey, value in type(data_model).model_fields.items():
                currentDictionaryKey = dictionary_key(currentPydanticKey, value)

                if isinstance(getattr(data_model, currentPydanticKey), BaseModel):
                    ruamel_yaml_object[currentDictionaryKey] = iterate_fields(
                        getattr(data_model, currentPydanticKey),
                        ruamel_yaml_object[currentDictionaryKey],
                    )

            data_dict = ruamel_yaml_object

        else:
            data_dict = data_model.model_dump(by_alias=by_alias)

        yamlInstance.indent(sequence=4, offset=2)
        with open(full_file_path, 'w') as yaml_file:
            yamlInstance.dump(data_dict, yaml_file)

    @staticmethod
    def prep_folder(folder_full_path, clear: bool = False):
        """
        Make sure a folder exists, optionally deleting an existing one (with its content) first.
        :param folder_full_path: path of the folder
        :param clear: if True, an existing folder is removed before being created again
        :type clear: bool
        """
        if clear and os.path.exists(folder_full_path):
            shutil.rmtree(folder_full_path)  # delete directory
        if not os.path.exists(folder_full_path):
            # exist_ok avoids a failure if the folder appears between the check and the creation
            os.makedirs(folder_full_path, exist_ok=True)  # make new directory

    @staticmethod
    def get_folder_path(folder_type, folder, folder_key, overwrite, required_folder):
        """
        Build the path folder/<folder_type>_<folder_key>, prepare that folder and return its path.

        When required_folder is true and (folder_key and overwrite) is false, the sub-folders of
        folder are inspected: if <folder_type>_<folder_key> already exists, the key is changed to
        "<folder_key>_<n + 1>", n being the number of sub-folders whose name starts with
        <folder_type>_<folder_key>.
        :param folder_type: prefix of the folder name
        :param folder: parent folder; it must exist when its sub-folders are inspected
        :param folder_key: key appended to the prefix
        :param overwrite: together with required_folder, decides whether the folder is cleared
        :param required_folder: see above
        :return: path of the prepared folder
        :rtype: str
        """
        if required_folder and not (folder_key and overwrite):
            all_dirs = [x.parts[-1] for x in Path(folder).iterdir() if x.is_dir()]
            all_relevant_dirs = [x for x in all_dirs if x.startswith(f"{folder_type}_{folder_key}")]
            if f"{folder_type}_{folder_key}" in all_relevant_dirs:
                new_folder_key = f"{folder_key}_{len(all_relevant_dirs) + 1}"
                folder_key = new_folder_key

        folder_path = os.path.join(folder, folder_type + '_' + str(folder_key))
        # Disable the line below to avoid deleting the folder # TODO: add logic to control this at a higher level
        FilesAndFolders.prep_folder(folder_path, overwrite and required_folder)
        return folder_path

    @staticmethod
    def compute_folder_key(folder_type, folder, overwrite):
        """
        Compute an integer key for a folder named <folder_type>_<key> inside folder.
        :param folder_type: prefix of the folder names to consider (e.g. geometry, mesh, solution)
        :param folder: existing folder whose sub-folders are inspected
        :param overwrite: if True, return the largest key in use (1 if there is none), so that the
        latest folder gets overwritten; if False, return the smallest key >= 1 that is not in use
        :return: folder key
        :rtype: int
        """
        # Only names that are exactly <folder_type>_<digits> count; the prefix is escaped so
        # that regex metacharacters in folder_type are taken literally.
        key_pattern = re.compile(rf'{re.escape(str(folder_type))}_(\d+)')
        keys_in_use = set()
        for entry in Path(folder).iterdir():
            if not entry.is_dir():
                continue
            match = key_pattern.fullmatch(entry.name)
            if match:
                keys_in_use.add(int(match.group(1)))

        if overwrite:
            return max(keys_in_use) if keys_in_use else 1

        # Smallest positive integer not in use; a set makes keys such as 0 or
        # zero-padded duplicates harmless.
        folder_key = 1
        while folder_key in keys_in_use:
            folder_key += 1
        return folder_key

    @staticmethod
    def print_welcome_graphics():
        """
        Log the FiQuS ASCII-art banner, one INFO record per row, followed by an empty record.
        """
        # NOTE(review): rows reproduced verbatim from the reviewed text; verify the column
        # alignment of the ASCII art against the repository version.
        logger.info(r" _____ _ ___ ____ ")
        logger.info(r"| ___(_)/ _ \ _ _/ ___| ")
        logger.info(r"| |_ | | | | | | | \___ \ ")
        logger.info(r"| _| | | |_| | |_| |___) |")
        logger.info(r"|_| |_|\__\_\\__,_|____/ ")
        logger.info("")

276 

277 

class CheckForExceptions:
    """
    Consistency checks on the run settings.
    """

    # For each run type: the reference folders that must be set, and the error raised when
    # any of them is missing. Run types that are not listed are not checked.
    _REQUIRED_REFERENCES = {
        'mesh_and_solve_with_post_process': (
            ('geometry',),
            'Full path to Geometry not provided. '
            'Insert options -> reference_files -> geometry.'),
        'mesh_only': (
            ('geometry',),
            # The missing item is the geometry (the message used to name the mesh).
            'Full path to Geometry not provided. '
            'Insert options -> reference_files -> geometry.'),
        'solve_with_post_process': (
            ('mesh', 'geometry'),
            'Full path to Mesh not provided. '
            'Insert options -> reference_files -> geometry and mesh.'),
        'solve_only': (
            ('mesh', 'geometry'),
            'Full path to Mesh not provided. '
            'Insert options -> reference_files -> geometry and mesh.'),
        'post_process_only': (
            ('mesh', 'geometry', 'solution'),
            'Full path to Solution not provided. '
            'Insert options -> reference_files -> geometry, mesh, and solution.'),
    }

    @staticmethod
    def check_inputs(run):  # RunFiQuS()
        """
        This method raises errors when geometry, mesh or solution folders inputs are incorrect.
        The warnings about folders that are provided but not needed are disabled as a trial.
        :param run: FDM.run object; its type, geometry, mesh and solution attributes are read
        :type run: FDM.run
        :raises Exception: if a folder needed by the run type is not provided
        """
        requirement = CheckForExceptions._REQUIRED_REFERENCES.get(run.type)
        if requirement is None:
            # e.g. 'start_from_yaml': nothing is required
            return
        needed_folders, message = requirement
        if any(not getattr(run, folder_name) for folder_name in needed_folders):
            raise Exception(message)

    @staticmethod
    def check_overwrite_conditions(folder_type, folder, folder_key):
        """
        This method prints warning related to overwrite conditions settings. This is disabled as a trial,
        so the method currently does nothing.
        :param folder_type: unused while the check is disabled
        :type folder_type:
        :param folder: unused while the check is disabled
        :type folder:
        :param folder_key: unused while the check is disabled
        :type folder_key:
        """
        pass

352 

353 

class GeometricFunctions:
    """
    Collection of 2D geometry helpers. Points are [x, y] sequences and lines are
    [A, B, C] coefficient lists of A*x + B*y + C = 0.
    """

    @staticmethod
    def sig_dig(n, precision=8):
        """
        Round-trips n through numpy's positional float formatting with the given precision
        :param n: number to process
        :param precision: precision handed to np.format_float_positional
        :return: the formatted text converted back to float
        """
        as_text = np.format_float_positional(n, precision=precision)
        return float(as_text)

    @staticmethod
    def points_distance(a, b):
        """
        Computes the distance between two points a and b
        :param a: list of x and y coordinates
        :param b: list of x and y coordinates
        """
        return np.linalg.norm(np.array(a) - np.array(b))

    @staticmethod
    def line_through_two_points(point1, point2):
        """
        Finds coefficients of the line through two points [x1,y1] and [x2,y2]
        :param point1: 2-element list defining x/y positions of the 1st point
        :param point2: 2-element list defining x/y positions of the 2nd point
        :return: 3-element list defining the A, B, and C coefficients of the line, as in: A*x + B*y + C = 0
        """
        x1, y1 = point1[0], point1[1]
        x2, y2 = point2[0], point2[1]
        if x2 == x1:  # vertical line
            coefficients = (1, 0, - x1)
        elif y2 == y1:  # horizontal line
            coefficients = (0, 1, - y1)
        else:
            coefficients = (- (y2 - y1) / (x2 - x1), + 1, - (x2 * y1 - x1 * y2) / (x2 - x1))
        return [float(coefficient) for coefficient in coefficients]

    @staticmethod
    def centroid(X, Y):
        """
        Computes the centroid coordinates of a non-self-intersecting closed polygon
        :param X: list of x coordinate of the vertices
        :param Y: list of y coordinate of the vertices
        """
        vertex_count = len(X)
        sum_A, sum_Cx, sum_Cy = 0, 0, 0
        for here in range(vertex_count):
            there = (here + 1) % vertex_count  # the last vertex is paired with the first one
            cross = X[here] * Y[there] - X[there] * Y[here]
            sum_Cx += (X[here] + X[there]) * cross
            sum_Cy += (Y[here] + Y[there]) * cross
            sum_A += cross
        factor = 1 / (3 * sum_A)
        return [factor * sum_Cx, factor * sum_Cy]

    @staticmethod
    def arc_center_from_3_points(a, b, c):
        """
        Computes the center coordinates of an arc passing through three points
        :param a: list of x and y coordinates of one arc point
        :param b: list of x and y coordinates of one arc point
        :param c: list of x and y coordinates of one arc point
        """
        ab_x, ab_y = a[0] - b[0], a[1] - b[1]
        ac_x, ac_y = a[0] - c[0], a[1] - c[1]
        # differences of the squared coordinates
        sac_x, sac_y = a[0] * a[0] - c[0] * c[0], a[1] * a[1] - c[1] * c[1]
        sba_x, sba_y = b[0] * b[0] - a[0] * a[0], b[1] * b[1] - a[1] * a[1]
        yy_denominator = 2 * ((c[1] - a[1]) * ab_x - (b[1] - a[1]) * ac_x)
        xx_denominator = 2 * ((c[0] - a[0]) * ab_y - (b[0] - a[0]) * ac_y)
        yy = (sac_x * ab_x + sac_y * ab_x + sba_x * ac_x + sba_y * ac_x) / yy_denominator
        xx = (sac_x * ab_y + sac_y * ab_y + sba_x * ac_y + sba_y * ac_y) / xx_denominator
        return [-xx, -yy]

    @staticmethod
    def corrected_arc_center(C, pnt1, pnt2):
        """
        Computes the center coordinates of an arc from two points and a guessed center
        :param C: list of x and y coordinates of guessed center
        :param pnt1: list of x and y coordinates of first arc point
        :param pnt2: list of x and y coordinates of second arc point
        """
        if pnt1[1] < 0:
            # swap the two points (working on copies)
            pnt1, pnt2 = pnt2.copy(), pnt1.copy()
        # radius: mean of the distances of the two points from the guessed center
        distance1 = np.sqrt(np.square(pnt1[0] - C[0]) + np.square(pnt1[1] - C[1]))
        distance2 = np.sqrt(np.square(pnt2[0] - C[0]) + np.square(pnt2[1] - C[1]))
        radius = (distance1 + distance2) / 2
        half_dx = 0.5 * abs((pnt2[0] - pnt1[0]))
        half_dy = 0.5 * abs((pnt1[1] - pnt2[1]))
        aa = np.sqrt(np.square(half_dx) + np.square(half_dy))
        bb = np.sqrt(np.square(radius) - np.square(aa))
        mid_x = pnt1[0] + half_dx
        if pnt2[1] < pnt1[1]:
            mid_y = pnt2[1] + half_dy
            sign = [-1, -1] if pnt2[1] >= 0. else [1, 1]
        else:
            mid_y = pnt1[1] + half_dy
            sign = [1, -1] if pnt2[1] >= 0. else [-1, 1]
        return [mid_x + sign[0] * bb * half_dy / aa, mid_y + sign[1] * bb * half_dx / aa]

    @staticmethod
    def arc_angle_between_point_and_abscissa(p, c):
        """
        Returns the angle of an arc with center c and endpoints at (cx + radius, cy) and (px, py)
        :param p: list of x and y coordinates of a point
        :param c: list of x and y coordinates of the arc center
        """
        theta = np.arctan2(p[1] - c[1], p[0] - c[0])
        shift = 2 * np.pi if theta < 0 else 0  # negative angles are shifted by a full turn
        return theta + shift

    @staticmethod
    def intersection_between_two_lines(line1, line2):
        """
        Finds the intersection point between two lines
        :param line1: list of A, B, C (A*x + B*y + C = 0)
        :param line2: list of A, B, C (A*x + B*y + C = 0)
        """
        if line1[1] == 0.0:  # line1 is vertical
            x = - line1[2] / line1[0]
            y = - (line2[0] * x + line2[2]) / line2[1]
            return [x, y]
        if line2[1] == 0.0:  # line2 is vertical
            x = - line2[2] / line2[0]
            y = - (line1[0] * x + line1[2]) / line1[1]
            return [x, y]
        # slope/intercept form of both lines
        slope1 = - line1[0] / line1[1]
        intercept1 = - line1[2] / line1[1]
        slope2 = - line2[0] / line2[1]
        intercept2 = - line2[2] / line2[1]
        x = (intercept2 - intercept1) / (slope1 - slope2)
        y = slope1 * x + intercept1
        return [x, y]

    @staticmethod
    def _circle_line_points(line, c, r):
        """
        Points shared by a line and the circle of center c and radius r
        :param line: list of A, B, C (A*x + B*y + C = 0)
        :param c: list of x and y coordinates of the circle center
        :param r: circle radius
        :return: empty list (no intersection), one point (tangent) or two points
        (the one obtained with +sqrt(delta) first)
        """
        vertical = line[1] == 0
        if vertical:
            m = - line[2] / line[0]  # x position of the vertical line
            delta = r ** 2 + 2 * m * c[0] - m ** 2 - c[0] ** 2
        else:
            m, b = - line[0] / line[1], - line[2] / line[1]
            A = m ** 2 + 1
            B = 2 * (m * b - c[0] - m * c[1])
            C = b ** 2 - r ** 2 + c[0] ** 2 + c[1] ** 2 - 2 * c[1] * b
            delta = B ** 2 - 4 * A * C

        if delta < 0:  # no intersection with the circle
            return []
        if delta == 0:  # tangent to the circle
            if vertical:
                return [[m, c[1]]]
            x0 = - B / 2 / A
            return [[x0, m * x0 + b]]
        # two intersections with the circle
        if vertical:
            return [[m, np.sqrt(delta) + c[1]], [m, - np.sqrt(delta) + c[1]]]
        x1 = (- B + np.sqrt(delta)) / 2 / A
        y1 = m * x1 + b
        x2 = (- B - np.sqrt(delta)) / 2 / A
        y2 = m * x2 + b
        return [[x1, y1], [x2, y2]]

    @staticmethod
    def intersection_between_circle_and_line(line, circle, get_only_closest: bool = False):
        """
        Finds the intersection point/s between a circle and a line
        :param line: list of A, B, C (A*x + B*y + C = 0)
        :param circle: list of lists (x and y coordinates of the center, and point)
        :param get_only_closest: boolean to return only closest intersection point to the circle point
        :return: list of intersection points, or None if there is no intersection
        """
        c, d = circle
        r = GeometricFunctions.points_distance(c, d)
        intersect = GeometricFunctions._circle_line_points(line, c, r)
        if not intersect:
            return None
        if len(intersect) == 2 and get_only_closest:
            distance1 = GeometricFunctions.points_distance(d, intersect[0])
            distance2 = GeometricFunctions.points_distance(d, intersect[1])
            # drop the farther point; on a tie the first one is kept
            intersect.pop(0 if distance1 > distance2 else 1)
        return intersect

    @staticmethod
    def intersection_between_arc_and_line(line, arc):
        """
        Finds the intersection point/s between an arc and a line
        :param line: list of A, B, C (A*x + B*y + C = 0)
        :param arc: list of lists (x and y coordinates of the center, high-angle endpoint, and low-angle endpoint)
        :return: list of intersection points lying on the arc, or None if there is none
        """
        c, d, e = arc
        r = GeometricFunctions.points_distance(c, d)
        angle_d = GeometricFunctions.arc_angle_between_point_and_abscissa(d, c)
        if angle_d == 0:
            angle_d = 2 * np.pi  # if the 'high-angle' angle is 0, set it to 2*pi to avoid issues with the arc
        angle_e = GeometricFunctions.arc_angle_between_point_and_abscissa(e, c)

        candidates = GeometricFunctions._circle_line_points(line, c, r)
        if not candidates:  # no intersection with the circle
            return None

        if len(candidates) == 1:  # tangent to the circle: strictly inside the arc only
            angle0 = GeometricFunctions.arc_angle_between_point_and_abscissa(candidates[0], c)
            if angle_e < angle0 < angle_d:
                return [candidates[0]]
            return None  # no intersection with the arc

        # two intersections with the circle: the arc endpoints are accepted within 1e-6 rad
        intersect = []
        for candidate in candidates:
            angle = GeometricFunctions.arc_angle_between_point_and_abscissa(candidate, c)
            on_arc = (angle_e < angle < angle_d) or abs(angle - angle_e) < 1e-6 or abs(angle - angle_d) < 1e-6
            if on_arc:
                intersect.append(candidate)
        if not intersect:  # no intersection with the arc
            return None

        return intersect

585 

586 

class GmshUtils:
    """
    Thin wrapper around the gmsh Python API: session set-up and interactive GUI loop.
    """

    def __init__(self, model_name='dummy_name', verbose=True):
        """
        :param model_name: name given to the gmsh model added by initialize()
        :param verbose: if True, gmsh terminal output is enabled and GUI events are printed
        """
        self.verbose = verbose
        self.model_name = model_name

    def initialize(self, verbosity_Gmsh: int = 5):
        """
        Initialize Gmsh with options for FiQuS
        :param verbosity_Gmsh: Input file run.verbosity_Gmsh
        :type verbosity_Gmsh: int
        """
        if gmsh.is_initialized():
            # an already running gmsh session is left untouched
            return

        gmsh.initialize(sys.argv, interruptible=False, readConfigFiles=False)
        gmsh.model.add(str(self.model_name))
        num_threads = multiprocessing.cpu_count()
        # enable multithreading (this seems to be only for meshing)
        for thread_option in ('General.NumThreads',
                              'Mesh.MaxNumThreads1D',
                              'Mesh.MaxNumThreads2D',
                              'Mesh.MaxNumThreads3D'):
            gmsh.option.setNumber(thread_option, num_threads)
        gmsh.option.setNumber('Geometry.OCCParallel', 1)
        gmsh.option.setNumber('Geometry.ToleranceBoolean', 0.000001)
        gmsh.option.setString('Geometry.OCCTargetUnit', 'M')
        gmsh.option.setNumber("General.Verbosity", verbosity_Gmsh)
        gmsh.option.setNumber('General.Terminal', 1 if self.verbose else 0)

    def check_for_event(self):  # pragma: no cover
        """
        Handle a pending ONELAB "check" or "compute" action: the action is reset and the GUI is redrawn
        :return: always True
        :rtype: bool
        """
        action = gmsh.onelab.getString("ONELAB/Action")
        requested = action[0] if len(action) else None
        if requested == "check":
            gmsh.onelab.setString("ONELAB/Action", [""])
            if self.verbose:
                print("-------------------check----------------")
            gmsh.fltk.update()
            gmsh.graphics.draw()
        elif requested == "compute":
            gmsh.onelab.setString("ONELAB/Action", [""])
            if self.verbose:
                print("-------------------compute----------------")
            gmsh.onelab.setChanged("Gmsh", 0)
            gmsh.onelab.setChanged("GetDP", 0)
            gmsh.fltk.update()
            gmsh.graphics.draw()
        return True

    def launch_interactive_GUI(self, close_after=-1):  # pragma: no cover
        """
        Open the gmsh GUI and process its events until it is no longer available, then finalize gmsh
        :param close_after: if >= 0, sleep for this many seconds after an event and then finalize the GUI;
        with the default (-1) the GUI is not finalized by this loop
        """
        gmsh.fltk.initialize()
        while gmsh.fltk.isAvailable() and self.check_for_event():
            gmsh.fltk.wait()
            if close_after < 0:
                continue
            sleep(close_after)
            gmsh.fltk.finalize()
        gmsh.finalize()

642 

643 

class RoxieParsers:
    """
    Parsers for ROXIE .map2d and .cond2d files. An instance holds the data of one conductor
    as produced by parseCond2d.
    """

    def __init__(self, conductor, block, xyCorner):
        """
        :param conductor: conductor number
        :param block: block number
        :param xyCorner: array with one row per corner holding its x and y coordinates
        """
        self.conductor = conductor
        self.block = block
        self.xyCorner = xyCorner

    @staticmethod
    def parseMap2d(map2dFile: Path, physical_quantity: str = 'magnetic_flux_density'):
        """
        Generates pandas data frame with map2d content
        :param map2dFile: path of map2dFile containing the content to parse
        :param physical_quantity: magnetic_flux_density or temperature
        :return: data frame restricted to the expected map2d columns
        """
        physical_quantities_abbreviations = {'magnetic_flux_density': ('BX/T', 'BY/T'), 'temperature': ('T/K', '-')}
        columns = ['BL.', 'COND.', 'NO.', 'X-POS/MM', 'Y-POS/MM'] + \
                  list(physical_quantities_abbreviations[physical_quantity]) + \
                  ['AREA/MM**2', 'CURRENT', 'FILL FAC.']
        return pd.read_csv(map2dFile, sep=r"\s{2,}|(?<=2) |(?<=T) ", engine='python', usecols=columns)

    @staticmethod
    def parseCond2d(cond2dFile: Path):
        """
        Read a cond2d file and return the list of conductor positions
        :param cond2dFile: path of the cond2d file to parse
        :return: list of RoxieParsers objects, one per conductor (four consecutive data rows each)
        :raises ValueError: if the file has no "BLOCK POSITION IN THE CROSS-SECTION" row
        """
        # conductorStartKeyword = "CONDUCTOR POSITION IN THE CROSS-SECTION"
        blockStartKeyword = "BLOCK POSITION IN THE CROSS-SECTION"

        # the file is closed as soon as it has been read
        with open(cond2dFile, "r") as cond2d_stream:
            fileContent = cond2d_stream.read()

        # separate rows
        fileContentByRow = fileContent.split("\n")

        # Find block definition (the last row containing the keyword is used)
        startOfBlockDefinitionIndex = None
        for i, row in enumerate(fileContentByRow):
            if blockStartKeyword in row:
                startOfBlockDefinitionIndex = i
        if startOfBlockDefinitionIndex is None:
            raise ValueError(f'"{blockStartKeyword}" not found in file: {cond2dFile}')

        # separate part of the data with conductor position information:
        # it starts at the 6th row and stops two rows above the block definition
        conductorPositions = fileContentByRow[5:startOfBlockDefinitionIndex - 2]

        # drop every 5th row
        conductorPositionsFourVertices = list(conductorPositions)
        del conductorPositionsFourVertices[4::5]

        # arrange data in a list of lists
        outputConductorPositions = [
            [float(elem) for elem in row.split(',')] for row in conductorPositionsFourVertices
        ]

        # arrange data from list to numpy.array
        outputConductorPositionsMatrix = np.array(outputConductorPositions)

        # one object per group of four rows: conductor and block numbers are taken from the
        # first row of the group, the corner coordinates from columns 4 and 5 of its four rows
        conductorPositionsList = []
        for i in range(0, len(outputConductorPositions), 4):
            out = outputConductorPositions[i]
            conductor = int(out[1])
            block = int(out[2])
            xyCorner = outputConductorPositionsMatrix[i:i + 4, 4:6]
            conductorPositionsList.append(RoxieParsers(conductor, block, xyCorner))

        return conductorPositionsList

713 

714 

def initialize_logger(work_folder: str = None, time_stamp: str = None, verbose: bool = True, ):
    """
    This is logger function to write FiQuS log files.

    Handlers attached by a previous call (the log file handlers and the coloured console handler)
    are removed first, so calling this function again does not duplicate the output.

    :param work_folder: Folder where the log file is written to; the log files go to its "logs" sub-folder.
     If None, only the console handler is attached
    :type work_folder: str
    :param time_stamp: time stamp put in the log file name
    :type time_stamp: str
    :param verbose: if true INFO level logs are printed, if false only WARNING level logs are printed to the console
    :type verbose: bool
    :return: logger object
    :rtype: object
    """

    logger = logging.getLogger('FiQuS')

    # Iterate over a copy: removing handlers while iterating over logger.handlers itself skips
    # entries, which used to leave the second log file handler attached (and its file open).
    for handler in list(logger.handlers):
        if isinstance(handler, logging.FileHandler) or isinstance(handler.formatter, LoggingFormatter):
            handler.close()
            logger.removeHandler(handler)

    if verbose:
        logger.setLevel(logging.INFO)
    else:
        logger.setLevel(logging.WARNING)

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(logging.INFO)
    stdout_handler.setFormatter(LoggingFormatter())
    logger.addHandler(stdout_handler)

    if work_folder is None:
        # no folder to write the log files to
        return logger

    logs_folder = os.path.join(work_folder, "logs")
    FilesAndFolders.prep_folder(work_folder)
    FilesAndFolders.prep_folder(logs_folder)
    fileFormatter = logging.Formatter('%(asctime)s | %(levelname)s | %(message)s')

    # complete log
    file_handler = logging.FileHandler(os.path.join(logs_folder, f"{time_stamp}.FiQuS.log"))
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(fileFormatter)
    logger.addHandler(file_handler)

    # separate log with warnings and errors only
    errorsAndWarnings_file_handler = logging.FileHandler(os.path.join(logs_folder, f"ERRORS_WARNINGS_{time_stamp}.FiQuS.log"))
    errorsAndWarnings_file_handler.setLevel(logging.WARNING)
    errorsAndWarnings_file_handler.setFormatter(fileFormatter)
    logger.addHandler(errorsAndWarnings_file_handler)

    return logger

760 

761 

def create_json_schema(data_model: FDM):
    """
    Create the JSON Schema from a Pydantic data model and write it to docs/schema.json,
    the docs folder being located three directory levels above this file.
    :param data_model: FDM
    :type data_model: FDM
    """

    def any_of_to_one_of(node):
        """
        Return a copy of a schema node in which every "anyOf" key is renamed to "oneOf".
        Only keys are renamed: descriptions and other string values are left untouched.
        """
        if isinstance(node, dict):
            return {("oneOf" if key == "anyOf" else key): any_of_to_one_of(value) for key, value in node.items()}
        if isinstance(node, list):
            return [any_of_to_one_of(item) for item in node]
        return node

    # Generate the raw JSON schema from the Pydantic model
    json_schema_dict = data_model.model_json_schema()

    # Replace anyOf with oneOf for better compatibility
    json_schema_dict = any_of_to_one_of(json_schema_dict)

    # Pretty-print the schema with proper indentation
    pretty_json_schema = json.dumps(json_schema_dict, indent=4, ensure_ascii=False)

    # Define the output folder for the schema
    docs_folder = os.path.join(
        os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "docs"
    )

    # Create the docs folder for the JSON schema if it does not exist yet
    json_schema_file_path = os.path.join(docs_folder, "schema.json")
    os.makedirs(os.path.dirname(json_schema_file_path), exist_ok=True)

    # Write the prettified JSON schema to a file
    with open(json_schema_file_path, "w", encoding="utf-8") as file:
        file.write(pretty_json_schema)

791 

792 

def get_data_settings(GetDP_path=None, settings=None):
    """
    Return the settings object, reading it from the user-specific settings file if it is not given.

    When settings is not provided, it is read from tests/settings.<user name>.yaml, the tests
    folder being located three directory levels above this file.
    :param GetDP_path: if given, it overrides the GetDP_path attribute of the settings
    :param settings: settings object; if not given (or falsy), it is read from the settings file
    :return: the settings object
    :raises ValueError: if the settings file does not exist or cannot be read
    """
    if not settings:
        # the user name is only needed to locate the settings file
        user_name = getpass.getuser()
        if user_name == 'root':
            user_name = 'SYSTEM'
        elif user_name == 'MP-WIN-02$':
            user_name = 'MP_WIN_02'

        path_to_settings_file = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "tests", f"settings.{user_name}.yaml")
        # The error handler used to write to settings.error.log, but settings is still unset at
        # this point, so an AttributeError replaced the intended ValueError.
        try:
            settings = FilesAndFolders.read_data_from_yaml(path_to_settings_file, DataSettings)
        except FileNotFoundError as error:
            logging.getLogger('FiQuS').error(f'Could not find: {path_to_settings_file}')
            raise ValueError(f'File: {path_to_settings_file} does not exist.') from error
        except Exception as error:
            logging.getLogger('FiQuS').error(f'Could not read: {path_to_settings_file}')
            raise ValueError(f'File: {path_to_settings_file} could not be read.') from error

    if GetDP_path:
        settings.GetDP_path = GetDP_path

    return settings