Coverage for fiqus/utils/Utils.py: 77%

440 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-12-25 02:54 +0100

1import sys 

2import os 

3import getpass 

4import platform 

5import shutil 

6import logging 

7import re 

8 

9import numpy as np 

10from pathlib import Path 

11from time import sleep 

12import multiprocessing 

13 

14import pandas as pd 

15import ruamel.yaml 

16import gmsh 

17import json 

18 

19from fiqus.data.DataSettings import DataSettings 

20from fiqus.data.DataFiQuS import FDM 

21 

22logger = logging.getLogger(__name__) 

23 

24 

class LoggingFormatter(logging.Formatter):
    """
    Logging formatter that wraps every record in an ANSI colour chosen by level.

    The line layout is ``<asctime> | <levelname> | <message>``; the colour
    escape sequence is prepended and the reset sequence appended.
    """
    grey = "\x1b[38;20m"  # debug level
    white = "\x1b[37;20m"  # info level
    yellow = "\x1b[33;20m"  # warning level
    red = "\x1b[31;20m"  # error level
    bold_red = "\x1b[31;1m"  # critical level

    reset = "\x1b[0m"
    # Kept under a private name so it is not silently replaced by the
    # ``format`` method defined further down in this class body.
    _line_format = '%(asctime)s | %(levelname)s | %(message)s'

    FORMATS = {
        logging.DEBUG: grey + _line_format + reset,
        logging.INFO: white + _line_format + reset,
        logging.WARNING: yellow + _line_format + reset,
        logging.ERROR: red + _line_format + reset,
        logging.CRITICAL: bold_red + _line_format + reset
    }

    # Cache of logging.Formatter objects keyed by format string, filled lazily,
    # so a formatter is not rebuilt for every single log record.
    _formatters = {}

    def format(self, record):
        """
        Format ``record`` with the colourised layout registered for its level.

        :param record: log record to format
        :type record: logging.LogRecord
        :return: formatted log line; levels missing from ``FORMATS`` fall back
            to the default layout of ``logging.Formatter``
        :rtype: str
        """
        log_fmt = self.FORMATS.get(record.levelno)
        try:
            formatter = self._formatters[log_fmt]
        except KeyError:
            formatter = self._formatters[log_fmt] = logging.Formatter(log_fmt)
        return formatter.format(record)

50 

51 

