Coverage for fiqus/utils/Utils.py: 79%
257 statements
import sys
import os
import shutil
import numpy as np
from pathlib import Path
from time import sleep
import multiprocessing
import ruamel.yaml
import warnings
import gmsh
import logging
import re

logger = logging.getLogger(__name__)

class LoggingFormatter(logging.Formatter):
    """
    Logging formatter that colour-codes each record by level using ANSI escape codes
    """
    grey = "\x1b[38;20m"  # debug level
    white = "\x1b[37;20m"  # info level
    yellow = "\x1b[33;20m"  # warning level
    red = "\x1b[31;20m"  # error level
    bold_red = "\x1b[31;1m"  # critical level
    reset = "\x1b[0m"
    format = '%(asctime)s | %(levelname)s | %(message)s'

    FORMATS = {
        logging.DEBUG: grey + format + reset,
        logging.INFO: white + format + reset,
        logging.WARNING: yellow + format + reset,
        logging.ERROR: red + format + reset,
        logging.CRITICAL: bold_red + format + reset
    }

    def format(self, record):
        log_fmt = self.FORMATS.get(record.levelno)
        formatter = logging.Formatter(log_fmt)
        return formatter.format(record)
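
# Usage sketch (illustrative, not part of the original module): attach the
# formatter to any handler; the handler's own level still filters records.
#   >>> handler = logging.StreamHandler(sys.stdout)
#   >>> handler.setFormatter(LoggingFormatter())
#   >>> logging.getLogger(__name__).addHandler(handler)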

class FilesAndFolders:
    @staticmethod
    def read_data_from_yaml(full_file_path, data_class):
        with open(full_file_path, 'r') as stream:
            yaml = ruamel.yaml.YAML(typ='safe', pure=True)
            yaml_str = yaml.load(stream)  # despite the name, this holds the parsed dict
            if "magnet" in yaml_str:
                yaml_str["magnet"]["input_file_path"] = str(full_file_path)

        return data_class(**yaml_str)

    @staticmethod
    def write_data_to_yaml(full_file_path, dict_of_data_class):
        def my_represent_none(self, data):
            return self.represent_scalar('tag:yaml.org,2002:null', 'null')

        yaml = ruamel.yaml.YAML()
        yaml.default_flow_style = False
        yaml.emitter.alt_null = 'Null'
        yaml.representer.add_representer(type(None), my_represent_none)
        with open(full_file_path, 'w') as yaml_file:
            yaml.dump(dict_of_data_class, yaml_file)
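
    # Round-trip sketch (hypothetical paths and model class, for illustration only;
    # assumes a pydantic-style data class with a .dict() method):
    #   >>> data = FilesAndFolders.read_data_from_yaml('magnet.yaml', DataFiQuS)
    #   >>> FilesAndFolders.write_data_to_yaml('magnet_out.yaml', data.dict())
    # read_data_from_yaml unpacks the parsed dict into the given data class, so the
    # top-level YAML keys must match the class's constructor arguments.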

    @staticmethod
    def prep_folder(folder_full_path, clear: bool = False):
        if clear:
            if os.path.exists(folder_full_path):
                shutil.rmtree(folder_full_path)  # delete directory
        if not os.path.exists(folder_full_path):
            os.makedirs(folder_full_path)  # make new directory
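
    # Example (illustrative path): start a run with a clean scratch directory.
    #   >>> FilesAndFolders.prep_folder('/tmp/fiqus_scratch', clear=True)
    # With clear=True an existing directory is removed first; with the default
    # clear=False the directory is only created if it does not exist yet.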

    @staticmethod
    def get_folder_path(folder_type, folder, ref_nr, overwrite, required_folder):
        # NOTE: shadowed by the second get_folder_path definition below;
        # Python keeps only the last definition, so this variant is never called.
        if required_folder and not (ref_nr and overwrite):
            last_nr = 0
            for study in [x.parts[-1] for x in Path(folder).iterdir() if x.is_dir()]:
                last_nr = max(int(study[study.find('_') + 1:]), last_nr)
            if overwrite and required_folder and last_nr > 0:
                run_nr = str(last_nr)
            else:
                run_nr = str(last_nr + 1)
        else:
            run_nr = str(ref_nr)

        folder_path = os.path.join(folder, folder_type + '_' + run_nr)
        FilesAndFolders.prep_folder(folder_path, overwrite and required_folder)
        return folder_path

    @staticmethod
    def get_folder_path(folder_type, folder, folder_key, overwrite, required_folder):
        if required_folder and not (folder_key and overwrite):
            all_dirs = [x.parts[-1] for x in Path(folder).iterdir() if x.is_dir()]
            all_relevant_dirs = [x for x in all_dirs if x.startswith(f"{folder_type}_{folder_key}")]
            if f"{folder_type}_{folder_key}" in all_relevant_dirs:
                new_folder_key = f"{folder_key}_{len(all_relevant_dirs) + 1}"
                folder_key = new_folder_key

        folder_path = os.path.join(folder, folder_type + '_' + str(folder_key))
        # Disable the line below to avoid deleting the folder
        FilesAndFolders.prep_folder(folder_path, overwrite and required_folder)
        return folder_path

    @staticmethod
    def compute_folder_key(folder_type, folder, overwrite):
        # Find all the directories in the folder
        all_dirs = [x.parts[-1] for x in Path(folder).iterdir() if x.is_dir()]

        # Find all the directories that start with the folder_type (e.g. geometry, mesh, solution)
        # Then combine them into a single string with a custom separator (se@p)
        # Separators are used to guarantee the directories can be split later
        all_relevant_dirs = " se@p ".join([x for x in all_dirs if x.startswith(f"{folder_type}_")])
        all_relevant_dirs = f"{all_relevant_dirs} se@p "

        # Find all the integer keys in the relevant directories
        integers_in_relevant_dirs = re.findall(rf'{folder_type}_(\d+) se@p ', all_relevant_dirs)

        if not integers_in_relevant_dirs:
            # re.findall returns an empty list (never None) when nothing matches,
            # so no integer keys exist yet: start at 1
            folder_key = 1
        else:
            # Make a list of integers out of the integers in the relevant directories
            integers_in_relevant_dirs = [int(x) for x in integers_in_relevant_dirs]

            # Sort the integers in the relevant directories
            integers_in_relevant_dirs.sort()

            if overwrite:
                # If overwrite is true, set the key to the largest integer
                # so that the folder with the largest integer key is overwritten
                folder_key = max(integers_in_relevant_dirs)
            else:
                # If overwrite is false, find the smallest integer key that is not
                # in the list of integers in the relevant directories
                folder_key = 1
                for i in integers_in_relevant_dirs:
                    if folder_key < i:
                        break
                    folder_key += 1

        return folder_key
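
    # Worked example (illustrative): with existing folders Geometry_1, Geometry_2
    # and Geometry_5,
    #   >>> FilesAndFolders.compute_folder_key('Geometry', folder, overwrite=True)
    #   5    # reuse (overwrite) the latest run
    #   >>> FilesAndFolders.compute_folder_key('Geometry', folder, overwrite=False)
    #   3    # smallest unused key, filling the gap before 5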

    @staticmethod
    def print_welcome_graphics():
        print(r" _____ _  ___        ____  ")
        print(r"|  ___(_)/ _ \ _   _/ ___| ")
        print(r"| |_  | | | | | | | \___ \ ")
        print(r"|  _| | | |_| | |_| |___) |")
        print(r"|_|   |_|\__\_\\__,_|____/ ")
        print("")

class CheckForExceptions:

    @staticmethod
    def check_inputs(run):  # RunFiQuS()
        if run.type == 'start_from_yaml':
            if run.geometry and not run.overwrite:
                warnings.warn("Warning: Geometry folder is needed only if it has to be overwritten. Ignoring it...")
            if run.solution or run.mesh:
                warnings.warn("Warning: Mesh and Solution folders are not needed. Ignoring them...")
        elif run.type == 'geometry_only':
            if run.solution or run.mesh:
                warnings.warn("Warning: Mesh and Solution folders are not needed. Ignoring them...")
        elif run.type == 'mesh_and_solve_with_post_process':
            if not run.geometry:
                raise Exception('Full path to Geometry not provided. '
                                'Insert options -> reference_files -> geometry.')
            if run.mesh and not run.overwrite:
                warnings.warn("Warning: Mesh folder is needed only if it has to be overwritten. Ignoring it...")
            if run.solution:
                warnings.warn("Warning: Solution folder is not needed. Ignoring it...")
        elif run.type == 'mesh_only':
            if not run.geometry:
                raise Exception('Full path to Geometry not provided. '
                                'Insert options -> reference_files -> geometry.')
            if run.solution:
                warnings.warn("Warning: Solution folder is not needed. Ignoring it...")
        elif run.type == 'solve_with_post_process':
            if not run.mesh or not run.geometry:
                raise Exception('Full path to Geometry or Mesh not provided. '
                                'Insert options -> reference_files -> geometry and mesh.')
            if run.solution and not run.overwrite:
                warnings.warn("Warning: Solution folder is needed only if it has to be overwritten. Ignoring it...")
        elif run.type == 'solve_only':
            if not run.mesh or not run.geometry:
                raise Exception('Full path to Geometry or Mesh not provided. '
                                'Insert options -> reference_files -> geometry and mesh.')
            if run.solution and not run.overwrite:
                warnings.warn("Warning: Solution folder is needed only if it has to be overwritten. Ignoring it...")
        elif run.type == 'post_process_only':
            if not run.mesh or not run.geometry or not run.solution:
                raise Exception('Full path to Geometry, Mesh or Solution not provided. '
                                'Insert options -> reference_files -> geometry, mesh, and solution.')

    @staticmethod
    def check_overwrite_conditions(folder_type, folder, folder_key):
        if folder_key:
            if not os.path.exists(os.path.join(folder, folder_type + '_' + str(folder_key))):
                warnings.warn(
                    f'The folder {folder_type}_{folder_key} does not exist. Creating it...')
        else:
            warnings.warn(
                f'Reference number of the folder {folder_type} not provided. '
                f'Overwriting the latest {folder_type} folder...')

class GeometricFunctions:

    @staticmethod
    def centroid(X, Y):
        """
        Computes the centroid coordinates of a non-self-intersecting closed polygon
        :param X: list of X coordinates of the vertices
        :param Y: list of Y coordinates of the vertices
        """
        sum_A, sum_Cx, sum_Cy = 0, 0, 0
        for i in range(len(X)):
            index = i + 1 if i != len(X) - 1 else 0  # wrap around to the first vertex
            A = X[i] * Y[index] - X[index] * Y[i]  # shoelace cross term
            sum_Cx += (X[i] + X[index]) * A
            sum_Cy += (Y[i] + Y[index]) * A
            sum_A += A
        factor = 1 / (3 * sum_A)  # sum_A is twice the signed area, so factor = 1 / (6 * area)
        return [factor * sum_Cx, factor * sum_Cy]
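
    # Worked example (illustrative): the unit square has its centroid at (0.5, 0.5).
    #   >>> GeometricFunctions.centroid([0, 1, 1, 0], [0, 0, 1, 1])
    #   [0.5, 0.5]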

    @staticmethod
    def arc_center_from_3_points(a, b, c):
        """
        Computes the center coordinates of an arc passing through three points
        :param a: DataRoxieParser.Coord class object of one arc point
        :param b: DataRoxieParser.Coord class object of one arc point
        :param c: DataRoxieParser.Coord class object of one arc point
        """
        ab = [a.x - b.x, a.y - b.y]
        ac = [a.x - c.x, a.y - c.y]
        sac = [a.x * a.x - c.x * c.x, a.y * a.y - c.y * c.y]
        sba = [b.x * b.x - a.x * a.x, b.y * b.y - a.y * a.y]
        yy = (sac[0] * ab[0] + sac[1] * ab[0] + sba[0] * ac[0] + sba[1] * ac[0]) / \
             (2 * ((c.y - a.y) * ab[0] - (b.y - a.y) * ac[0]))
        xx = (sac[0] * ab[1] + sac[1] * ab[1] + sba[0] * ac[1] + sba[1] * ac[1]) / \
             (2 * ((c.x - a.x) * ab[1] - (b.x - a.x) * ac[1]))
        return [-xx, -yy]
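
    # Worked example (illustrative; any object with .x and .y attributes works in
    # place of DataRoxieParser.Coord): three points on the circle of radius 1
    # centred at (2, 1) recover the centre.
    #   >>> from types import SimpleNamespace as P
    #   >>> GeometricFunctions.arc_center_from_3_points(P(x=3, y=1), P(x=2, y=2), P(x=1, y=1))
    #   [2.0, 1.0]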

class GmshUtils:

    def __init__(self, model_name=None, verbose=True):
        self.model_name = model_name
        self.verbose = verbose

    @staticmethod
    def initialize():
        if not gmsh.is_initialized():
            gmsh.initialize(sys.argv)
            num_threads = multiprocessing.cpu_count()
            gmsh.option.setNumber('General.NumThreads', num_threads)  # enable multithreading (this seems to be only for meshing)
            gmsh.option.setNumber('Mesh.MaxNumThreads1D', num_threads)
            gmsh.option.setNumber('Mesh.MaxNumThreads2D', num_threads)
            gmsh.option.setNumber('Mesh.MaxNumThreads3D', num_threads)

            gmsh.option.setNumber('Geometry.ToleranceBoolean', 0.0000001)
            gmsh.option.setNumber('General.Terminal', 1)
            # gmsh.model.add(self.model_name)

    def check_for_event(self):  # pragma: no cover
        action = gmsh.onelab.getString("ONELAB/Action")
        if len(action) and action[0] == "check":
            gmsh.onelab.setString("ONELAB/Action", [""])
            if self.verbose:
                print("-------------------check----------------")
            gmsh.fltk.update()
            gmsh.graphics.draw()
        if len(action) and action[0] == "compute":
            gmsh.onelab.setString("ONELAB/Action", [""])
            if self.verbose:
                print("-------------------compute----------------")
            gmsh.onelab.setChanged("Gmsh", 0)
            gmsh.onelab.setChanged("GetDP", 0)
            gmsh.fltk.update()
            gmsh.graphics.draw()
        return True

    def launch_interactive_GUI(self, close_after=-1):  # pragma: no cover
        gmsh.fltk.initialize()
        while gmsh.fltk.isAvailable() and self.check_for_event():
            gmsh.fltk.wait()
            if close_after >= 0:
                sleep(close_after)
                gmsh.fltk.finalize()
        gmsh.finalize()
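
    # Usage sketch (illustrative; requires a working gmsh + FLTK installation):
    #   >>> gu = GmshUtils(verbose=False)
    #   >>> gu.initialize()  # no-op if gmsh is already initialized
    #   >>> gu.launch_interactive_GUI(close_after=0)
    # With close_after >= 0 the GUI closes after the given number of seconds once
    # the first event is processed. Note that launch_interactive_GUI ends with
    # gmsh.finalize(), so the gmsh session cannot be reused without re-initializing.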

class RoxieParsers:
    def __init__(self, conductor, block, xyCorner):
        self.conductor = conductor
        self.block = block
        self.xyCorner = xyCorner

    @staticmethod
    def parseMap2d(map2dFile: Path, headerLines: int = 1):
        """
        Generates array-stream of values of map2dFile
        :param map2dFile: path of map2dFile containing the content to parse
        :param headerLines: index of the header line; reading starts after it
        """
        # Open map2dfile (the context manager closes the file handle afterwards)
        with open(map2dFile, "r") as file:
            fileContent = file.read()
        # Split content of file in rows
        fileContentByRow = fileContent.split("\n")
        # Create array-matrix to fill in with the values of the file
        output_matrix = np.array([[None for x in range(10)] for y in range(headerLines + 1, len(fileContentByRow) - 1)],
                                 dtype=float)

        # Assign values to the matrix row by row
        for index, rowContent in enumerate(fileContentByRow):
            if index > headerLines and rowContent:  # skip header and empty rows
                row = rowContent.split()
                output_array = np.array([])  # create temp. array
                output_array = np.append(output_array, int(row[0]))  # strands to groups
                output_array = np.append(output_array, int(row[1]))  # strands to halfturn
                output_array = np.append(output_array, float(row[2]))  # idx
                output_array = np.append(output_array, float(row[3]) / 1e3)  # x_strands in [m]
                output_array = np.append(output_array, float(row[4]) / 1e3)  # y_strands in [m]
                output_array = np.append(output_array, float(row[5]))  # Bx
                output_array = np.append(output_array, float(row[6]))  # By
                output_array = np.append(output_array, float(row[7]) / 1e6)  # Area in [m^2]
                output_array = np.append(output_array, float(row[8]))  # I_strands
                output_array = np.append(output_array, float(row[9]))  # fill factor
                output_matrix[index - headerLines - 1] = output_array  # assign into matrix
        return output_matrix
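
    # Usage sketch (hypothetical file name): each returned row holds the ten
    # columns commented above, with lengths converted to metres and areas to
    # square metres.
    #   >>> strands = RoxieParsers.parseMap2d(Path('MQXA.map2d'))
    #   >>> x, y = strands[:, 3], strands[:, 4]  # strand positions in [m]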

    @staticmethod
    def parseCond2d(cond2dFile: Path):
        """
        Reads a cond2d input file and returns a list of conductor-position objects
        (RoxieParsers instances)
        :param cond2dFile: path of the .cond2d file to parse
        """
        # conductorStartKeyword = "CONDUCTOR POSITION IN THE CROSS-SECTION"
        blockStartKeyword = "BLOCK POSITION IN THE CROSS-SECTION"

        with open(cond2dFile, "r") as file:
            fileContent = file.read()

        # separate rows
        fileContentByRow = fileContent.split("\n")

        # Find block definition
        for i in range(len(fileContentByRow)):
            if blockStartKeyword in fileContentByRow[i]:
                startOfBlockDefinitionIndex = i

        # separate part of the data with conductor position information
        conductorPositions = fileContentByRow[5:startOfBlockDefinitionIndex - 2]

        # drop every 5th row
        conductorPositionsFourVertices = list(conductorPositions)
        del conductorPositionsFourVertices[4::5]

        # arrange data in a list of lists
        outputConductorPositions = []
        for row in conductorPositionsFourVertices:
            rowSplitStr = row.split(',')
            rowSplitFloat = [float(elem) for elem in rowSplitStr]
            outputConductorPositions.append(rowSplitFloat)

        # arrange data from list to numpy.array
        outputConductorPositionsMatrix = np.array(outputConductorPositions)

        # build one RoxieParsers object per conductor (four corner rows each)
        conductorPositionsList = []
        for i in range(0, len(outputConductorPositions), 4):
            out = outputConductorPositions[i]
            conductor = int(out[1])
            block = int(out[2])
            xyCorner = outputConductorPositionsMatrix[i:i + 4, 4:6]
            conductorPositionsList.append(RoxieParsers(conductor, block, xyCorner))

        return conductorPositionsList
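
    # Usage sketch (hypothetical file name): each entry bundles a conductor
    # number, its block number, and a 4x2 array of corner coordinates.
    #   >>> positions = RoxieParsers.parseCond2d(Path('MQXA.cond2d'))
    #   >>> positions[0].xyCorner.shape
    #   (4, 2)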

def initialize_logger(verbose: bool = True, work_folder: str = None, time_stamp: str = None):
    logger = logging.getLogger()

    if verbose:
        logger.setLevel(logging.INFO)
    else:
        logger.setLevel(logging.WARNING)

    # iterate over a copy: removing from logger.handlers while iterating it
    # directly would skip every other handler
    for handler in list(logger.handlers):
        logger.handlers.remove(handler)
        handler.close()

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(logging.INFO)
    stdout_handler.setFormatter(LoggingFormatter())
    logger.addHandler(stdout_handler)

    FilesAndFolders.prep_folder(work_folder)
    FilesAndFolders.prep_folder(os.path.join(work_folder, "logs"))
    file_handler = logging.FileHandler(os.path.join(work_folder, "logs", f"{time_stamp}.FiQuS.log"))
    file_handler.setLevel(logging.INFO)
    fileFormatter = logging.Formatter('%(asctime)s | %(levelname)s | %(message)s')
    file_handler.setFormatter(fileFormatter)
    logger.addHandler(file_handler)

    errorsAndWarnings_file_handler = logging.FileHandler(os.path.join(work_folder, "logs", f"ERRORS_WARNINGS_{time_stamp}.FiQuS.log"))
    errorsAndWarnings_file_handler.setLevel(logging.WARNING)
    errorsAndWarnings_file_handler.setFormatter(fileFormatter)
    logger.addHandler(errorsAndWarnings_file_handler)

    return logger
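
# Usage sketch (illustrative values): creates <work_folder>/logs with a full log
# and a warnings/errors-only log, plus colour-coded console output.
#   >>> log = initialize_logger(verbose=True, work_folder='/tmp/fiqus_run',
#   ...                         time_stamp='2024.05.20_03.24.00')
#   >>> log.info('FiQuS started')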