Coverage for fiqus/utils/Utils.py: 77%
440 statements
coverage.py v7.4.4, created at 2024-12-25 02:54 +0100
import sys
import os
import getpass
import platform
import shutil
import logging
import re

import numpy as np
from pathlib import Path
from time import sleep
import multiprocessing

import pandas as pd
import ruamel.yaml
import gmsh
import json

from fiqus.data.DataSettings import DataSettings
from fiqus.data.DataFiQuS import FDM

logger = logging.getLogger(__name__)


class LoggingFormatter(logging.Formatter):
    """
    Logging formatter class
    """
    grey = "\x1b[38;20m"  # debug level
    white = "\x1b[37;20m"  # info level
    yellow = "\x1b[33;20m"  # warning level
    red = "\x1b[31;20m"  # error level
    bold_red = "\x1b[31;1m"  # critical level
    reset = "\x1b[0m"
    format = '%(asctime)s | %(levelname)s | %(message)s'

    FORMATS = {
        logging.DEBUG: grey + format + reset,
        logging.INFO: white + format + reset,
        logging.WARNING: yellow + format + reset,
        logging.ERROR: red + format + reset,
        logging.CRITICAL: bold_red + format + reset
    }

    def format(self, record):
        log_fmt = self.FORMATS.get(record.levelno)
        formatter = logging.Formatter(log_fmt)
        return formatter.format(record)
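

# A minimal usage sketch (illustrative, not part of the FiQuS API): attach
# LoggingFormatter to a console handler so each record is colour-coded by level
# via the FORMATS lookup above. The logger name "fiqus.demo" is hypothetical.
def _demo_logging_formatter():  # pragma: no cover
    demo_logger = logging.getLogger("fiqus.demo")
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(LoggingFormatter())
    demo_logger.addHandler(handler)
    demo_logger.setLevel(logging.DEBUG)
    demo_logger.warning("rendered with the yellow WARNING format")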


class FilesAndFolders:
    @staticmethod
    def read_data_from_yaml(full_file_path, data_class):
        with open(full_file_path, 'r') as stream:
            yaml = ruamel.yaml.YAML(typ='safe', pure=True)
            yaml_str = yaml.load(stream)
            if "magnet" in yaml_str:
                yaml_str["magnet"]["input_file_path"] = str(full_file_path)
        return data_class(**yaml_str)

    @staticmethod
    def write_data_to_yaml(full_file_path, dict_of_data_class, list_exceptions=[]):
        def my_represent_none(self, data):
            """
            Change data representation from empty string to "null" string
            """
            return self.represent_scalar('tag:yaml.org,2002:null', 'null')

        def flist(x):
            """
            Define a commented sequence to allow writing a list in a single row
            """
            retval = ruamel.yaml.comments.CommentedSeq(x)
            retval.fa.set_flow_style()  # fa -> format attribute
            return retval

        def list_single_row_recursively(data_dict: dict, exceptions: list):
            """
            Write lists in a single row
            :param data_dict: Dictionary to edit
            :param exceptions: List of strings defining keys that will not be written in a single row
            :return:
            """
            for key, value in data_dict.items():
                if isinstance(value, list) and (key not in exceptions):
                    data_dict[key] = flist(value)
                elif isinstance(value, np.ndarray):
                    data_dict[key] = flist(value.tolist())
                elif isinstance(value, dict):
                    data_dict[key] = list_single_row_recursively(value, exceptions)
            return data_dict

        yaml = ruamel.yaml.YAML()
        yaml.default_flow_style = False
        yaml.emitter.alt_null = 'Null'
        yaml.representer.add_representer(type(None), my_represent_none)
        dict_of_data_class = list_single_row_recursively(dict_of_data_class, exceptions=list_exceptions)
        with open(full_file_path, 'w') as yaml_file:
            yaml.dump(dict_of_data_class, yaml_file)

    @staticmethod
    def write_data_model_to_yaml(full_file_path, data_model, with_comments=True, by_alias=True):
        if data_model:
            # Set up YAML instance settings:
            yamlInstance = ruamel.yaml.YAML()

            # Convert the model_data to a ruamel.yaml object/dictionary:
            if with_comments:
                path_object = Path(full_file_path)
                # Add pydantic descriptions to the yaml file as comments:
                dummy_yaml_file_to_create_ruamel_object = (
                    path_object.resolve().parent.joinpath("dummy.yaml")
                )
                with open(dummy_yaml_file_to_create_ruamel_object, "w") as stream:
                    yamlInstance.dump(data_model.dict(by_alias=by_alias), stream)

                # Read the file:
                with open(dummy_yaml_file_to_create_ruamel_object, "r") as stream:
                    # Read the yaml file and store the data inside ruamel_yaml_object:
                    # ruamel_yaml_object is a special object that stores both the data and
                    # comments. Even though the data might be changed or added, the same
                    # object will be used to create the new YAML file to keep the comments.
                    ruamel_yaml_object = yamlInstance.load(
                        dummy_yaml_file_to_create_ruamel_object
                    )

                os.remove(dummy_yaml_file_to_create_ruamel_object)

                def iterate_fields(model, ruamel_yaml_object):
                    for currentPydanticKey, value in model.__fields__.items():
                        if value.alias and by_alias:
                            currentDictionaryKey = value.alias
                        else:
                            currentDictionaryKey = currentPydanticKey

                        if value.description:
                            ruamel_yaml_object.yaml_add_eol_comment(
                                value.description,
                                currentDictionaryKey,
                            )

                        if hasattr(getattr(model, currentPydanticKey), "__fields__"):
                            new_ruamel_yaml_object = iterate_fields(
                                getattr(model, currentPydanticKey),
                                ruamel_yaml_object[currentDictionaryKey],
                            )
                            ruamel_yaml_object[currentDictionaryKey] = new_ruamel_yaml_object
                        elif isinstance(getattr(model, currentPydanticKey), list):
                            for i, item in enumerate(getattr(model, currentPydanticKey)):
                                if hasattr(item, "__fields__"):
                                    new_ruamel_yaml_object = iterate_fields(
                                        item,
                                        ruamel_yaml_object[currentDictionaryKey][i],
                                    )
                                    ruamel_yaml_object[currentDictionaryKey][i] = new_ruamel_yaml_object

                    return ruamel_yaml_object

                iterate_fields(data_model, ruamel_yaml_object)
                for currentPydanticKey, value in data_model.__fields__.items():
                    if value.alias and by_alias:
                        currentDictionaryKey = value.alias
                    else:
                        currentDictionaryKey = currentPydanticKey

                    if hasattr(getattr(data_model, currentPydanticKey), "__fields__"):
                        ruamel_yaml_object[currentDictionaryKey] = iterate_fields(
                            getattr(data_model, currentPydanticKey),
                            ruamel_yaml_object[currentDictionaryKey],
                        )

                data_dict = ruamel_yaml_object
            else:
                data_dict = data_model.dict(by_alias=by_alias)

            yamlInstance.indent(sequence=4, offset=2)
            with open(full_file_path, 'w') as yaml_file:
                yamlInstance.dump(data_dict, yaml_file)

    @staticmethod
    def prep_folder(folder_full_path, clear: bool = False):
        if clear:
            if os.path.exists(folder_full_path):
                shutil.rmtree(folder_full_path)  # delete directory
        if not os.path.exists(folder_full_path):
            os.makedirs(folder_full_path)  # make new directory

    @staticmethod
    def get_folder_path(folder_type, folder, folder_key, overwrite, required_folder):
        """
        Resolves (and prepares) the full path of a run folder of a given type.
        :param folder_type: folder type, e.g. 'Geometry', 'Mesh' or 'Solution'
        :type folder_type: str
        :param folder: parent folder in which the run folder is located
        :type folder: str
        :param folder_key: key (typically an integer) appended to the folder type to form the folder name
        :type folder_key: str or int
        :param overwrite: if True, existing folder content may be overwritten
        :type overwrite: bool
        :param required_folder: if True, the folder is required for the run and is (re)created
        :type required_folder: bool
        :return: full path to the folder
        :rtype: str
        """
        if required_folder and not (folder_key and overwrite):
            all_dirs = [x.parts[-1] for x in Path(folder).iterdir() if x.is_dir()]
            all_relevant_dirs = [x for x in all_dirs if x.startswith(f"{folder_type}_{folder_key}")]
            if f"{folder_type}_{folder_key}" in all_relevant_dirs:
                new_folder_key = f"{folder_key}_{len(all_relevant_dirs) + 1}"
                folder_key = new_folder_key

        folder_path = os.path.join(folder, folder_type + '_' + str(folder_key))
        # Disable the line below to avoid deleting the folder  # TODO: add logic to control this at a higher level
        FilesAndFolders.prep_folder(folder_path, overwrite and required_folder)
        return folder_path

    @staticmethod
    def compute_folder_key(folder_type, folder, overwrite):
        # Find all the directories in the folder
        all_dirs = [x.parts[-1] for x in Path(folder).iterdir() if x.is_dir()]

        # Find all the directories that start with the folder_type (e.g. geometry, mesh, solution)
        # Then combine them into a single string with a custom separator (se@p)
        # Separators are used to guarantee the directories can be split later
        all_relevant_dirs = " se@p ".join([x for x in all_dirs if x.startswith(f"{folder_type}_")])
        all_relevant_dirs = f"{all_relevant_dirs} se@p "

        # Find all the integer keys in the relevant directories
        integers_in_relevant_dirs = re.findall(rf'{folder_type}_(\d+) se@p ', all_relevant_dirs)

        if not integers_in_relevant_dirs:
            # If there are no integer keys in the relevant directories, set the key to 1
            # (note: re.findall returns an empty list, never None, when nothing matches)
            folder_key = 1
        else:
            # Make a list of integers out of the integers in the relevant directories
            integers_in_relevant_dirs = [int(x) for x in integers_in_relevant_dirs]

            # Sort the integers in the relevant directories
            integers_in_relevant_dirs.sort()

            if overwrite:
                # If overwrite is true, set the key to the largest integer in the list,
                # so that the folder with the largest integer key is overwritten
                folder_key = max(integers_in_relevant_dirs)
            else:
                # If overwrite is false, find the smallest integer key that is not
                # in the list of integers in the relevant directories
                folder_key = 1
                for i in integers_in_relevant_dirs:
                    if folder_key < i:
                        break
                    folder_key += 1

        return folder_key

    @staticmethod
    def print_welcome_graphics():
        logger.info(r" _____ _  ___        ____  ")
        logger.info(r"|  ___(_)/ _ \ _   _/ ___| ")
        logger.info(r"| |_  | | | | | | | \___ \ ")
        logger.info(r"|  _| | | |_| | |_| |___) |")
        logger.info(r"|_|   |_|\__\_\\__,_|____/ ")
        logger.info("")
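

# A minimal sketch (the folder path is illustrative, not part of the FiQuS API):
# shows how compute_folder_key and get_folder_path cooperate to pick the next
# run folder, e.g. Geometry_1, Geometry_2, ... under a working directory.
def _demo_folder_key():  # pragma: no cover
    work_dir = os.path.join(os.getcwd(), "demo_work_dir")  # hypothetical folder
    FilesAndFolders.prep_folder(work_dir)
    key = FilesAndFolders.compute_folder_key("Geometry", work_dir, overwrite=False)
    path = FilesAndFolders.get_folder_path("Geometry", work_dir, key, overwrite=False, required_folder=True)
    logger.info(f"Next geometry folder: {path}")  # e.g. .../Geometry_1 on the first run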


class CheckForExceptions:

    @staticmethod
    def check_inputs(run):  # RunFiQuS()
        """
        This method raises errors when geometry, mesh or solution folder inputs are incorrect. Warnings are disabled as a trial.
        :param run: FDM.run object
        :type run: FDM.run
        """
        if run.type == 'start_from_yaml':
            pass
            # if run.geometry and not run.overwrite:
            #     warnings.warn("Warning: Geometry folder is needed only if it has to be overwritten. Ignoring it...")
            # if run.solution or run.mesh:
            #     warnings.warn("Warning: Mesh and Solution folders are not needed. Ignoring them...")
        # elif run.type == 'geometry_only':
        #     if run.solution or run.mesh:
        #         warnings.warn("Warning: Mesh and Solution folders are not needed. Ignoring them...")
        # elif run.type == 'geometry_and_mesh':
        #     if run.geometry and not run.overwrite:
        #         warnings.warn("Warning: Geometry folder is needed only if it has to be overwritten. Ignoring it...")
        #     if run.mesh:
        #         warnings.warn("Warning: Mesh folder is not needed. Ignoring it...")
        elif run.type == 'mesh_and_solve_with_post_process':
            if not run.geometry:
                raise Exception('Full path to Geometry not provided. '
                                'Insert options -> reference_files -> geometry.')
            # if run.mesh and not run.overwrite:
            #     warnings.warn("Warning: Mesh folder is needed only if it has to be overwritten. Ignoring it...")
            # if run.solution:
            #     warnings.warn("Warning: Solution folder is not needed. Ignoring it...")
        elif run.type == 'mesh_only':
            if not run.geometry:
                raise Exception('Full path to Geometry not provided. '
                                'Insert options -> reference_files -> geometry.')
            # if run.solution:
            #     warnings.warn("Warning: Solution folder is not needed. Ignoring it...")
        elif run.type == 'solve_with_post_process':
            if not run.mesh or not run.geometry:
                raise Exception('Full path to Geometry or Mesh not provided. '
                                'Insert options -> reference_files -> geometry and mesh.')
            # if run.solution and not run.overwrite:
            #     warnings.warn("Warning: Solution folder is needed only if it has to be overwritten. Ignoring it...")
        elif run.type == 'solve_only':
            if not run.mesh or not run.geometry:
                raise Exception('Full path to Geometry or Mesh not provided. '
                                'Insert options -> reference_files -> geometry and mesh.')
            # if run.solution and not run.overwrite:
            #     warnings.warn("Warning: Solution folder is needed only if it has to be overwritten. Ignoring it...")
        elif run.type == 'post_process_only':
            if not run.mesh or not run.geometry or not run.solution:
                raise Exception('Full path to Geometry, Mesh or Solution not provided. '
                                'Insert options -> reference_files -> geometry, mesh, and solution.')

    @staticmethod
    def check_overwrite_conditions(folder_type, folder, folder_key):
        """
        This method prints warnings related to overwrite condition settings. This is disabled as a trial.
        :param folder_type: folder type, e.g. 'Geometry', 'Mesh' or 'Solution'
        :type folder_type: str
        :param folder: parent folder in which the run folder is located
        :type folder: str
        :param folder_key: key appended to the folder type to form the folder name
        :type folder_key: str or int
        """
        pass
        # if folder_key:
        #     if not os.path.exists(os.path.join(folder, folder_type + '_' + str(folder_key))):
        #         warnings.warn(
        #             f'The folder {folder_type}_{folder_key} does not exist. Creating it...')
        # else:
        #     warnings.warn(
        #         f'Reference number of the folder {folder_type} not provided. '
        #         f'Overwriting the latest {folder_type} folder...')
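

# A minimal sketch of the validation above. check_inputs only reads the .type,
# .geometry, .mesh and .solution attributes, so a SimpleNamespace (a hypothetical
# stand-in for the FDM.run object) is enough to illustrate the raised error.
def _demo_check_inputs():  # pragma: no cover
    from types import SimpleNamespace
    run = SimpleNamespace(type='solve_only', geometry='Geometry_1', mesh=None, solution=None)
    try:
        CheckForExceptions.check_inputs(run)  # raises: mesh path is missing for solve_only
    except Exception as exception:
        logger.warning(f"Expected validation error: {exception}")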


class GeometricFunctions:

    @staticmethod
    def sig_dig(n, precision=8):
        return float(np.format_float_positional(n, precision=precision))

    @staticmethod
    def points_distance(a, b):
        """
        Computes the distance between two points a and b
        :param a: list of x and y coordinates
        :param b: list of x and y coordinates
        """
        a = np.array(a)
        b = np.array(b)
        return np.linalg.norm(a - b)

    @staticmethod
    def line_through_two_points(point1, point2):
        """
        Finds coefficients of the line through two points [x1,y1] and [x2,y2]
        :param point1: 2-element list defining x/y positions of the 1st point
        :param point2: 2-element list defining x/y positions of the 2nd point
        :return: 3-element list defining the A, B, and C coefficients of the line, as in: A*x + B*y + C = 0
        """
        x1, y1 = point1[0], point1[1]
        x2, y2 = point2[0], point2[1]
        if x2 == x1:
            A = 1
            B = 0
            C = - x1
        elif y2 == y1:
            A = 0
            B = 1
            C = - y1
        else:
            A = - (y2 - y1) / (x2 - x1)
            B = + 1
            C = - (x2 * y1 - x1 * y2) / (x2 - x1)
        return [float(A), float(B), float(C)]

    @staticmethod
    def centroid(X, Y):
        """
        Computes the centroid coordinates of a non-self-intersecting closed polygon
        :param X: list of x coordinates of the vertices
        :param Y: list of y coordinates of the vertices
        """
        sum_A, sum_Cx, sum_Cy = 0, 0, 0
        for i in range(len(X)):
            index = i + 1 if i != len(X) - 1 else 0
            A = X[i] * Y[index] - X[index] * Y[i]
            sum_Cx += (X[i] + X[index]) * A
            sum_Cy += (Y[i] + Y[index]) * A
            sum_A += A
        factor = 1 / (3 * sum_A)
        return [factor * sum_Cx, factor * sum_Cy]

    @staticmethod
    def arc_center_from_3_points(a, b, c):
        """
        Computes the center coordinates of an arc passing through three points
        :param a: list of x and y coordinates of one arc point
        :param b: list of x and y coordinates of one arc point
        :param c: list of x and y coordinates of one arc point
        """
        ab = [a[0] - b[0], a[1] - b[1]]
        ac = [a[0] - c[0], a[1] - c[1]]
        sac = [a[0] * a[0] - c[0] * c[0], a[1] * a[1] - c[1] * c[1]]
        sba = [b[0] * b[0] - a[0] * a[0], b[1] * b[1] - a[1] * a[1]]
        yy = (sac[0] * ab[0] + sac[1] * ab[0] + sba[0] * ac[0] + sba[1] * ac[0]) / \
             (2 * ((c[1] - a[1]) * ab[0] - (b[1] - a[1]) * ac[0]))
        xx = (sac[0] * ab[1] + sac[1] * ab[1] + sba[0] * ac[1] + sba[1] * ac[1]) / \
             (2 * ((c[0] - a[0]) * ab[1] - (b[0] - a[0]) * ac[1]))
        return [-xx, -yy]

    @staticmethod
    def corrected_arc_center(C, pnt1, pnt2):
        """
        Computes the center coordinates of an arc from two points and a guessed center
        :param C: list of x and y coordinates of the guessed center
        :param pnt1: list of x and y coordinates of the first arc point
        :param pnt2: list of x and y coordinates of the second arc point
        """
        if pnt1[1] < 0:
            pnt_tmp = pnt1.copy()
            pnt1 = pnt2.copy()
            pnt2 = pnt_tmp
        radius = (np.sqrt(np.square(pnt1[0] - C[0]) + np.square(pnt1[1] - C[1])) +
                  np.sqrt(np.square(pnt2[0] - C[0]) + np.square(pnt2[1] - C[1]))) / 2
        d = [0.5 * abs((pnt2[0] - pnt1[0])), 0.5 * abs((pnt1[1] - pnt2[1]))]
        aa = np.sqrt(np.square(d[0]) + np.square(d[1]))
        bb = np.sqrt(np.square(radius) - np.square(aa))
        M = [pnt1[0] + d[0]]
        if pnt2[1] < pnt1[1]:
            M.append(pnt2[1] + d[1])
            sign = [-1, -1] if pnt2[1] >= 0. else [1, 1]
        else:
            M.append(pnt1[1] + d[1])
            sign = [1, -1] if pnt2[1] >= 0. else [-1, 1]
        return [M[0] + sign[0] * bb * d[1] / aa, M[1] + sign[1] * bb * d[0] / aa]

    @staticmethod
    def arc_angle_between_point_and_abscissa(p, c):
        """
        Returns the angle of an arc with center c and endpoints at (cx + radius, cy) and (px, py)
        :param p: list of x and y coordinates of a point
        :param c: list of x and y coordinates of the arc center
        """
        theta = np.arctan2(p[1] - c[1], p[0] - c[0])
        return theta + (2 * np.pi if theta < 0 else 0)

    @staticmethod
    def intersection_between_two_lines(line1, line2):
        """
        Finds the intersection point between two lines
        :param line1: list of A, B, C (A*x + B*y + C = 0)
        :param line2: list of A, B, C (A*x + B*y + C = 0)
        """
        if line1[1] == 0.0:
            x = - line1[2] / line1[0]
            y = - (line2[0] * x + line2[2]) / line2[1]
        elif line2[1] == 0.0:
            x = - line2[2] / line2[0]
            y = - (line1[0] * x + line1[2]) / line1[1]
        else:
            a = - line1[0] / line1[1]
            c = - line1[2] / line1[1]
            b = - line2[0] / line2[1]
            d = - line2[2] / line2[1]
            x = (d - c) / (a - b)
            y = a * x + c
        return [x, y]

    @staticmethod
    def intersection_between_circle_and_line(line, circle, get_only_closest: bool = False):
        """
        Finds the intersection point/s between a circle and a line
        :param line: list of A, B, C (A*x + B*y + C = 0)
        :param circle: list of lists (x and y coordinates of the center, and of a point on the circle)
        :param get_only_closest: boolean to return only the intersection point closest to the circle point
        """
        vertical = line[1] == 0
        c, d = circle
        r = GeometricFunctions.points_distance(c, d)
        intersect = []
        if vertical:
            m = - line[2] / line[0]
            delta = r ** 2 + 2 * m * c[0] - m ** 2 - c[0] ** 2
        else:
            m, b = - line[0] / line[1], - line[2] / line[1]
            A = m ** 2 + 1
            B = 2 * (m * b - c[0] - m * c[1])
            C = b ** 2 - r ** 2 + c[0] ** 2 + c[1] ** 2 - 2 * c[1] * b
            delta = B ** 2 - 4 * A * C

        if delta < 0:  # no intersection with the circle
            return None
        elif delta == 0:  # tangent to the circle
            x0 = m if vertical else - B / 2 / A
            y0 = c[1] if vertical else m * x0 + b
            intersect.append([x0, y0])
        else:  # two intersections with the circle
            x1 = m if vertical else (- B + np.sqrt(delta)) / 2 / A
            y1 = np.sqrt(delta) + c[1] if vertical else m * x1 + b
            x2 = m if vertical else (- B - np.sqrt(delta)) / 2 / A
            y2 = - np.sqrt(delta) + c[1] if vertical else m * x2 + b
            intersect.append([x1, y1])
            intersect.append([x2, y2])
            if get_only_closest:
                distance1 = GeometricFunctions.points_distance(d, intersect[0])
                distance2 = GeometricFunctions.points_distance(d, intersect[1])
                if distance1 > distance2:
                    intersect.pop(0)
                else:
                    intersect.pop(1)
        return intersect

    @staticmethod
    def intersection_between_arc_and_line(line, arc):
        """
        Finds the intersection point/s between an arc and a line
        :param line: list of A, B, C (A*x + B*y + C = 0)
        :param arc: list of lists (x and y coordinates of the center, high-angle endpoint, and low-angle endpoint)
        """
        vertical = line[1] == 0
        c, d, e = arc
        r = GeometricFunctions.points_distance(c, d)
        angle_d = GeometricFunctions.arc_angle_between_point_and_abscissa(d, c)
        angle_e = GeometricFunctions.arc_angle_between_point_and_abscissa(e, c)
        intersect = []
        if vertical:
            m = - line[2] / line[0]
            delta = r ** 2 + 2 * m * c[0] - m ** 2 - c[0] ** 2
        else:
            m, b = - line[0] / line[1], - line[2] / line[1]
            A = m ** 2 + 1
            B = 2 * (m * b - c[0] - m * c[1])
            C = b ** 2 - r ** 2 + c[0] ** 2 + c[1] ** 2 - 2 * c[1] * b
            delta = B ** 2 - 4 * A * C

        if delta < 0:  # no intersection with the circle
            return None
        elif delta == 0:  # tangent to the circle
            x0 = m if vertical else - B / 2 / A
            y0 = c[1] if vertical else m * x0 + b
            angle0 = GeometricFunctions.arc_angle_between_point_and_abscissa([x0, y0], c)
            if angle_e < angle0 < angle_d:
                intersect.append([x0, y0])
            else:  # no intersection with the arc
                return None
        else:  # two intersections with the circle
            x1 = m if vertical else (- B + np.sqrt(delta)) / 2 / A
            y1 = np.sqrt(delta) + c[1] if vertical else m * x1 + b
            angle1 = GeometricFunctions.arc_angle_between_point_and_abscissa([x1, y1], c)
            intersect1 = (angle_e < angle1 < angle_d) or abs(angle1 - angle_e) < 1e-6 or abs(angle1 - angle_d) < 1e-6
            x2 = m if vertical else (- B - np.sqrt(delta)) / 2 / A
            y2 = - np.sqrt(delta) + c[1] if vertical else m * x2 + b
            angle2 = GeometricFunctions.arc_angle_between_point_and_abscissa([x2, y2], c)
            intersect2 = (angle_e < angle2 < angle_d) or abs(angle2 - angle_e) < 1e-6 or abs(angle2 - angle_d) < 1e-6
            if not intersect1 and not intersect2:  # no intersection with the arc
                return None
            if intersect1:  # first point intersecting the arc
                intersect.append([x1, y1])
            if intersect2:  # second point intersecting the arc
                intersect.append([x2, y2])

        return intersect
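

# A minimal numeric sketch of the helpers above (values are illustrative): the
# line through (0, 0) and (2, 2) is -x + y = 0, and it crosses the unit circle
# centred at the origin at +/- (sqrt(2)/2, sqrt(2)/2).
def _demo_geometric_functions():  # pragma: no cover
    line = GeometricFunctions.line_through_two_points([0, 0], [2, 2])  # [A, B, C]
    circle = [[0, 0], [1, 0]]  # center and a point on the circle (radius 1)
    points = GeometricFunctions.intersection_between_circle_and_line(line, circle)
    logger.info(f"line coefficients: {line}, intersections: {points}")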


class GmshUtils:

    def __init__(self, model_name='dummy_name', verbose=True):
        self.model_name = model_name
        self.verbose = verbose

    def initialize(self, verbosity_Gmsh: int = 5):
        """
        Initialize Gmsh with options for FiQuS
        :param verbosity_Gmsh: Gmsh verbosity level from the input file entry run.verbosity_Gmsh
        :type verbosity_Gmsh: int
        """
        if not gmsh.is_initialized():
            gmsh.initialize(sys.argv)
            gmsh.model.add(str(self.model_name))
            num_threads = multiprocessing.cpu_count()
            gmsh.option.setNumber('General.NumThreads', num_threads)  # enable multithreading (this seems to apply only to meshing)
            gmsh.option.setNumber('Mesh.MaxNumThreads1D', num_threads)
            gmsh.option.setNumber('Mesh.MaxNumThreads2D', num_threads)
            gmsh.option.setNumber('Mesh.MaxNumThreads3D', num_threads)
            gmsh.option.setNumber('Geometry.OCCParallel', 1)
            gmsh.option.setNumber('Geometry.ToleranceBoolean', 0.000001)
            gmsh.option.setString('Geometry.OCCTargetUnit', 'M')
            gmsh.option.setNumber("General.Verbosity", verbosity_Gmsh)
            if self.verbose:
                gmsh.option.setNumber('General.Terminal', 1)
            else:
                gmsh.option.setNumber('General.Terminal', 0)

    def check_for_event(self):  # pragma: no cover
        action = gmsh.onelab.getString("ONELAB/Action")
        if len(action) and action[0] == "check":
            gmsh.onelab.setString("ONELAB/Action", [""])
            if self.verbose:
                print("-------------------check----------------")
            gmsh.fltk.update()
            gmsh.graphics.draw()
        if len(action) and action[0] == "compute":
            gmsh.onelab.setString("ONELAB/Action", [""])
            if self.verbose:
                print("-------------------compute----------------")
            gmsh.onelab.setChanged("Gmsh", 0)
            gmsh.onelab.setChanged("GetDP", 0)
            gmsh.fltk.update()
            gmsh.graphics.draw()
        return True

    def launch_interactive_GUI(self, close_after=-1):  # pragma: no cover
        gmsh.fltk.initialize()
        while gmsh.fltk.isAvailable() and self.check_for_event():
            gmsh.fltk.wait()
            if close_after >= 0:
                sleep(close_after)
                gmsh.fltk.finalize()
        gmsh.finalize()
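

# A minimal usage sketch (the model name is illustrative): initialize() is safe
# to call repeatedly because it checks gmsh.is_initialized() first; the GUI call
# is only useful on machines with a display.
def _demo_gmsh_utils():  # pragma: no cover
    gu = GmshUtils(model_name='demo_model', verbose=False)
    gu.initialize(verbosity_Gmsh=1)
    # gu.launch_interactive_GUI()  # opens the Gmsh window; blocks until closed
    gmsh.finalize()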


class RoxieParsers:
    def __init__(self, conductor, block, xyCorner):
        self.conductor = conductor
        self.block = block
        self.xyCorner = xyCorner

    @staticmethod
    def parseMap2d(map2dFile: Path, physical_quantity: str = 'magnetic_flux_density'):
        """
        Generates a pandas DataFrame with the map2d content
        :param map2dFile: path of the map2d file containing the content to parse
        :param physical_quantity: magnetic_flux_density or temperature
        """
        physical_quantities_abbreviations = {'magnetic_flux_density': ('BX/T', 'BY/T'), 'temperature': ('T/K', '-')}
        columns = ['BL.', 'COND.', 'NO.', 'X-POS/MM', 'Y-POS/MM'] + \
                  [abbr for abbr in physical_quantities_abbreviations[physical_quantity]] + \
                  ['AREA/MM**2', 'CURRENT', 'FILL FAC.']
        return pd.read_csv(map2dFile, sep=r"\s{2,}|(?<=2) |(?<=T) ", engine='python', usecols=columns)

    @staticmethod
    def parseCond2d(cond2dFile: Path):
        """
        Reads a cond2d input file and returns a list of conductor position objects
        :param cond2dFile: path of the cond2d file containing the content to parse
        :return: list of RoxieParsers objects, one per conductor
        """
        # conductorStartKeyword = "CONDUCTOR POSITION IN THE CROSS-SECTION"
        blockStartKeyword = "BLOCK POSITION IN THE CROSS-SECTION"

        with open(cond2dFile, "r") as file:
            fileContent = file.read()

        # separate rows
        fileContentByRow = fileContent.split("\n")

        # Find block definition
        for i in range(len(fileContentByRow)):
            if blockStartKeyword in fileContentByRow[i]:
                startOfBlockDefinitionIndex = i

        # separate part of the data with conductor position information
        conductorPositions = fileContentByRow[5:startOfBlockDefinitionIndex - 2]

        # drop every 5th row
        conductorPositionsFourVertices = list(conductorPositions)
        del conductorPositionsFourVertices[4::5]

        # arrange data in a list of lists
        outputConductorPositions = []
        for row in conductorPositionsFourVertices:
            rowSplitStr = row.split(',')
            rowSplitFloat = [float(elem) for elem in rowSplitStr]
            outputConductorPositions.append(rowSplitFloat)

        # arrange data from list to numpy.array
        outputConductorPositionsMatrix = np.array(outputConductorPositions)

        # build the list of conductor positions (one entry per conductor, i.e. per group of 4 vertex rows)
        conductorPositionsList = []
        for i in range(0, len(outputConductorPositions), 4):
            out = outputConductorPositions[i]
            conductor = int(out[1])
            block = int(out[2])
            xyCorner = outputConductorPositionsMatrix[i:i + 4, 4:6]
            conductorPositionsList.append(RoxieParsers(conductor, block, xyCorner))

        return conductorPositionsList
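

# A minimal usage sketch (the file path is hypothetical): parseMap2d returns a
# DataFrame whose columns include BX/T and BY/T when reading flux density maps.
def _demo_parse_map2d():  # pragma: no cover
    df = RoxieParsers.parseMap2d(Path("magnet.map2d"), physical_quantity='magnetic_flux_density')
    logger.info(f"Parsed {len(df)} rows; columns: {list(df.columns)}")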


def initialize_logger(work_folder: str = None, time_stamp: str = None, verbose: bool = True):
    """
    This is the logger function used to write FiQuS log files.
    :param work_folder: folder where the log file is written to
    :type work_folder: str
    :param time_stamp: time stamp put in the log file name
    :type time_stamp: str
    :param verbose: if true, INFO level logs are printed; if false, only WARNING level logs are printed to the console
    :type verbose: bool
    :return: logger object
    :rtype: object
    """
    logger = logging.getLogger()

    while logger.hasHandlers():
        logger.removeHandler(logger.handlers[0])

    if verbose:
        logger.setLevel(logging.INFO)
    else:
        logger.setLevel(logging.WARNING)

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(logging.INFO)
    stdout_handler.setFormatter(LoggingFormatter())
    logger.addHandler(stdout_handler)

    FilesAndFolders.prep_folder(work_folder)
    FilesAndFolders.prep_folder(os.path.join(work_folder, "logs"))
    file_handler = logging.FileHandler(os.path.join(work_folder, "logs", f"{time_stamp}.FiQuS.log"))
    file_handler.setLevel(logging.INFO)
    fileFormatter = logging.Formatter('%(asctime)s | %(levelname)s | %(message)s')
    file_handler.setFormatter(fileFormatter)
    logger.addHandler(file_handler)

    errorsAndWarnings_file_handler = logging.FileHandler(os.path.join(work_folder, "logs", f"ERRORS_WARNINGS_{time_stamp}.FiQuS.log"))
    errorsAndWarnings_file_handler.setLevel(logging.WARNING)
    errorsAndWarnings_file_handler.setFormatter(fileFormatter)
    logger.addHandler(errorsAndWarnings_file_handler)

    return logger
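

# A minimal usage sketch (folder and time stamp are illustrative): the returned
# root logger writes to the console, to <work_folder>/logs/<time_stamp>.FiQuS.log,
# and to a separate ERRORS_WARNINGS_<time_stamp>.FiQuS.log file.
def _demo_initialize_logger():  # pragma: no cover
    log = initialize_logger(work_folder=os.getcwd(), time_stamp="2024-12-25-00-00-00", verbose=True)
    log.info("visible on the console and in the main log file")
    log.warning("additionally captured in the ERRORS_WARNINGS log file")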


def create_json_schema(data_model: FDM):
    """
    Create the JSON Schema from a Pydantic data model
    :param data_model: FDM
    :type data_model: FDM
    """
    # Generate the raw JSON schema from the Pydantic model
    json_schema_dict = data_model.model_json_schema()

    # Replace anyOf with oneOf for better compatibility
    json_schema_str = json.dumps(json_schema_dict)
    json_schema_str = json_schema_str.replace("anyOf", "oneOf")

    # Pretty-print the schema with proper indentation
    pretty_json_schema = json.dumps(json.loads(json_schema_str), indent=4, ensure_ascii=False)

    # Define the output folder for the schema
    docs_folder = os.path.join(
        os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "docs"
    )

    # Create the docs folder for the JSON schema
    json_schema_file_path = os.path.join(docs_folder, "schema.json")
    os.makedirs(os.path.dirname(json_schema_file_path), exist_ok=True)

    # Write the prettified JSON schema to a file
    with open(json_schema_file_path, "w", encoding="utf-8") as file:
        file.write(pretty_json_schema)
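

# A minimal sketch of the anyOf -> oneOf rewrite used above, applied to a
# hand-written schema fragment instead of a full FDM model (the fragment is
# illustrative only).
def _demo_schema_rewrite():  # pragma: no cover
    fragment = {"oneField": {"anyOf": [{"type": "string"}, {"type": "null"}]}}
    rewritten = json.loads(json.dumps(fragment).replace("anyOf", "oneOf"))
    assert "oneOf" in rewritten["oneField"]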


def get_data_settings(GetDP_path=None, settings=None):
    user_name = getpass.getuser()

    if user_name == 'root':
        user_name = 'SYSTEM'
    elif user_name == 'MP-WIN-02$':
        user_name = 'MP_WIN_02'

    if not settings:
        path_to_settings_file = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "tests", f"settings.{user_name}.yaml")
        try:
            settings = FilesAndFolders.read_data_from_yaml(path_to_settings_file, DataSettings)
        except Exception:
            # Note: settings is still None in this branch, so it cannot be used for error logging
            raise ValueError(f'File: {path_to_settings_file} does not exist.')

    if platform.system() == 'Windows':
        if GetDP_path:
            settings.GetDP_path = GetDP_path
    elif platform.system() == 'Linux':
        settings.GetDP_path = 'getdp'
    elif platform.system() == 'Darwin':
        if GetDP_path:
            settings.GetDP_path = GetDP_path
    else:
        raise ValueError(f'Operating system {platform.system()} is not supported by FiQuS!')

    return settings