class FilesAndFolders:
    """
    Static helpers for reading/writing YAML files and for creating and
    numbering the folders used by a run.
    """

    @staticmethod
    def read_data_from_yaml(full_file_path, data_class):
        """
        Load a YAML file and build ``data_class`` from its top-level mapping.

        If the mapping has a "magnet" entry, the path of the file is stored in
        it under "input_file_path" before the object is built.

        :param full_file_path: path of the YAML file to read
        :param data_class: callable invoked as ``data_class(**content)``
        :return: the object returned by ``data_class``
        """
        with open(full_file_path, 'r') as stream:
            yaml = ruamel.yaml.YAML(typ='safe', pure=True)
            yaml_str = yaml.load(stream)
        if "magnet" in yaml_str:
            yaml_str["magnet"]["input_file_path"] = str(full_file_path)

        return data_class(**yaml_str)

    @staticmethod
    def write_data_to_yaml(full_file_path, dict_of_data_class, list_exceptions=None):
        """
        Write a dictionary to a YAML file, emitting lists on a single row.

        Note that ``dict_of_data_class`` is edited in place: its lists and
        numpy arrays are replaced by flow-style sequences.

        :param full_file_path: path of the YAML file to write
        :param dict_of_data_class: dictionary to dump
        :param list_exceptions: keys whose lists keep the default (block) layout;
            ``None`` means no exceptions
        """
        # A fresh list per call instead of a shared mutable default argument
        exceptions_to_skip = [] if list_exceptions is None else list_exceptions

        def my_represent_none(self, data):
            """
            Change data representation from empty string to "null" string
            """
            return self.represent_scalar('tag:yaml.org,2002:null', 'null')

        def flist(x):
            """
            Define a commented sequence to allow writing a list in a single row
            """
            retval = ruamel.yaml.comments.CommentedSeq(x)
            retval.fa.set_flow_style()  # fa -> format attribute
            return retval

        def list_single_row_recursively(data_dict: dict, exceptions: list):
            """
            Write lists in a single row
            :param data_dict: Dictionary to edit
            :param exceptions: List of strings defining keys that will not be written
            in a single row
            :return: the same dictionary, edited in place
            """
            for key, value in data_dict.items():
                if isinstance(value, list) and (key not in exceptions):
                    data_dict[key] = flist(value)
                elif isinstance(value, np.ndarray):
                    data_dict[key] = flist(value.tolist())
                elif isinstance(value, dict):
                    data_dict[key] = list_single_row_recursively(value, exceptions)

            return data_dict

        yaml = ruamel.yaml.YAML()
        yaml.default_flow_style = False
        yaml.emitter.alt_null = 'Null'
        yaml.representer.add_representer(type(None), my_represent_none)
        dict_of_data_class = list_single_row_recursively(dict_of_data_class, exceptions=exceptions_to_skip)
        with open(full_file_path, 'w') as yaml_file:
            yaml.dump(dict_of_data_class, yaml_file)

    @staticmethod
    def write_data_model_to_yaml(full_file_path, data_model, with_comments=True, by_alias=True):
        """
        Dump a data model to a YAML file, optionally with field descriptions
        attached as end-of-line comments.

        Nothing is written when ``data_model`` is falsy.

        :param full_file_path: path of the YAML file to write
        :param data_model: model exposing ``dict(by_alias=...)`` and ``__fields__``
        :param with_comments: if True, add each field's description as a comment
        :param by_alias: if True, use field aliases as keys where they are defined
        """
        if not data_model:
            return

        from io import StringIO  # local import: only this method needs it

        # Set up YAML instance settings:
        yamlInstance = ruamel.yaml.YAML()

        if with_comments:
            # Dump and re-load the plain dictionary to obtain a ruamel object
            # that stores both data and comments. This is done through an
            # in-memory buffer so that no temporary file is created (and no
            # existing file next to the target can be overwritten or deleted).
            buffer = StringIO()
            yamlInstance.dump(data_model.dict(by_alias=by_alias), buffer)
            ruamel_yaml_object = yamlInstance.load(buffer.getvalue())

            def dictionary_key(pydantic_key, field):
                """Return the key used in the dumped dictionary for a model field."""
                if field.alias and by_alias:
                    return field.alias
                return pydantic_key

            def iterate_fields(model, ruamel_yaml_object):
                """Attach field descriptions as comments, recursing into nested models."""
                for currentPydanticKey, value in model.__fields__.items():
                    currentDictionaryKey = dictionary_key(currentPydanticKey, value)
                    attribute = getattr(model, currentPydanticKey)

                    if value.description:
                        ruamel_yaml_object.yaml_add_eol_comment(
                            value.description,
                            currentDictionaryKey,
                        )

                    if hasattr(attribute, "__fields__"):
                        # Nested model
                        ruamel_yaml_object[currentDictionaryKey] = iterate_fields(
                            attribute,
                            ruamel_yaml_object[currentDictionaryKey],
                        )
                    elif isinstance(attribute, list):
                        # List that may contain nested models
                        for i, item in enumerate(attribute):
                            if hasattr(item, "__fields__"):
                                ruamel_yaml_object[currentDictionaryKey][i] = iterate_fields(
                                    item,
                                    ruamel_yaml_object[currentDictionaryKey][i],
                                )

                return ruamel_yaml_object

            iterate_fields(data_model, ruamel_yaml_object)
            # NOTE(review): this second pass over the top-level nested models
            # repeats work already done by the recursive call above. It is kept
            # because re-adding the comments may influence how ruamel aligns
            # them - confirm before removing.
            for currentPydanticKey, value in data_model.__fields__.items():
                currentDictionaryKey = dictionary_key(currentPydanticKey, value)
                attribute = getattr(data_model, currentPydanticKey)
                if hasattr(attribute, "__fields__"):
                    ruamel_yaml_object[currentDictionaryKey] = iterate_fields(
                        attribute,
                        ruamel_yaml_object[currentDictionaryKey],
                    )

            data_dict = ruamel_yaml_object
        else:
            data_dict = data_model.dict(by_alias=by_alias)

        yamlInstance.indent(sequence=4, offset=2)
        with open(full_file_path, 'w') as yaml_file:
            yamlInstance.dump(data_dict, yaml_file)

    @staticmethod
    def prep_folder(folder_full_path, clear: bool = False):
        """
        Make sure a folder exists, optionally deleting it first.

        :param folder_full_path: path of the folder
        :param clear: if True, an existing folder is removed with all its content
            before being created again
        :type clear: bool
        """
        if clear and os.path.exists(folder_full_path):
            shutil.rmtree(folder_full_path)  # delete directory
        if not os.path.exists(folder_full_path):
            # exist_ok guards against the folder appearing between check and creation
            os.makedirs(folder_full_path, exist_ok=True)  # make new directory

    @staticmethod
    def get_folder_path(folder_type, folder, folder_key, overwrite, required_folder):
        """
        Build the path ``<folder>/<folder_type>_<folder_key>``, create the
        folder and return its path.

        When the folder is required and it is not the case that a key is given
        together with overwrite, an already existing
        ``<folder_type>_<folder_key>`` folder is not reused: the key becomes
        ``<folder_key>_<n + 1>``, with n the number of sub-folders whose name
        starts with ``<folder_type>_<folder_key>``.

        :param folder_type: prefix of the folder name
        :type folder_type: str
        :param folder: parent folder
        :type folder: str or Path
        :param folder_key: key appended to the folder type
        :type folder_key: int or str
        :param overwrite: together with required_folder, whether an existing folder is cleared
        :type overwrite: bool
        :param required_folder: whether this folder is needed by the run
        :type required_folder: bool
        :return: path of the prepared folder
        :rtype: str
        """
        if required_folder and not (folder_key and overwrite):
            prefix = f"{folder_type}_{folder_key}"
            all_dirs = [x.parts[-1] for x in Path(folder).iterdir() if x.is_dir()]
            all_relevant_dirs = [x for x in all_dirs if x.startswith(prefix)]
            if prefix in all_relevant_dirs:
                folder_key = f"{folder_key}_{len(all_relevant_dirs) + 1}"

        folder_path = os.path.join(folder, folder_type + '_' + str(folder_key))
        # Disable the line below to avoid deleting the folder  # TODO: add logic to control this at a higher level
        FilesAndFolders.prep_folder(folder_path, overwrite and required_folder)
        return folder_path

    @staticmethod
    def compute_folder_key(folder_type, folder, overwrite):
        """
        Compute the integer key of the next ``<folder_type>_<key>`` folder.

        Only sub-folders named exactly ``<folder_type>_<integer>`` are considered.

        :param folder_type: prefix of the folder names (e.g. geometry, mesh, solution)
        :type folder_type: str
        :param folder: parent folder that is scanned
        :type folder: str or Path
        :param overwrite: if True return the largest existing key (1 if there is
            none); if False return the smallest positive key not in use
        :type overwrite: bool
        :return: folder key
        :rtype: int
        """
        # folder_type is escaped so that it is always matched literally, and the
        # whole directory name has to match
        key_pattern = re.compile(rf"{re.escape(folder_type)}_(\d+)")

        used_keys = set()
        for entry in Path(folder).iterdir():
            if not entry.is_dir():
                continue
            match = key_pattern.fullmatch(entry.name)
            if match:
                used_keys.add(int(match.group(1)))

        if overwrite:
            # The folder with the largest integer key is the one to overwrite
            return max(used_keys) if used_keys else 1

        # Smallest positive integer that is not used yet. A set is used so that
        # folders mapping to the same integer (e.g. "_1" and "_01") count once.
        folder_key = 1
        while folder_key in used_keys:
            folder_key += 1
        return folder_key

    @staticmethod
    def print_welcome_graphics():
        """Log the welcome banner at INFO level."""
        logger.info(r" _____ _ ___ ____ ")
        logger.info(r"| ___(_)/ _ \ _ _/ ___| ")
        logger.info(r"| |_ | | | | | | | \___ \ ")
        logger.info(r"| _| | | |_| | |_| |___) |")
        logger.info(r"|_| |_|\__\_\\__,_|____/ ")
        logger.info("")

