Coverage for fiqus/utils/Utils.py: 79%

257 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2024-05-20 03:24 +0200

1import sys 

2import os 

3import shutil 

4import numpy as np 

5from pathlib import Path 

6from time import sleep 

7import multiprocessing 

8import ruamel.yaml 

9import warnings 

10import gmsh 

11import logging 

12import re 

13 

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)

15 

class LoggingFormatter(logging.Formatter):
    """
    Logging formatter that wraps every record in an ANSI colour chosen by level.

    The line layout is ``<asctime> | <levelname> | <message>``; the colour
    escape sequence is emitted before it and the reset sequence after it.
    Records whose level has no entry in ``FORMATS`` are rendered with the
    default ``logging.Formatter`` layout (no colour).
    """
    grey = "\x1b[38;20m"  # debug level
    white = "\x1b[37;20m"  # info level
    yellow = "\x1b[33;20m"  # warning level
    red = "\x1b[31;20m"  # error level
    bold_red = "\x1b[31;1m"  # critical level

    reset = "\x1b[0m"
    # Kept under a private name so it is not confused with (and silently
    # overwritten by) the ``format`` method defined further down.
    _line_format = '%(asctime)s | %(levelname)s | %(message)s'

    FORMATS = {
        logging.DEBUG: grey + _line_format + reset,
        logging.INFO: white + _line_format + reset,
        logging.WARNING: yellow + _line_format + reset,
        logging.ERROR: red + _line_format + reset,
        logging.CRITICAL: bold_red + _line_format + reset
    }

    # One logging.Formatter per format string, created on first use so that a
    # new formatter object is not built for every single log record.
    _formatter_cache = {}

    def format(self, record):
        """
        Render ``record`` with the colour that matches its level
        :param record: logging.LogRecord to render
        :return: the formatted (and, for known levels, coloured) log line
        """
        log_fmt = self.FORMATS.get(record.levelno)  # None for unknown levels
        formatter = self._formatter_cache.get(log_fmt)
        if formatter is None:
            formatter = logging.Formatter(log_fmt)
            self._formatter_cache[log_fmt] = formatter
        return formatter.format(record)

41 

42 

