import sys
import os
import getpass
import platform
import shutil
import logging
import re

import numpy as np
from pathlib import Path
from time import sleep
import multiprocessing

import pandas as pd
import ruamel.yaml
import json
import gmsh
from pydantic import BaseModel

from fiqus.data.DataSettings import DataSettings
from fiqus.data.DataFiQuS import FDM

logger = logging.getLogger('FiQuS')


class LoggingFormatter(logging.Formatter):
    """
    Logging formatter class
    """
    grey = "\x1b[38;20m"  # debug level
    white = "\x1b[37;20m"  # info level
    yellow = "\x1b[33;20m"  # warning level
    red = "\x1b[31;20m"  # error level
    bold_red = "\x1b[31;1m"  # critical level
    reset = "\x1b[0m"
    format = '%(asctime)s | %(levelname)s | %(message)s'

    FORMATS = {
        logging.DEBUG: grey + format + reset,
        logging.INFO: white + format + reset,
        logging.WARNING: yellow + format + reset,
        logging.ERROR: red + format + reset,
        logging.CRITICAL: bold_red + format + reset
    }

    def format(self, record):
        log_fmt = self.FORMATS.get(record.levelno)
        formatter = logging.Formatter(log_fmt)
        return formatter.format(record)


class FilesAndFolders:
    @staticmethod
    def read_data_from_yaml(full_file_path, data_class):
        with open(full_file_path, 'r') as stream:
            yaml = ruamel.yaml.YAML(typ='safe', pure=True)
            yaml_str = yaml.load(stream)
            if "magnet" in yaml_str:
                yaml_str["magnet"]["input_file_path"] = str(full_file_path)
        return data_class(**yaml_str)

    @staticmethod
    def write_data_to_yaml(full_file_path, dict_of_data_class, list_exceptions=None):
        def my_represent_none(self, data):
            """
            Change data representation from empty string to "null" string
            """
            return self.represent_scalar('tag:yaml.org,2002:null', 'null')

        def flist(x):
            """
            Define a commented sequence to allow writing a list in a single row
            """
            retval = ruamel.yaml.comments.CommentedSeq(x)
            retval.fa.set_flow_style()  # fa -> format attribute
            return retval

        def list_single_row_recursively(data_dict: dict, exceptions: list):
            """
            Write lists in a single row
            :param data_dict: Dictionary to edit
            :param exceptions: List of strings defining keys that will not be written
            in a single row
            :return:
            """
            for key, value in data_dict.items():
                if isinstance(value, list) and (key not in exceptions):
                    data_dict[key] = flist(value)
                elif isinstance(value, np.ndarray):
                    data_dict[key] = flist(value.tolist())
                elif isinstance(value, dict):
                    data_dict[key] = list_single_row_recursively(value, exceptions)

            return data_dict

        yaml = ruamel.yaml.YAML()
        yaml.default_flow_style = False
        yaml.emitter.alt_null = 'Null'
        yaml.representer.add_representer(type(None), my_represent_none)
        # Avoid a mutable default argument: fall back to an empty exception list
        dict_of_data_class = list_single_row_recursively(dict_of_data_class, exceptions=list_exceptions or [])
        with open(full_file_path, 'w') as yaml_file:
            yaml.dump(dict_of_data_class, yaml_file)
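
    # Usage sketch (illustrative, with hypothetical data and file name): dump a
    # plain dict to YAML, writing most lists on a single row but keeping the
    # 'profile' list in block style via list_exceptions:
    #
    #   data = {'name': 'demo', 'scales': [1, 2, 3], 'profile': [0.1, 0.2]}
    #   FilesAndFolders.write_data_to_yaml('out.yaml', data, list_exceptions=['profile'])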

    @staticmethod
    def write_data_model_to_yaml(full_file_path, data_model, with_comments=True, by_alias=True):
        if isinstance(data_model, BaseModel):
            # Set up YAML instance settings:
            yamlInstance = ruamel.yaml.YAML()

            # Convert the model_data to a ruamel.yaml object/dictionary:
            if with_comments:
                path_object = Path(full_file_path)
                # Add pydantic descriptions to the yaml file as comments:
                dummy_yaml_file_to_create_ruamel_object = (
                    path_object.resolve().parent.joinpath("dummy.yaml")
                )
                with open(dummy_yaml_file_to_create_ruamel_object, "w") as stream:
                    yamlInstance.dump(data_model.model_dump(by_alias=by_alias), stream)

                # Read the file:
                with open(dummy_yaml_file_to_create_ruamel_object, "r") as stream:
                    # Read the yaml file and store the data inside ruamel_yaml_object:
                    # ruamel_yaml_object is a special object that stores both the data and
                    # comments. Even though the data might be changed or added, the same
                    # object will be used to create the new YAML file to store the comments.
                    ruamel_yaml_object = yamlInstance.load(
                        dummy_yaml_file_to_create_ruamel_object
                    )

                os.remove(dummy_yaml_file_to_create_ruamel_object)

                def iterate_fields(model, ruamel_yaml_object):
                    for currentPydanticKey, value in model.model_fields.items():
                        if value.alias and by_alias:
                            currentDictionaryKey = value.alias
                        else:
                            currentDictionaryKey = currentPydanticKey

                        if value.description:
                            ruamel_yaml_object.yaml_add_eol_comment(
                                value.description,
                                currentDictionaryKey,
                            )

                        if hasattr(getattr(model, currentPydanticKey), "model_fields"):
                            new_ruamel_yaml_object = iterate_fields(
                                getattr(model, currentPydanticKey),
                                ruamel_yaml_object[currentDictionaryKey],
                            )

                            ruamel_yaml_object[currentDictionaryKey] = new_ruamel_yaml_object

                        elif isinstance(getattr(model, currentPydanticKey), list):
                            for i, item in enumerate(getattr(model, currentPydanticKey)):
                                if hasattr(item, "model_fields"):
                                    new_ruamel_yaml_object = iterate_fields(
                                        item,
                                        ruamel_yaml_object[currentDictionaryKey][i],
                                    )

                                    ruamel_yaml_object[currentDictionaryKey][i] = new_ruamel_yaml_object

                    return ruamel_yaml_object

                iterate_fields(data_model, ruamel_yaml_object)
                for currentPydanticKey, value in data_model.model_fields.items():
                    if value.alias and by_alias:
                        currentDictionaryKey = value.alias
                    else:
                        currentDictionaryKey = currentPydanticKey

                    if hasattr(getattr(data_model, currentPydanticKey), "model_fields"):
                        ruamel_yaml_object[currentDictionaryKey] = iterate_fields(
                            getattr(data_model, currentPydanticKey),
                            ruamel_yaml_object[currentDictionaryKey],
                        )

                data_dict = ruamel_yaml_object

            else:
                data_dict = data_model.model_dump(by_alias=by_alias)

            yamlInstance.indent(sequence=4, offset=2)
            with open(full_file_path, 'w') as yaml_file:
                yamlInstance.dump(data_dict, yaml_file)

    @staticmethod
    def prep_folder(folder_full_path, clear: bool = False):
        if clear:
            if os.path.exists(folder_full_path):
                shutil.rmtree(folder_full_path)  # delete directory
        if not os.path.exists(folder_full_path):
            os.makedirs(folder_full_path)  # make new directory

    @staticmethod
    def get_folder_path(folder_type, folder, folder_key, overwrite, required_folder):
        """
        Builds (and, if needed, prepares on disk) the full path of a run folder named
        '<folder_type>_<folder_key>' inside the base folder.
        :param folder_type: type of the folder, e.g. 'Geometry', 'Mesh' or 'Solution'
        :param folder: base folder in which the run folder is looked up or created
        :param folder_key: reference key (number or name) appended to the folder type
        :param overwrite: if True, an existing folder with the same key is reused and cleared
        :param required_folder: if True, the folder is required for the run and is prepared on disk
        :return: full path of the run folder
        """
        if required_folder and not (folder_key and overwrite):
            all_dirs = [x.parts[-1] for x in Path(folder).iterdir() if x.is_dir()]
            all_relevant_dirs = [x for x in all_dirs if x.startswith(f"{folder_type}_{folder_key}")]
            if f"{folder_type}_{folder_key}" in all_relevant_dirs:
                new_folder_key = f"{folder_key}_{len(all_relevant_dirs) + 1}"
                folder_key = new_folder_key

        folder_path = os.path.join(folder, folder_type + '_' + str(folder_key))
        # Disable the line below to avoid deleting the folder  # TODO: add logic to control this at a higher level
        FilesAndFolders.prep_folder(folder_path, overwrite and required_folder)
        return folder_path

    @staticmethod
    def compute_folder_key(folder_type, folder, overwrite):
        # Find all the directories in the folder
        all_dirs = [x.parts[-1] for x in Path(folder).iterdir() if x.is_dir()]

        # Find all the directories that start with the folder_type (e.g. geometry, mesh, solution)
        # Then combine them into a single string with a custom separator (se@p)
        # Separators are used to guarantee the directories can be split later
        all_relevant_dirs = " se@p ".join([x for x in all_dirs if x.startswith(f"{folder_type}_")])
        all_relevant_dirs = f"{all_relevant_dirs} se@p "

        # Find all the integer keys in the relevant directories
        integers_in_relevant_dirs = re.findall(rf'{folder_type}_(\d+) se@p ', all_relevant_dirs)

        if not integers_in_relevant_dirs:
            # If there are no integers in the relevant directories, set the key to 1
            folder_key = 1
        else:
            # Make a list of integers out of the integers in the relevant directories
            integers_in_relevant_dirs = [int(x) for x in integers_in_relevant_dirs]

            # Sort the integers in the relevant directories
            integers_in_relevant_dirs.sort()

            if overwrite:
                # If overwrite is true, set the key to the largest integer found,
                # so that the folder with the largest integer key is overwritten
                folder_key = max(integers_in_relevant_dirs)
            else:
                # If overwrite is false, find the smallest integer key that is not
                # in the list of integers in the relevant directories
                folder_key = 1
                for i in integers_in_relevant_dirs:
                    if folder_key < i:
                        break
                    folder_key += 1

        return folder_key
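
    # Illustrative behavior (hypothetical folder contents): with existing
    # directories Geometry_1, Geometry_2 and Geometry_4 and folder_type='Geometry',
    # overwrite=True returns 4 (reuse the latest key), while overwrite=False
    # returns 3 (the smallest integer key not taken yet).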

    @staticmethod
    def print_welcome_graphics():
        logger.info(r" _____ _  ___        ____  ")
        logger.info(r"|  ___(_)/ _ \ _   _/ ___| ")
        logger.info(r"| |_  | | | | | | | \___ \ ")
        logger.info(r"|  _| | | |_| | |_| |___) |")
        logger.info(r"|_|   |_|\__\_\\__,_|____/ ")
        logger.info("")


class CheckForExceptions:

    @staticmethod
    def check_inputs(run):  # RunFiQuS()
        """
        This method raises errors when geometry, mesh or solution folder inputs are incorrect. Warnings are disabled as a trial.
        :param run: FDM.run object
        :type run: FDM.run
        """
        if run.type == 'start_from_yaml':
            pass
            # if run.geometry and not run.overwrite:
            #     warnings.warn("Warning: Geometry folder is needed only if it has to be overwritten. Ignoring it...")
            # if run.solution or run.mesh:
            #     warnings.warn("Warning: Mesh and Solution folders are not needed. Ignoring them...")
        # elif run.type == 'geometry_only':
        #     if run.solution or run.mesh:
        #         warnings.warn("Warning: Mesh and Solution folders are not needed. Ignoring them...")
        # elif run.type == 'geometry_and_mesh':
        #     if run.geometry and not run.overwrite:
        #         warnings.warn("Warning: Geometry folder is needed only if it has to be overwritten. Ignoring it...")
        #     if run.mesh:
        #         warnings.warn("Warning: Mesh folder is not needed. Ignoring it...")
        elif run.type == 'mesh_and_solve_with_post_process':
            if not run.geometry:
                raise Exception('Full path to Geometry not provided. '
                                'Insert options -> reference_files -> geometry.')
            # if run.mesh and not run.overwrite:
            #     warnings.warn("Warning: Mesh folder is needed only if it has to be overwritten. Ignoring it...")
            # if run.solution:
            #     warnings.warn("Warning: Solution folder is not needed. Ignoring it...")
        elif run.type == 'mesh_only':
            if not run.geometry:
                raise Exception('Full path to Geometry not provided. '
                                'Insert options -> reference_files -> geometry.')
            # if run.solution:
            #     warnings.warn("Warning: Solution folder is not needed. Ignoring it...")
        elif run.type == 'solve_with_post_process':
            if not run.mesh or not run.geometry:
                raise Exception('Full path to Geometry or Mesh not provided. '
                                'Insert options -> reference_files -> geometry and mesh.')
            # if run.solution and not run.overwrite:
            #     warnings.warn("Warning: Solution folder is needed only if it has to be overwritten. Ignoring it...")
        elif run.type == 'solve_only':
            if not run.mesh or not run.geometry:
                raise Exception('Full path to Geometry or Mesh not provided. '
                                'Insert options -> reference_files -> geometry and mesh.')
            # if run.solution and not run.overwrite:
            #     warnings.warn("Warning: Solution folder is needed only if it has to be overwritten. Ignoring it...")
        elif run.type == 'post_process_only':
            if not run.mesh or not run.geometry or not run.solution:
                raise Exception('Full path to Geometry, Mesh, or Solution not provided. '
                                'Insert options -> reference_files -> geometry, mesh, and solution.')

    @staticmethod
    def check_overwrite_conditions(folder_type, folder, folder_key):
        """
        This method prints warnings related to the overwrite condition settings. It is disabled as a trial.
        :param folder_type: type of the folder, e.g. 'Geometry', 'Mesh' or 'Solution'
        :param folder: base folder in which the run folder is looked up
        :param folder_key: reference key (number or name) appended to the folder type
        """
        pass
        # if folder_key:
        #     if not os.path.exists(os.path.join(folder, folder_type + '_' + str(folder_key))):
        #         warnings.warn(
        #             f'The folder {folder_type}_{folder_key} does not exist. Creating it...')
        # else:
        #     warnings.warn(
        #         f'Reference number of the folder {folder_type} not provided. '
        #         f'Overwriting the latest {folder_type} folder...')


class GeometricFunctions:

    @staticmethod
    def sig_dig(n, precision=8):
        # Round n using numpy positional formatting with the given precision
        return float(np.format_float_positional(n, precision=precision))

    @staticmethod
    def points_distance(a, b):
        """
        Computes the distance between two points a and b
        :param a: list of x and y coordinates
        :param b: list of x and y coordinates
        """
        a = np.array(a)
        b = np.array(b)
        return np.linalg.norm(a - b)

    @staticmethod
    def line_through_two_points(point1, point2):
        """
        Finds coefficients of the line through two points [x1,y1] and [x2,y2]
        :param point1: 2-element list defining x/y positions of the 1st point
        :param point2: 2-element list defining x/y positions of the 2nd point
        :return: 3-element list defining the A, B, and C coefficients of the line, as in: A*x + B*y + C = 0
        """
        x1, y1 = point1[0], point1[1]
        x2, y2 = point2[0], point2[1]
        if x2 == x1:
            A = 1
            B = 0
            C = - x1
        elif y2 == y1:
            A = 0
            B = 1
            C = - y1
        else:
            A = - (y2 - y1) / (x2 - x1)
            B = + 1
            C = - (x2 * y1 - x1 * y2) / (x2 - x1)
        return [float(A), float(B), float(C)]
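
    # Illustrative values: the line through (0, 1) and (1, 3) is y = 2*x + 1,
    # i.e. -2*x + 1*y - 1 = 0 with the sign convention used above:
    #   >>> GeometricFunctions.line_through_two_points([0, 1], [1, 3])
    #   [-2.0, 1.0, -1.0]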

    @staticmethod
    def centroid(X, Y):
        """
        Computes the centroid coordinates of a non-self-intersecting closed polygon
        :param X: list of x coordinates of the vertices
        :param Y: list of y coordinates of the vertices
        """
        sum_A, sum_Cx, sum_Cy = 0, 0, 0
        for i in range(len(X)):
            index = i + 1 if i != len(X) - 1 else 0
            A = X[i] * Y[index] - X[index] * Y[i]
            sum_Cx += (X[i] + X[index]) * A
            sum_Cy += (Y[i] + Y[index]) * A
            sum_A += A
        factor = 1 / (3 * sum_A)
        return [factor * sum_Cx, factor * sum_Cy]
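
    # Illustrative values: the unit square traversed counter-clockwise has its
    # centroid at the center:
    #   >>> GeometricFunctions.centroid([0, 1, 1, 0], [0, 0, 1, 1])
    #   [0.5, 0.5]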

    @staticmethod
    def arc_center_from_3_points(a, b, c):
        """
        Computes the center coordinates of an arc passing through three points
        :param a: list of x and y coordinates of one arc point
        :param b: list of x and y coordinates of one arc point
        :param c: list of x and y coordinates of one arc point
        """
        ab = [a[0] - b[0], a[1] - b[1]]
        ac = [a[0] - c[0], a[1] - c[1]]
        sac = [a[0] * a[0] - c[0] * c[0], a[1] * a[1] - c[1] * c[1]]
        sba = [b[0] * b[0] - a[0] * a[0], b[1] * b[1] - a[1] * a[1]]
        yy = (sac[0] * ab[0] + sac[1] * ab[0] + sba[0] * ac[0] + sba[1] * ac[0]) / \
             (2 * ((c[1] - a[1]) * ab[0] - (b[1] - a[1]) * ac[0]))
        xx = (sac[0] * ab[1] + sac[1] * ab[1] + sba[0] * ac[1] + sba[1] * ac[1]) / \
             (2 * ((c[0] - a[0]) * ab[1] - (b[0] - a[0]) * ac[1]))
        return [-xx, -yy]

    @staticmethod
    def corrected_arc_center(C, pnt1, pnt2):
        """
        Computes the center coordinates of an arc from two points and a guessed center
        :param C: list of x and y coordinates of guessed center
        :param pnt1: list of x and y coordinates of first arc point
        :param pnt2: list of x and y coordinates of second arc point
        """
        if pnt1[1] < 0:
            pnt_tmp = pnt1.copy()
            pnt1 = pnt2.copy()
            pnt2 = pnt_tmp
        radius = (np.sqrt(np.square(pnt1[0] - C[0]) + np.square(pnt1[1] - C[1])) +
                  np.sqrt(np.square(pnt2[0] - C[0]) + np.square(pnt2[1] - C[1]))) / 2
        d = [0.5 * abs((pnt2[0] - pnt1[0])), 0.5 * abs((pnt1[1] - pnt2[1]))]
        aa = np.sqrt(np.square(d[0]) + np.square(d[1]))
        bb = np.sqrt(np.square(radius) - np.square(aa))
        M = [pnt1[0] + d[0]]
        if pnt2[1] < pnt1[1]:
            M.append(pnt2[1] + d[1])
            sign = [-1, -1] if pnt2[1] >= 0. else [1, 1]
        else:
            M.append(pnt1[1] + d[1])
            sign = [1, -1] if pnt2[1] >= 0. else [-1, 1]
        return [M[0] + sign[0] * bb * d[1] / aa, M[1] + sign[1] * bb * d[0] / aa]

    @staticmethod
    def arc_angle_between_point_and_abscissa(p, c):
        """
        Returns the angle of an arc with center c and endpoints at (cx + radius, cy) and (px, py)
        :param p: list of x and y coordinates of a point
        :param c: list of x and y coordinates of the arc center
        """
        theta = np.arctan2(p[1] - c[1], p[0] - c[0])
        return theta + (2 * np.pi if theta < 0 else 0)
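
    # Illustrative values: angles are mapped into [0, 2*pi), so a point straight
    # above the center gives pi/2 and one straight below gives 3*pi/2:
    #   >>> GeometricFunctions.arc_angle_between_point_and_abscissa([0, 1], [0, 0])  # pi/2
    #   >>> GeometricFunctions.arc_angle_between_point_and_abscissa([0, -1], [0, 0])  # 3*pi/2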

    @staticmethod
    def intersection_between_two_lines(line1, line2):
        """
        Finds the intersection point between two lines
        :param line1: list of A, B, C (A*x + B*y + C = 0)
        :param line2: list of A, B, C (A*x + B*y + C = 0)
        """
        if line1[1] == 0.0:
            x = - line1[2] / line1[0]
            y = - (line2[0] * x + line2[2]) / line2[1]
        elif line2[1] == 0.0:
            x = - line2[2] / line2[0]
            y = - (line1[0] * x + line1[2]) / line1[1]
        else:
            a = - line1[0] / line1[1]
            c = - line1[2] / line1[1]
            b = - line2[0] / line2[1]
            d = - line2[2] / line2[1]
            x = (d - c) / (a - b)
            y = a * x + c
        return [x, y]
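
    # Illustrative values: the vertical line x = 1 (coefficients [1, 0, -1]) and
    # the horizontal line y = 2 (coefficients [0, 1, -2]) cross at (1, 2):
    #   >>> GeometricFunctions.intersection_between_two_lines([1, 0, -1], [0, 1, -2])
    #   [1.0, 2.0]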

    @staticmethod
    def intersection_between_circle_and_line(line, circle, get_only_closest: bool = False):
        """
        Finds the intersection point/s between a circle and a line
        :param line: list of A, B, C (A*x + B*y + C = 0)
        :param circle: list of lists (x and y coordinates of the center, and point)
        :param get_only_closest: boolean to return only closest intersection point to the circle point
        """
        vertical = line[1] == 0
        c, d = circle
        r = GeometricFunctions.points_distance(c, d)
        intersect = []
        if vertical:
            m = - line[2] / line[0]
            delta = r ** 2 + 2 * m * c[0] - m ** 2 - c[0] ** 2
        else:
            m, b = - line[0] / line[1], - line[2] / line[1]
            A = m ** 2 + 1
            B = 2 * (m * b - c[0] - m * c[1])
            C = b ** 2 - r ** 2 + c[0] ** 2 + c[1] ** 2 - 2 * c[1] * b
            delta = B ** 2 - 4 * A * C

        if delta < 0:  # no intersection with the circle
            return None
        elif delta == 0:  # tangent to the circle
            x0 = m if vertical else - B / 2 / A
            y0 = c[1] if vertical else m * x0 + b
            intersect.append([x0, y0])
        else:  # two intersections with the circle
            x1 = m if vertical else (- B + np.sqrt(delta)) / 2 / A
            y1 = np.sqrt(delta) + c[1] if vertical else m * x1 + b
            x2 = m if vertical else (- B - np.sqrt(delta)) / 2 / A
            y2 = - np.sqrt(delta) + c[1] if vertical else m * x2 + b
            intersect.append([x1, y1])
            intersect.append([x2, y2])
            if get_only_closest:
                distance1 = GeometricFunctions.points_distance(d, intersect[0])
                distance2 = GeometricFunctions.points_distance(d, intersect[1])
                if distance1 > distance2:
                    intersect.pop(0)
                else:
                    intersect.pop(1)
        return intersect
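
    # Illustrative values: the horizontal line y = 0 (coefficients [0, 1, 0])
    # cuts the unit circle centered at the origin (defined by its center and the
    # point (1, 0)) at (1, 0) and (-1, 0); get_only_closest keeps the point
    # nearest to the circle-defining point:
    #   >>> line = [0, 1, 0]
    #   >>> circle = [[0, 0], [1, 0]]
    #   >>> GeometricFunctions.intersection_between_circle_and_line(line, circle, get_only_closest=True)
    #   [[1.0, 0.0]]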

    @staticmethod
    def intersection_between_arc_and_line(line, arc):
        """
        Finds the intersection point/s between an arc and a line
        :param line: list of A, B, C (A*x + B*y + C = 0)
        :param arc: list of lists (x and y coordinates of the center, high-angle endpoint, and low-angle endpoint)
        """
        vertical = line[1] == 0
        c, d, e = arc
        r = GeometricFunctions.points_distance(c, d)
        angle_d = GeometricFunctions.arc_angle_between_point_and_abscissa(d, c)
        if angle_d == 0:
            angle_d = 2 * np.pi  # if the 'high-angle' angle is 0, set it to 2*pi to avoid issues with the arc
        angle_e = GeometricFunctions.arc_angle_between_point_and_abscissa(e, c)
        intersect = []
        if vertical:
            m = - line[2] / line[0]
            delta = r ** 2 + 2 * m * c[0] - m ** 2 - c[0] ** 2
        else:
            m, b = - line[0] / line[1], - line[2] / line[1]
            A = m ** 2 + 1
            B = 2 * (m * b - c[0] - m * c[1])
            C = b ** 2 - r ** 2 + c[0] ** 2 + c[1] ** 2 - 2 * c[1] * b
            delta = B ** 2 - 4 * A * C

        if delta < 0:  # no intersection with the circle
            return None
        elif delta == 0:  # tangent to the circle
            x0 = m if vertical else - B / 2 / A
            y0 = c[1] if vertical else m * x0 + b
            angle0 = GeometricFunctions.arc_angle_between_point_and_abscissa([x0, y0], c)
            intersect0 = angle_e < angle0 < angle_d
            if intersect0:
                intersect.append([x0, y0])
            else:  # no intersection with the arc
                return None
        else:  # two intersections with the circle
            x1 = m if vertical else (- B + np.sqrt(delta)) / 2 / A
            y1 = np.sqrt(delta) + c[1] if vertical else m * x1 + b
            angle1 = GeometricFunctions.arc_angle_between_point_and_abscissa([x1, y1], c)
            intersect1 = (angle_e < angle1 < angle_d) or abs(angle1 - angle_e) < 1e-6 or abs(angle1 - angle_d) < 1e-6
            x2 = m if vertical else (- B - np.sqrt(delta)) / 2 / A
            y2 = - np.sqrt(delta) + c[1] if vertical else m * x2 + b
            angle2 = GeometricFunctions.arc_angle_between_point_and_abscissa([x2, y2], c)
            intersect2 = (angle_e < angle2 < angle_d) or abs(angle2 - angle_e) < 1e-6 or abs(angle2 - angle_d) < 1e-6
            if not intersect1 and not intersect2:  # no intersection with the arc
                return None
            if intersect1:  # first point intersecting the arc
                intersect.append([x1, y1])
            if intersect2:  # second point intersecting the arc
                intersect.append([x2, y2])

        return intersect


class GmshUtils:

    def __init__(self, model_name='dummy_name', verbose=True):
        self.model_name = model_name
        self.verbose = verbose

    def initialize(self, verbosity_Gmsh: int = 5):
        """
        Initialize Gmsh with options for FiQuS
        :param verbosity_Gmsh: Input file run.verbosity_Gmsh
        :type verbosity_Gmsh: int
        """
        if not gmsh.is_initialized():
            gmsh.initialize(sys.argv, interruptible=False, readConfigFiles=False)
            gmsh.model.add(str(self.model_name))
            num_threads = multiprocessing.cpu_count()
            gmsh.option.setNumber('General.NumThreads', num_threads)  # enable multithreading (this seems to be only for meshing)
            gmsh.option.setNumber('Mesh.MaxNumThreads1D', num_threads)
            gmsh.option.setNumber('Mesh.MaxNumThreads2D', num_threads)
            gmsh.option.setNumber('Mesh.MaxNumThreads3D', num_threads)
            gmsh.option.setNumber('Geometry.OCCParallel', 1)
            gmsh.option.setNumber('Geometry.ToleranceBoolean', 0.000001)
            gmsh.option.setString('Geometry.OCCTargetUnit', 'M')
            gmsh.option.setNumber("General.Verbosity", verbosity_Gmsh)
            if self.verbose:
                gmsh.option.setNumber('General.Terminal', 1)
            else:
                gmsh.option.setNumber('General.Terminal', 0)

    def check_for_event(self):  # pragma: no cover
        action = gmsh.onelab.getString("ONELAB/Action")
        if len(action) and action[0] == "check":
            gmsh.onelab.setString("ONELAB/Action", [""])
            if self.verbose:
                print("-------------------check----------------")
            gmsh.fltk.update()
            gmsh.graphics.draw()
        if len(action) and action[0] == "compute":
            gmsh.onelab.setString("ONELAB/Action", [""])
            if self.verbose:
                print("-------------------compute----------------")
            gmsh.onelab.setChanged("Gmsh", 0)
            gmsh.onelab.setChanged("GetDP", 0)
            gmsh.fltk.update()
            gmsh.graphics.draw()
        return True

    def launch_interactive_GUI(self, close_after=-1):  # pragma: no cover
        gmsh.fltk.initialize()
        while gmsh.fltk.isAvailable() and self.check_for_event():
            gmsh.fltk.wait()
        if close_after >= 0:
            sleep(close_after)
            gmsh.fltk.finalize()
        gmsh.finalize()
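
# Usage sketch (hypothetical model name): initialize Gmsh once with the FiQuS
# options, build the model via the gmsh API, then optionally inspect it in the
# interactive GUI:
#
#   gu = GmshUtils(model_name='demo_magnet', verbose=True)
#   gu.initialize(verbosity_Gmsh=5)
#   ...  # create geometry and mesh through the gmsh API
#   gu.launch_interactive_GUI(close_after=0)  # also finalizes gmsh on exit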


class RoxieParsers:
    def __init__(self, conductor, block, xyCorner):
        self.conductor = conductor
        self.block = block
        self.xyCorner = xyCorner

    @staticmethod
    def parseMap2d(map2dFile: Path, physical_quantity: str = 'magnetic_flux_density'):
        """
        Generates a pandas data frame with the map2d content
        :param map2dFile: path of the map2d file containing the content to parse
        :param physical_quantity: magnetic_flux_density or temperature
        """
        physical_quantities_abbreviations = {'magnetic_flux_density': ('BX/T', 'BY/T'), 'temperature': ('T/K', '-')}
        columns = ['BL.', 'COND.', 'NO.', 'X-POS/MM', 'Y-POS/MM'] + \
                  [abbr for abbr in physical_quantities_abbreviations[physical_quantity]] + \
                  ['AREA/MM**2', 'CURRENT', 'FILL FAC.']
        return pd.read_csv(map2dFile, sep=r"\s{2,}|(?<=2) |(?<=T) ", engine='python', usecols=columns)
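
    # Usage sketch (hypothetical file path): load a field map into a DataFrame
    # and select the position and flux-density columns:
    #
    #   df = RoxieParsers.parseMap2d(Path('magnet.map2d'))
    #   print(df[['X-POS/MM', 'Y-POS/MM', 'BX/T', 'BY/T']].head())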

    @staticmethod
    def parseCond2d(cond2dFile: Path):
        """
        Reads a cond2d input file and returns a list of conductor position objects
        :param cond2dFile: path of the cond2d file to parse
        :return: conductorPositionsList
        """
        # conductorStartKeyword = "CONDUCTOR POSITION IN THE CROSS-SECTION"
        blockStartKeyword = "BLOCK POSITION IN THE CROSS-SECTION"

        with open(cond2dFile, "r") as file:
            fileContent = file.read()

        # separate rows
        fileContentByRow = fileContent.split("\n")

        # Find block definition
        for i in range(len(fileContentByRow)):
            if blockStartKeyword in fileContentByRow[i]:
                startOfBlockDefinitionIndex = i

        # separate part of the data with conductor position information
        conductorPositions = fileContentByRow[5:startOfBlockDefinitionIndex - 2]

        # drop every 5th row
        conductorPositionsFourVertices = list(conductorPositions)
        del conductorPositionsFourVertices[4::5]

        # arrange data in a list of lists
        outputConductorPositions = []
        for row in conductorPositionsFourVertices:
            rowSplitStr = row.split(',')
            rowSplitFloat = [float(elem) for elem in rowSplitStr]
            outputConductorPositions.append(rowSplitFloat)

        # arrange data from list to numpy.array
        outputConductorPositionsMatrix = np.array(outputConductorPositions)

        # input: outputConductorPositions
        # output: conductorPositionsList
        conductorPositionsList = []
        for i in range(0, len(outputConductorPositions), 4):
            out = outputConductorPositions[i]
            conductor = int(out[1])
            block = int(out[2])
            xyCorner = outputConductorPositionsMatrix[i:i + 4, 4:6]
            conductorPositionsList.append(RoxieParsers(conductor, block, xyCorner))

        return conductorPositionsList


def initialize_logger(work_folder: str = None, time_stamp: str = None, verbose: bool = True):
    """
    Logger function that sets up writing of FiQuS log files.

    :param work_folder: folder where the log file is written to
    :type work_folder: str
    :param time_stamp: time stamp put in the log file name
    :type time_stamp: str
    :param verbose: if true INFO level logs are printed, if false only WARNING level logs are printed to the console
    :type verbose: bool
    :return: logger object
    :rtype: object
    """
    logger = logging.getLogger('FiQuS')

    # Iterate over a copy, since removing handlers while iterating over
    # logger.handlers directly would skip entries
    for handler in list(logger.handlers):
        if isinstance(handler, logging.FileHandler):
            handler.close()
            logger.removeHandler(handler)

    if verbose:
        logger.setLevel(logging.INFO)
    else:
        logger.setLevel(logging.WARNING)

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(logging.INFO)
    stdout_handler.setFormatter(LoggingFormatter())
    logger.addHandler(stdout_handler)

    FilesAndFolders.prep_folder(work_folder)
    FilesAndFolders.prep_folder(os.path.join(work_folder, "logs"))
    file_handler = logging.FileHandler(os.path.join(work_folder, "logs", f"{time_stamp}.FiQuS.log"))
    file_handler.setLevel(logging.INFO)
    fileFormatter = logging.Formatter('%(asctime)s | %(levelname)s | %(message)s')
    file_handler.setFormatter(fileFormatter)
    logger.addHandler(file_handler)

    errorsAndWarnings_file_handler = logging.FileHandler(os.path.join(work_folder, "logs", f"ERRORS_WARNINGS_{time_stamp}.FiQuS.log"))
    errorsAndWarnings_file_handler.setLevel(logging.WARNING)
    errorsAndWarnings_file_handler.setFormatter(fileFormatter)
    logger.addHandler(errorsAndWarnings_file_handler)

    return logger
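
# Usage sketch (hypothetical folder and time stamp): set up console logging plus
# the two log files under <work_folder>/logs:
#
#   logger = initialize_logger(work_folder='./output', time_stamp='2024_01_01_12_00_00')
#   logger.info('FiQuS run started')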


def create_json_schema(data_model: FDM):
    """
    Create the JSON Schema from a Pydantic data model
    :param data_model: FDM
    :type data_model: FDM
    """
    # Generate the raw JSON schema from the Pydantic model
    json_schema_dict = data_model.model_json_schema()

    # Replace anyOf with oneOf for better compatibility
    json_schema_str = json.dumps(json_schema_dict)
    json_schema_str = json_schema_str.replace("anyOf", "oneOf")

    # Pretty-print the schema with proper indentation
    pretty_json_schema = json.dumps(json.loads(json_schema_str), indent=4, ensure_ascii=False)

    # Define the output folder for the schema
    docs_folder = os.path.join(
        os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "docs"
    )

    # Make sure the output folder for the JSON schema exists
    json_schema_file_path = os.path.join(docs_folder, "schema.json")
    os.makedirs(os.path.dirname(json_schema_file_path), exist_ok=True)

    # Write the prettified JSON schema to a file
    with open(json_schema_file_path, "w", encoding="utf-8") as file:
        file.write(pretty_json_schema)


def get_data_settings(GetDP_path=None, settings=None):
    user_name = getpass.getuser()

    if user_name == 'root':
        user_name = 'SYSTEM'
    elif user_name == 'MP-WIN-02$':
        user_name = 'MP_WIN_02'
    if not settings:
        path_to_settings_file = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "tests", f"settings.{user_name}.yaml")
        try:
            settings = FilesAndFolders.read_data_from_yaml(path_to_settings_file, DataSettings)
        except FileNotFoundError:
            # Log the problem via the module logger: the previous code wrote to
            # settings.error.log, but settings is not available when the settings
            # file itself could not be read
            logger.error(f'Could not find: {path_to_settings_file}')
            raise ValueError(f'File: {path_to_settings_file} does not exist.')

    if GetDP_path:
        settings.GetDP_path = GetDP_path

    return settings