275 

276 

class CheckForExceptions:
    """Checks on the reference folders supplied for a run."""

    # For every run type that needs reference files:
    # (attributes of ``run`` that must be set, hint shown in the error message)
    _REQUIRED_REFERENCES = {
        'mesh_and_solve_with_post_process': (('geometry',), 'geometry'),
        'mesh_only': (('geometry',), 'geometry'),
        'solve_with_post_process': (('geometry', 'mesh'), 'geometry and mesh'),
        'solve_only': (('geometry', 'mesh'), 'geometry and mesh'),
        'post_process_only': (('geometry', 'mesh', 'solution'), 'geometry, mesh, and solution'),
    }

    @staticmethod
    def check_inputs(run):  # RunFiQuS()
        """
        This method raises errors when geometry, mesh or solution folders inputs
        are missing for the requested run type. Run types without requirements
        (e.g. 'start_from_yaml') pass silently.

        The warnings about folders that are given but not needed were disabled
        as a trial and are not emitted.

        :param run: object with a ``type`` attribute and, for the run types that
            need them, ``geometry``, ``mesh`` and ``solution`` attributes
        :raises Exception: if a required reference is empty; the message names
            the reference(s) that are actually missing
        """
        requirement = CheckForExceptions._REQUIRED_REFERENCES.get(run.type)
        if requirement is None:
            return

        required_attributes, hint = requirement
        missing = [name.capitalize() for name in required_attributes if not getattr(run, name)]
        if missing:
            raise Exception(f"Full path to {', '.join(missing)} not provided. "
                            f"Insert options -> reference_files -> {hint}.")

    @staticmethod
    def check_overwrite_conditions(folder_type, folder, folder_key):
        """
        This method is meant to print warnings related to overwrite conditions
        settings. This is disabled as a trial, so it currently does nothing.

        :param folder_type: not used
        :param folder: not used
        :param folder_key: not used
        """
        pass