class FilesAndFolders:
    """
    Static helpers for YAML input/output and for creating the
    ``<folder_type>_<key>`` folders inside a parent folder.
    """

    @staticmethod
    def read_data_from_yaml(full_file_path, data_class):
        """
        Loads a YAML file and builds ``data_class`` from its top-level mapping
        :param full_file_path: path of the YAML file to read
        :param data_class: callable invoked as ``data_class(**loaded_mapping)``
        :return: the object returned by ``data_class``
        """
        with open(full_file_path, 'r') as stream:
            yaml = ruamel.yaml.YAML(typ='safe', pure=True)
            yaml_str = yaml.load(stream)
            if "magnet" in yaml_str:
                # remember which file the magnet data came from
                yaml_str["magnet"]["input_file_path"] = str(full_file_path)

        return data_class(**yaml_str)

    @staticmethod
    def write_data_to_yaml(full_file_path, dict_of_data_class):
        """
        Dumps ``dict_of_data_class`` to a YAML file in block style
        :param full_file_path: path of the YAML file to write (overwritten if present)
        :param dict_of_data_class: data to dump
        """
        def my_represent_none(self, data):
            # write None values as the scalar 'null'
            return self.represent_scalar('tag:yaml.org,2002:null', 'null')

        yaml = ruamel.yaml.YAML()
        yaml.default_flow_style = False
        yaml.emitter.alt_null = 'Null'
        yaml.representer.add_representer(type(None), my_represent_none)
        with open(full_file_path, 'w') as yaml_file:
            yaml.dump(dict_of_data_class, yaml_file)

    @staticmethod
    def prep_folder(folder_full_path, clear: bool = False):
        """
        Makes sure ``folder_full_path`` exists, optionally emptying it first
        :param folder_full_path: path of the folder to prepare
        :param clear: if True, an existing folder is deleted (with its content) and recreated
        """
        if clear:
            if os.path.exists(folder_full_path):
                shutil.rmtree(folder_full_path)  # delete directory
        if not os.path.exists(folder_full_path):
            os.makedirs(folder_full_path)  # make new directory

    # NOTE: the class used to contain an older ``get_folder_path`` (with a
    # ``ref_nr`` argument) directly above this one. It was overridden by the
    # definition below and therefore never callable, so it has been dropped.
    @staticmethod
    def get_folder_path(folder_type, folder, folder_key, overwrite, required_folder):
        """
        Builds the path ``<folder>/<folder_type>_<folder_key>`` and prepares that folder
        :param folder_type: prefix of the folder name
        :param folder: parent folder in which the folder lives
        :param folder_key: key appended to ``folder_type``
        :param overwrite: together with ``required_folder`` decides whether the folder is emptied
        :param required_folder: see ``overwrite``
        :return: path of the prepared folder
        """
        if required_folder and not (folder_key and overwrite):
            all_dirs = [x.parts[-1] for x in Path(folder).iterdir() if x.is_dir()]
            all_relevant_dirs = [x for x in all_dirs if x.startswith(f"{folder_type}_{folder_key}")]
            if f"{folder_type}_{folder_key}" in all_relevant_dirs:
                # the requested name is taken: derive a new key from the
                # number of folders that already share the prefix
                new_folder_key = f"{folder_key}_{len(all_relevant_dirs) + 1}"
                folder_key = new_folder_key

        folder_path = os.path.join(folder, folder_type + '_' + str(folder_key))
        # Disable the line below to avoid deleting the folder
        FilesAndFolders.prep_folder(folder_path, overwrite and required_folder)
        return folder_path

    @staticmethod
    def compute_folder_key(folder_type, folder, overwrite):
        """
        Picks an integer key for a ``<folder_type>_<key>`` folder inside ``folder``
        :param folder_type: prefix of the folder names to look at
        :param folder: parent folder that is scanned (must exist)
        :param overwrite: if True return the largest key in use (1 when none is in use),
            otherwise return the smallest integer >= 1 that is not in use
        :return: the chosen integer key
        """
        # Only directories named exactly '<folder_type>_<digits>' count.
        # folder_type is escaped so it is always matched literally.
        key_pattern = re.compile(rf'{re.escape(folder_type)}_(\d+)')
        used_keys = set()
        for entry in Path(folder).iterdir():
            if entry.is_dir():
                match = key_pattern.fullmatch(entry.name)
                if match:
                    used_keys.add(int(match.group(1)))

        if overwrite:
            # the folder with the largest integer key is the one to overwrite
            return max(used_keys) if used_keys else 1

        # smallest integer key (starting at 1) that is not used yet
        folder_key = 1
        while folder_key in used_keys:
            folder_key += 1
        return folder_key

    @staticmethod
    def print_welcome_graphics():
        """Prints the welcome banner to stdout."""
        print(r" _____ _ ___ ____ ")
        print(r"| ___(_)/ _ \ _ _/ ___| ")
        print(r"| |_ | | | | | | | \___ \ ")
        print(r"| _| | | |_| | |_| |___) |")
        print(r"|_| |_|\__\_\\__,_|____/ ")
        print("")

155 

156 