351 

352 

class GeometricFunctions:
    """Static helpers for planar geometry (points, lines, circles and arcs)."""

    @staticmethod
    def sig_dig(n, precision=8):
        """
        Round a number by formatting it with numpy's positional float formatting
        and converting the text back to float.

        :param n: number to round
        :param precision: precision passed to ``np.format_float_positional``
        :return: rounded number
        :rtype: float
        """
        return float(np.format_float_positional(n, precision=precision))

    @staticmethod
    def points_distance(a, b):
        """
        Computes the distance between two points a and b
        :param a: list of x and y coordinates
        :param b: list of x and y coordinates
        :return: Euclidean norm of a - b
        """
        return np.linalg.norm(np.array(a) - np.array(b))

    @staticmethod
    def line_through_two_points(point1, point2):
        """
        Finds coefficients of the line through two points [x1,y1] and [x2,y2]
        :param point1: 2-element list defining x/y positions of the 1st point
        :param point2: 2-element list defining x/y positions of the 2nd point
        :return: 3-element list defining the A, B, and C coefficients of the line, as in: A*x + B*y + C = 0
        """
        x1, y1 = point1[0], point1[1]
        x2, y2 = point2[0], point2[1]
        if x2 == x1:  # vertical line: x = x1
            A = 1
            B = 0
            C = - x1
        elif y2 == y1:  # horizontal line: y = y1
            A = 0
            B = 1
            C = - y1
        else:
            A = - (y2 - y1) / (x2 - x1)
            B = + 1
            C = - (x2 * y1 - x1 * y2) / (x2 - x1)
        return [float(A), float(B), float(C)]

    @staticmethod
    def centroid(X, Y):
        """
        Computes the centroid coordinates of a non-self-intersecting closed polygon
        :param X: list of x coordinate of the vertices
        :param Y: list of y coordinate of the vertices
        :return: list with the x and y coordinates of the centroid
        """
        sum_A, sum_Cx, sum_Cy = 0, 0, 0
        last = len(X) - 1
        for i in range(len(X)):
            index = i + 1 if i != last else 0  # the last vertex connects back to the first one
            A = X[i] * Y[index] - X[index] * Y[i]
            sum_Cx += (X[i] + X[index]) * A
            sum_Cy += (Y[i] + Y[index]) * A
            sum_A += A
        factor = 1 / (3 * sum_A)
        return [factor * sum_Cx, factor * sum_Cy]

    @staticmethod
    def arc_center_from_3_points(a, b, c):
        """
        Computes the center coordinates of an arc passing through three points
        :param a: list of x and y coordinates of one arc point
        :param b: list of x and y coordinates of one arc point
        :param c: list of x and y coordinates of one arc point
        :return: list with the x and y coordinates of the center
        """
        ab = [a[0] - b[0], a[1] - b[1]]
        ac = [a[0] - c[0], a[1] - c[1]]
        sac = [a[0] * a[0] - c[0] * c[0], a[1] * a[1] - c[1] * c[1]]
        sba = [b[0] * b[0] - a[0] * a[0], b[1] * b[1] - a[1] * a[1]]
        yy = (sac[0] * ab[0] + sac[1] * ab[0] + sba[0] * ac[0] + sba[1] * ac[0]) / \
             (2 * ((c[1] - a[1]) * ab[0] - (b[1] - a[1]) * ac[0]))
        xx = (sac[0] * ab[1] + sac[1] * ab[1] + sba[0] * ac[1] + sba[1] * ac[1]) / \
             (2 * ((c[0] - a[0]) * ab[1] - (b[0] - a[0]) * ac[1]))
        return [-xx, -yy]

    @staticmethod
    def corrected_arc_center(C, pnt1, pnt2):
        """
        Computes the center coordinates of an arc from two points and a guessed center
        :param C: list of x and y coordinates of guessed center
        :param pnt1: list of x and y coordinates of first arc point
        :param pnt2: list of x and y coordinates of second arc point
        :return: list with the x and y coordinates of the corrected center
        """
        if pnt1[1] < 0:
            # swap the two points
            pnt_tmp = pnt1.copy()
            pnt1 = pnt2.copy()
            pnt2 = pnt_tmp
        # radius: mean of the distances of the two points from the guessed center
        radius = (np.sqrt(np.square(pnt1[0] - C[0]) + np.square(pnt1[1] - C[1])) +
                  np.sqrt(np.square(pnt2[0] - C[0]) + np.square(pnt2[1] - C[1]))) / 2
        d = [0.5 * abs((pnt2[0] - pnt1[0])), 0.5 * abs((pnt1[1] - pnt2[1]))]
        aa = np.sqrt(np.square(d[0]) + np.square(d[1]))
        bb = np.sqrt(np.square(radius) - np.square(aa))
        M = [pnt1[0] + d[0]]
        if pnt2[1] < pnt1[1]:
            M.append(pnt2[1] + d[1])
            sign = [-1, -1] if pnt2[1] >= 0. else [1, 1]
        else:
            M.append(pnt1[1] + d[1])
            sign = [1, -1] if pnt2[1] >= 0. else [-1, 1]
        return [M[0] + sign[0] * bb * d[1] / aa, M[1] + sign[1] * bb * d[0] / aa]

    @staticmethod
    def arc_angle_between_point_and_abscissa(p, c):
        """
        Returns the angle of an arc with center c and endpoints at (cx + radius, cy) and (px, py)
        :param p: list of x and y coordinates of a point
        :param c: list of x and y coordinates of the arc center
        :return: angle in radians, shifted by 2*pi when negative
        """
        theta = np.arctan2(p[1] - c[1], p[0] - c[0])
        return theta + (2 * np.pi if theta < 0 else 0)

    @staticmethod
    def intersection_between_two_lines(line1, line2):
        """
        Finds the intersection point between two lines
        :param line1: list of A, B, C (A*x + B*y + C = 0)
        :param line2: list of A, B, C (A*x + B*y + C = 0)
        :return: list with the x and y coordinates of the intersection
        """
        if line1[1] == 0.0:  # line1 is vertical
            x = - line1[2] / line1[0]
            y = - (line2[0] * x + line2[2]) / line2[1]
        elif line2[1] == 0.0:  # line2 is vertical
            x = - line2[2] / line2[0]
            y = - (line1[0] * x + line1[2]) / line1[1]
        else:
            # both lines written as y = slope * x + intercept
            a = - line1[0] / line1[1]
            c = - line1[2] / line1[1]
            b = - line2[0] / line2[1]
            d = - line2[2] / line2[1]
            x = (d - c) / (a - b)
            y = a * x + c
        return [x, y]

    @staticmethod
    def _circle_line_intersections(line, c, r):
        """
        Intersection points between a line and the full circle of center c and radius r.
        :param line: list of A, B, C (A*x + B*y + C = 0)
        :param c: list of x and y coordinates of the circle center
        :param r: circle radius
        :return: list with zero, one (tangent line) or two [x, y] points
        """
        vertical = line[1] == 0
        if vertical:
            m = - line[2] / line[0]  # the line is x = m
            delta = r ** 2 + 2 * m * c[0] - m ** 2 - c[0] ** 2
        else:
            m, b = - line[0] / line[1], - line[2] / line[1]  # the line is y = m * x + b
            A = m ** 2 + 1
            B = 2 * (m * b - c[0] - m * c[1])
            C = b ** 2 - r ** 2 + c[0] ** 2 + c[1] ** 2 - 2 * c[1] * b
            delta = B ** 2 - 4 * A * C

        if delta < 0:  # no intersection with the circle
            return []
        if delta == 0:  # tangent to the circle
            if vertical:
                return [[m, c[1]]]
            x0 = - B / 2 / A
            return [[x0, m * x0 + b]]
        # two intersections with the circle
        root = np.sqrt(delta)
        if vertical:
            return [[m, root + c[1]], [m, - root + c[1]]]
        x1 = (- B + root) / 2 / A
        x2 = (- B - root) / 2 / A
        return [[x1, m * x1 + b], [x2, m * x2 + b]]

    @staticmethod
    def intersection_between_circle_and_line(line, circle, get_only_closest: bool = False):
        """
        Finds the intersection point/s between a circle and a line
        :param line: list of A, B, C (A*x + B*y + C = 0)
        :param circle: list of lists (x and y coordinates of the center, and point)
        :param get_only_closest: boolean to return only closest intersection point to the circle point
        :return: list of [x, y] intersection points, or None if there is no intersection
        """
        c, d = circle
        r = GeometricFunctions.points_distance(c, d)
        intersect = GeometricFunctions._circle_line_intersections(line, c, r)
        if not intersect:
            return None
        # With a tangent line there is a single point and nothing to choose from
        if get_only_closest and len(intersect) == 2:
            distance1 = GeometricFunctions.points_distance(d, intersect[0])
            distance2 = GeometricFunctions.points_distance(d, intersect[1])
            if distance1 > distance2:
                intersect.pop(0)
            else:
                intersect.pop(1)
        return intersect

    @staticmethod
    def intersection_between_arc_and_line(line, arc):
        """
        Finds the intersection point/s between an arc and a line
        :param line: list of A, B, C (A*x + B*y + C = 0)
        :param arc: list of lists (x and y coordinates of the center, high-angle endpoint, and low-angle endpoint)
        :return: list of [x, y] intersection points lying on the arc, or None if there is none
        """
        c, d, e = arc
        r = GeometricFunctions.points_distance(c, d)
        angle_d = GeometricFunctions.arc_angle_between_point_and_abscissa(d, c)
        angle_e = GeometricFunctions.arc_angle_between_point_and_abscissa(e, c)

        candidates = GeometricFunctions._circle_line_intersections(line, c, r)
        # The arc endpoints are accepted (within 1e-6 rad) only when the line
        # crosses the circle in two points; a tangent point has to lie strictly
        # between the two endpoint angles.
        accept_endpoints = len(candidates) == 2

        intersect = []
        for point in candidates:
            angle = GeometricFunctions.arc_angle_between_point_and_abscissa(point, c)
            on_arc = angle_e < angle < angle_d
            if not on_arc and accept_endpoints:
                on_arc = abs(angle - angle_e) < 1e-6 or abs(angle - angle_d) < 1e-6
            if on_arc:
                intersect.append(point)

        if not intersect:  # no intersection with the arc
            return None
        return intersect

582 

583 

class GmshUtils:
    """Thin wrapper around the gmsh Python API: start-up options and GUI loop."""

    def __init__(self, model_name='dummy_name', verbose=True):
        """
        :param model_name: name given to the gmsh model
        :param verbose: if True gmsh prints to the terminal and GUI events are echoed
        """
        self.verbose = verbose
        self.model_name = model_name

    def initialize(self, verbosity_Gmsh: int = 5):
        """
        Initialize Gmsh with options for FiQuS
        :param verbosity_Gmsh: Input file run.verbosity_Gmsh
        :type verbosity_Gmsh: int
        """
        # NOTE(review): everything below is assumed to run only when gmsh has
        # not been initialized yet - confirm the nesting against the repository.
        if gmsh.is_initialized():
            return

        gmsh.initialize(sys.argv)
        gmsh.model.add(str(self.model_name))

        num_threads = multiprocessing.cpu_count()
        # enable multithreading (this seems to be only for meshing)
        for thread_option in ('General.NumThreads',
                              'Mesh.MaxNumThreads1D',
                              'Mesh.MaxNumThreads2D',
                              'Mesh.MaxNumThreads3D'):
            gmsh.option.setNumber(thread_option, num_threads)

        gmsh.option.setNumber('Geometry.OCCParallel', 1)
        gmsh.option.setNumber('Geometry.ToleranceBoolean', 0.000001)
        gmsh.option.setString('Geometry.OCCTargetUnit', 'M')
        gmsh.option.setNumber("General.Verbosity", verbosity_Gmsh)
        gmsh.option.setNumber('General.Terminal', 1 if self.verbose else 0)

    def check_for_event(self):  # pragma: no cover
        """
        React to the pending ONELAB action ("check" or "compute"), if any.
        :return: always True
        """
        action = gmsh.onelab.getString("ONELAB/Action")
        requested = action[0] if len(action) else None

        if requested == "check":
            gmsh.onelab.setString("ONELAB/Action", [""])
            if self.verbose:
                print("-------------------check----------------")
            gmsh.fltk.update()
            gmsh.graphics.draw()
        if requested == "compute":
            gmsh.onelab.setString("ONELAB/Action", [""])
            if self.verbose:
                print("-------------------compute----------------")
            for solver in ("Gmsh", "GetDP"):
                gmsh.onelab.setChanged(solver, 0)
            gmsh.fltk.update()
            gmsh.graphics.draw()
        return True

    def launch_interactive_GUI(self, close_after=-1):  # pragma: no cover
        """
        Open the gmsh GUI and process events until it is closed, then finalize gmsh.
        :param close_after: if not negative, time to sleep before the GUI is finalized
        """
        gmsh.fltk.initialize()
        while gmsh.fltk.isAvailable() and self.check_for_event():
            gmsh.fltk.wait()
            # NOTE(review): the automatic close is assumed to sit inside the
            # event loop - confirm the nesting against the repository.
            if not close_after < 0:
                sleep(close_after)
                gmsh.fltk.finalize()
        gmsh.finalize()