class CheckForExceptions:
    """
    Sanity checks on the run settings: warns about reference folders that
    will be ignored and raises when a needed one is missing.
    """

    @staticmethod
    def check_inputs(run):  # RunFiQuS()
        """
        Checks the reference folders of ``run`` against its run type
        :param run: object with the attributes ``type``, ``geometry``, ``mesh``,
            ``solution`` and ``overwrite``
        :raises Exception: if a reference folder needed by the run type is not set
        """
        if run.type == 'start_from_yaml':
            if run.geometry and not run.overwrite:
                warnings.warn("Warning: Geometry folder is needed only if it has to be overwritten. Ignoring it...")
            if run.solution or run.mesh:
                warnings.warn("Warning: Mesh and Solution folders are not needed. Ignoring them...")
        elif run.type == 'geometry_only':
            if run.solution or run.mesh:
                warnings.warn("Warning: Mesh and Solution folders are not needed. Ignoring them...")
        elif run.type == 'mesh_and_solve_with_post_process':
            if not run.geometry:
                raise Exception('Full path to Geometry not provided. '
                                'Insert options -> reference_files -> geometry.')
            if run.mesh and not run.overwrite:
                warnings.warn("Warning: Mesh folder is needed only if it has to be overwritten. Ignoring it...")
            if run.solution:
                warnings.warn("Warning: Solution folder is not needed. Ignoring it...")
        elif run.type == 'mesh_only':
            if not run.geometry:
                # The missing item here is the geometry reference (the message
                # used to name the mesh, contradicting the check and the hint).
                raise Exception('Full path to Geometry not provided. '
                                'Insert options -> reference_files -> geometry.')
            if run.solution:
                warnings.warn("Warning: Solution folder is not needed. Ignoring it...")
        elif run.type == 'solve_with_post_process':
            if not run.mesh or not run.geometry:
                raise Exception('Full path to Mesh not provided. '
                                'Insert options -> reference_files -> geometry and mesh.')
            if run.solution and not run.overwrite:
                warnings.warn("Warning: Solution folder is needed only if it has to be overwritten. Ignoring it...")
        elif run.type == 'solve_only':
            if not run.mesh or not run.geometry:
                raise Exception('Full path to Mesh not provided. '
                                'Insert options -> reference_files -> geometry and mesh.')
            if run.solution and not run.overwrite:
                warnings.warn("Warning: Solution folder is needed only if it has to be overwritten. Ignoring it...")
        elif run.type == 'post_process_only':
            if not run.mesh or not run.geometry or not run.solution:
                raise Exception('Full path to Solution not provided. '
                                'Insert options -> reference_files -> geometry, mesh, and solution.')

    @staticmethod
    def check_overwrite_conditions(folder_type, folder, folder_key):
        """
        Warns about what is going to happen when a folder is overwritten
        :param folder_type: prefix of the folder name
        :param folder: parent folder in which the folder is looked up
        :param folder_key: key of the folder; if falsy a warning states that the
            latest folder of this type is overwritten
        """
        if folder_key:
            if not os.path.exists(os.path.join(folder, folder_type + '_' + str(folder_key))):
                warnings.warn(
                    f'The folder {folder_type}_{folder_key} does not exist. Creating it...')
        else:
            warnings.warn(
                f'Reference number of the folder {folder_type} not provided. '
                f'Overwriting the latest {folder_type} folder...')

209 

210 

class GeometricFunctions:
    """Small planar-geometry helpers (polygon centroid, centre of an arc)."""

    @staticmethod
    def centroid(X, Y):
        """
        Returns the centroid of a closed, non-self-intersecting polygon
        :param X: list of X coordinate of the vertices
        :param Y: list of Y coordinate of the vertices
        :return: [x, y] coordinates of the centroid
        """
        n_vertices = len(X)
        cross_total, cx_total, cy_total = 0, 0, 0
        for k in range(n_vertices):
            # the last vertex is paired with the first one to close the polygon
            nxt = 0 if k == n_vertices - 1 else k + 1
            cross = X[k] * Y[nxt] - X[nxt] * Y[k]
            cx_total += (X[k] + X[nxt]) * cross
            cy_total += (Y[k] + Y[nxt]) * cross
            cross_total += cross
        scale = 1 / (3 * cross_total)
        return [scale * cx_total, scale * cy_total]

    @staticmethod
    def arc_center_from_3_points(a, b, c):
        """
        Returns the centre of the arc that passes through three points
        :param a: DataRoxieParser.Coord class object of one arc point
        :param b: DataRoxieParser.Coord class object of one arc point
        :param c: DataRoxieParser.Coord class object of one arc point
        :return: [x, y] coordinates of the arc centre
        """
        # coordinate differences a-b and a-c
        ab_x, ab_y = a.x - b.x, a.y - b.y
        ac_x, ac_y = a.x - c.x, a.y - c.y
        # differences of the squared coordinates (a-c and b-a)
        sq_ac_x, sq_ac_y = a.x * a.x - c.x * c.x, a.y * a.y - c.y * c.y
        sq_ba_x, sq_ba_y = b.x * b.x - a.x * a.x, b.y * b.y - a.y * a.y

        y_numerator = sq_ac_x * ab_x + sq_ac_y * ab_x + sq_ba_x * ac_x + sq_ba_y * ac_x
        y_denominator = 2 * ((c.y - a.y) * ab_x - (b.y - a.y) * ac_x)
        yy = y_numerator / y_denominator

        x_numerator = sq_ac_x * ab_y + sq_ac_y * ab_y + sq_ba_x * ac_y + sq_ba_y * ac_y
        x_denominator = 2 * ((c.x - a.x) * ab_y - (b.x - a.x) * ac_y)
        xx = x_numerator / x_denominator

        # the centre is the negative of the two solved quantities
        return [-xx, -yy]

247 

248 