639 

640 

class RoxieParsers:
    """
    Parsers for ROXIE .map2d and .cond2d files. An instance stores the
    position of one conductor as read from a .cond2d file.
    """

    def __init__(self, conductor, block, xyCorner):
        """
        :param conductor: conductor number
        :param block: block number
        :param xyCorner: array with one row per corner (x and y columns)
        """
        self.conductor = conductor
        self.block = block
        self.xyCorner = xyCorner

    @staticmethod
    def parseMap2d(map2dFile: Path, physical_quantity: str = 'magnetic_flux_density'):
        """
        Generates pandas data frame with map2d content
        :param map2dFile: path of map2dFile containing the content to parse
        :param physical_quantity: magnetic_flux_density or temperature
        :return: data frame restricted to the known map2d columns
        :raises KeyError: if physical_quantity is not one of the two supported names
        """
        physical_quantities_abbreviations = {'magnetic_flux_density': ('BX/T', 'BY/T'), 'temperature': ('T/K', '-')}
        columns = ['BL.', 'COND.', 'NO.', 'X-POS/MM', 'Y-POS/MM'] + \
                  list(physical_quantities_abbreviations[physical_quantity]) + \
                  ['AREA/MM**2', 'CURRENT', 'FILL FAC.']
        return pd.read_csv(map2dFile, sep=r"\s{2,}|(?<=2) |(?<=T) ", engine='python', usecols=columns)

    @staticmethod
    def parseCond2d(cond2dFile: Path):
        """
        Read input file and return list of ConductorPosition objects

        The conductor rows are taken from the 6th row of the file up to two rows
        before the (last) row containing the block section keyword. They come in
        groups of five, of which the fifth is dropped; the remaining four rows
        are comma-separated numbers describing the four corners of a conductor.

        :param cond2dFile: path of the cond2d file
        :return: list of RoxieParsers objects, one per conductor
        :raises ValueError: if the block section keyword is not found in the file
        """
        # conductorStartKeyword = "CONDUCTOR POSITION IN THE CROSS-SECTION"
        blockStartKeyword = "BLOCK POSITION IN THE CROSS-SECTION"

        # The context manager closes the file also when reading fails
        with open(cond2dFile, "r") as file:
            fileContentByRow = file.read().split("\n")

        # Find block definition (the last matching row wins)
        startOfBlockDefinitionIndex = None
        for i, row in enumerate(fileContentByRow):
            if blockStartKeyword in row:
                startOfBlockDefinitionIndex = i
        if startOfBlockDefinitionIndex is None:
            raise ValueError(f'Keyword "{blockStartKeyword}" not found in file: {cond2dFile}')

        # separate part of the data with conductor position information
        conductorPositionsFourVertices = fileContentByRow[5:startOfBlockDefinitionIndex - 2]

        # drop every 5th row
        del conductorPositionsFourVertices[4::5]

        # arrange data in a list of lists
        outputConductorPositions = [
            [float(elem) for elem in row.split(',')]
            for row in conductorPositionsFourVertices
        ]

        # arrange data from list to numpy.array
        outputConductorPositionsMatrix = np.array(outputConductorPositions)

        # one object per group of four rows: columns 1 and 2 of the first row
        # hold conductor and block numbers, columns 4 and 5 the corner coordinates
        conductorPositionsList = []
        for i in range(0, len(outputConductorPositions), 4):
            out = outputConductorPositions[i]
            conductor = int(out[1])
            block = int(out[2])
            xyCorner = outputConductorPositionsMatrix[i:i + 4, 4:6]
            conductorPositionsList.append(RoxieParsers(conductor, block, xyCorner))

        return conductorPositionsList

710 

711 

def initialize_logger(work_folder: str = None, time_stamp: str = None, verbose: bool = True, ):
    """
    This is the logger function to write FiQuS log files.

    The root logger is reconfigured: its current handlers are removed and
    closed, then a coloured console handler and two file handlers are attached:
    ``<work_folder>/logs/<time_stamp>.FiQuS.log`` (INFO and above) and
    ``<work_folder>/logs/ERRORS_WARNINGS_<time_stamp>.FiQuS.log`` (WARNING and above).

    :param work_folder: Folder where the log file is written to; if None, only
        the console handler is attached and no log files are written
    :type work_folder: str
    :param time_stamp: time stamp put in the log file name
    :type time_stamp: str
    :param verbose: if true INFO level logs are printed, if false only WARNING level logs are printed to the console
    :type verbose: bool
    :return: logger object
    :rtype: object
    """

    logger = logging.getLogger()

    # Remove the handlers of a previous initialization and close them, so that
    # the log files they hold open are released.
    for old_handler in list(logger.handlers):
        logger.removeHandler(old_handler)
        old_handler.close()

    # The logger level filters records before they reach any handler
    logger.setLevel(logging.INFO if verbose else logging.WARNING)

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(logging.INFO)
    stdout_handler.setFormatter(LoggingFormatter())
    logger.addHandler(stdout_handler)

    if work_folder is None:
        # No folder to write to: console logging only
        return logger

    logs_folder = os.path.join(work_folder, "logs")
    FilesAndFolders.prep_folder(work_folder)
    FilesAndFolders.prep_folder(logs_folder)

    fileFormatter = logging.Formatter('%(asctime)s | %(levelname)s | %(message)s')

    file_handler = logging.FileHandler(os.path.join(logs_folder, f"{time_stamp}.FiQuS.log"))
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(fileFormatter)
    logger.addHandler(file_handler)

    errorsAndWarnings_file_handler = logging.FileHandler(os.path.join(logs_folder, f"ERRORS_WARNINGS_{time_stamp}.FiQuS.log"))
    errorsAndWarnings_file_handler.setLevel(logging.WARNING)
    errorsAndWarnings_file_handler.setFormatter(fileFormatter)
    logger.addHandler(errorsAndWarnings_file_handler)

    return logger