class GmshUtils:
    """Convenience wrapper around the gmsh Python API (start-up options and GUI loop)."""

    def __init__(self, model_name=None, verbose=True):
        """
        :param model_name: name stored on the instance
        :param verbose: if True, the event loop prints which ONELAB action it handles
        """
        self.verbose = verbose
        self.model_name = model_name

    @staticmethod
    def initialize():
        """Initializes gmsh (unless it already is) and applies the threading/tolerance/terminal options."""
        # NOTE(review): indentation was not recoverable in this view; the option
        # setup is assumed to belong to the "not yet initialized" case - confirm.
        if gmsh.is_initialized():
            return

        gmsh.initialize(sys.argv)
        cpu_total = multiprocessing.cpu_count()
        # enable multithreading (this seems to be only for meshing)
        for thread_option in ('General.NumThreads',
                              'Mesh.MaxNumThreads1D',
                              'Mesh.MaxNumThreads2D',
                              'Mesh.MaxNumThreads3D'):
            gmsh.option.setNumber(thread_option, cpu_total)

        gmsh.option.setNumber('Geometry.ToleranceBoolean', 0.0000001)
        gmsh.option.setNumber('General.Terminal', 1)
        # gmsh.model.add(self.model_name)

    def check_for_event(self):  # pragma: no cover
        """
        Handles a pending ONELAB "check" or "compute" action, if there is one
        :return: always True
        """
        action = gmsh.onelab.getString("ONELAB/Action")
        requested = action[0] if len(action) else None

        if requested == "check":
            gmsh.onelab.setString("ONELAB/Action", [""])  # clear the pending action
            if self.verbose:
                print("-------------------check----------------")
            gmsh.fltk.update()
            gmsh.graphics.draw()

        if requested == "compute":
            gmsh.onelab.setString("ONELAB/Action", [""])  # clear the pending action
            if self.verbose:
                print("-------------------compute----------------")
            gmsh.onelab.setChanged("Gmsh", 0)
            gmsh.onelab.setChanged("GetDP", 0)
            gmsh.fltk.update()
            gmsh.graphics.draw()

        return True

    def launch_interactive_GUI(self, close_after=-1):  # pragma: no cover
        """
        Opens the gmsh GUI and runs its event loop, then finalizes gmsh
        :param close_after: if >= 0, number of seconds to sleep before the GUI is finalized
        """
        gmsh.fltk.initialize()
        # NOTE(review): indentation was not recoverable in this view; the
        # close_after handling is assumed to sit inside the event loop - confirm.
        while gmsh.fltk.isAvailable() and self.check_for_event():
            gmsh.fltk.wait()
            if not close_after < 0:
                sleep(close_after)
                gmsh.fltk.finalize()
        gmsh.finalize()

295 

296 