755 

756 

def create_json_schema(data_model: FDM):
    """
    Create the JSON Schema from a Pydantic data model and write it to
    ``docs/schema.json``, where ``docs`` is located three directory levels
    above this file.

    :param data_model: FDM
    :type data_model: FDM
    """

    def any_of_to_one_of(node):
        """Return a copy of the schema in which every "anyOf" key is named "oneOf"."""
        if isinstance(node, dict):
            return {
                ("oneOf" if key == "anyOf" else key): any_of_to_one_of(value)
                for key, value in node.items()
            }
        if isinstance(node, list):
            return [any_of_to_one_of(item) for item in node]
        return node

    # Generate the raw JSON schema from the Pydantic model
    json_schema_dict = data_model.model_json_schema()

    # Replace anyOf with oneOf for better compatibility. Only dictionary keys
    # are renamed, so text that merely contains "anyOf" (descriptions, titles,
    # references) is left untouched.
    json_schema_dict = any_of_to_one_of(json_schema_dict)

    # Pretty-print the schema with proper indentation
    pretty_json_schema = json.dumps(json_schema_dict, indent=4, ensure_ascii=False)

    # Define the output folder for the schema
    docs_folder = os.path.join(
        os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "docs"
    )

    # Create the docs folder for the JSON schema, if it does not exist yet
    json_schema_file_path = os.path.join(docs_folder, "schema.json")
    os.makedirs(os.path.dirname(json_schema_file_path), exist_ok=True)

    # Write the prettified JSON schema to a file
    with open(json_schema_file_path, "w", encoding="utf-8") as file:
        file.write(pretty_json_schema)

786 

def get_data_settings(GetDP_path=None, settings=None):
    """
    Return the settings object with the GetDP path set for the current
    operating system.

    If ``settings`` is not given, it is read from
    ``tests/settings.<user name>.yaml``, located three directory levels above
    this file.

    :param GetDP_path: path to the GetDP executable; used on Windows and macOS
        only, on Linux the path is always set to 'getdp'
    :param settings: settings object; if empty it is read from the user's settings file
    :return: settings object
    :raises ValueError: if the settings file cannot be read, or if the operating
        system is not Windows, Linux or macOS
    """
    if not settings:
        # The user name is only needed to locate the settings file
        user_name = getpass.getuser()
        if user_name == 'root':
            user_name = 'SYSTEM'
        elif user_name == 'MP-WIN-02$':
            user_name = 'MP_WIN_02'

        path_to_settings_file = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "tests", f"settings.{user_name}.yaml")
        try:
            settings = FilesAndFolders.read_data_from_yaml(path_to_settings_file, DataSettings)
        except Exception as err:
            # ``settings`` is empty at this point, so nothing can be looked up on
            # it; the problem is logged and reported with the original error chained.
            logging.getLogger(__name__).error(f'Could not find: {path_to_settings_file}')
            raise ValueError(f'File: {path_to_settings_file} does not exist.') from err

    operating_system = platform.system()
    if operating_system in ('Windows', 'Darwin'):
        if GetDP_path:
            settings.GetDP_path = GetDP_path
    elif operating_system == 'Linux':
        settings.GetDP_path = 'getdp'
    else:
        raise ValueError(f'Python claims that you are running on operating system: {operating_system} and it is not supported by FiQuS!')

    return settings