class RoxieParsers:
    """
    Parsers for the map2d and cond2d text files.

    An instance holds the position of one conductor as returned by
    ``parseCond2d``: its conductor number, block number and corner coordinates.
    """

    def __init__(self, conductor, block, xyCorner):
        """
        :param conductor: conductor number
        :param block: block number
        :param xyCorner: array with the corner coordinates of the conductor
        """
        self.conductor = conductor
        self.block = block
        self.xyCorner = xyCorner

    @staticmethod
    def parseMap2d(map2dFile: Path, headerLines: int = 1):
        """
        Generates array-stream of values of map2dFile
        :param map2dFile: path of map2dFile containing the content to parse
        :param headerLines: which index the header line is at - will start to read after that
        :return: float array with one row of 10 values per data line; lines that
            are empty keep a row of NaN
        """
        # Read the whole file; the context manager closes the handle again
        with open(map2dFile, "r") as map2d_stream:
            fileContentByRow = map2d_stream.read().split("\n")
        # The row bookkeeping below expects the empty entry produced by a final
        # newline; add it when the file does not end with one, so that the last
        # data row is not dropped/overflowing.
        if fileContentByRow[-1]:
            fileContentByRow.append("")

        firstDataRow = headerLines + 1
        numberOfRows = max(len(fileContentByRow) - 1 - firstDataRow, 0)
        # Create array-matrix to fill in with the values of the file
        output_matrix = np.full((numberOfRows, 10), np.nan, dtype=float)

        # Assign values to the matrix row by row
        for index, rowContent in enumerate(fileContentByRow):
            if index > headerLines and rowContent:  # without header
                row = rowContent.split()
                output_matrix[index - firstDataRow] = [
                    int(row[0]),  # strands to groups
                    int(row[1]),  # strands to halfturn
                    float(row[2]),  # idx
                    float(row[3]) / 1e3,  # x_strands in [m]
                    float(row[4]) / 1e3,  # y_strands in [m]
                    float(row[5]),  # Bx
                    float(row[6]),  # By
                    float(row[7]) / 1e6,  # Area in [m^2]
                    float(row[8]),  # I_strands
                    float(row[9]),  # fill factor
                ]
        return output_matrix

    @staticmethod
    def parseCond2d(cond2dFile: Path):
        """
        Read input file and return list of ConductorPosition objects

        :param cond2dFile: path of the cond2d file to parse
        :return: list of RoxieParsers objects, one per conductor
        :raises ValueError: if the file contains no block-definition header
        """
        # conductorStartKeyword = "CONDUCTOR POSITION IN THE CROSS-SECTION"
        blockStartKeyword = "BLOCK POSITION IN THE CROSS-SECTION"

        # Read the whole file and separate rows; the handle is closed right away
        with open(cond2dFile, "r") as cond2d_stream:
            fileContentByRow = cond2d_stream.read().split("\n")

        # Find block definition (the last row containing the keyword wins)
        startOfBlockDefinitionIndex = None
        for i, rowContent in enumerate(fileContentByRow):
            if blockStartKeyword in rowContent:
                startOfBlockDefinitionIndex = i
        if startOfBlockDefinitionIndex is None:
            raise ValueError(f'"{blockStartKeyword}" not found in {cond2dFile}')

        # separate part of the data with conductor position information
        conductorPositionsFourVertices = fileContentByRow[5:startOfBlockDefinitionIndex - 2]

        # drop every 5th row
        del conductorPositionsFourVertices[4::5]

        # arrange data in a list of lists
        outputConductorPositions = [[float(elem) for elem in row.split(',')]
                                    for row in conductorPositionsFourVertices]

        # arrange data from list to numpy.array
        outputConductorPositionsMatrix = np.array(outputConductorPositions)

        # every group of 4 rows describes one conductor; the numbers are read
        # from the first row, the corner coordinates from columns 4 and 5
        conductorPositionsList = []
        for i in range(0, len(outputConductorPositions), 4):
            out = outputConductorPositions[i]
            conductor = int(out[1])
            block = int(out[2])
            xyCorner = outputConductorPositionsMatrix[i:i + 4, 4:6]
            conductorPositionsList.append(RoxieParsers(conductor, block, xyCorner))

        return conductorPositionsList

386 

def initialize_logger(verbose: bool = True, work_folder: str = None, time_stamp: str = None):
    """
    Configures the root logger with a coloured stdout handler and two log files
    :param verbose: if True the root logger level is INFO, otherwise WARNING
    :param work_folder: folder in which the "logs" sub-folder is created; although
        it defaults to None, a real path is needed because folders and files are
        created below it
    :param time_stamp: text used in the names of the two log files
    :return: the configured root logger
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO if verbose else logging.WARNING)

    # Detach the handlers of a previous initialization. Iterate over a copy:
    # removing entries from logger.handlers while looping over that same list
    # skips every other handler and leaves stale handlers attached.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
        handler.close()

    # coloured output on stdout
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(logging.INFO)
    stdout_handler.setFormatter(LoggingFormatter())
    logger.addHandler(stdout_handler)

    # full log file in <work_folder>/logs
    FilesAndFolders.prep_folder(work_folder)
    FilesAndFolders.prep_folder(os.path.join(work_folder, "logs"))
    file_handler = logging.FileHandler(os.path.join(work_folder, "logs", f"{time_stamp}.FiQuS.log"))
    file_handler.setLevel(logging.INFO)
    fileFormatter = logging.Formatter('%(asctime)s | %(levelname)s | %(message)s')
    file_handler.setFormatter(fileFormatter)
    logger.addHandler(file_handler)

    # second log file that only receives warnings and above
    errorsAndWarnings_file_handler = logging.FileHandler(os.path.join(work_folder, "logs", f"ERRORS_WARNINGS_{time_stamp}.FiQuS.log"))
    errorsAndWarnings_file_handler.setLevel(logging.WARNING)
    errorsAndWarnings_file_handler.setFormatter(fileFormatter)
    logger.addHandler(errorsAndWarnings_file_handler)

    return logger