Coverage for fiqus/getdp_runners/RunGetdpConductorAC_Strand.py: 80%
134 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-14 02:37 +0100
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-14 02:37 +0100
1import timeit
2import logging
3import os
4import subprocess
5import re
6import pandas as pd
7import pickle
8import numpy as np
10import gmsh
12from fiqus.utils.Utils import GmshUtils, FilesAndFolders
13from fiqus.data.RegionsModelFiQuS import RegionsModel
14import fiqus.data.DataFiQuSConductor as geom
15from fiqus.geom_generators.GeometryConductorAC_Strand import TwistedStrand
17from fiqus.pro_assemblers.ProAssembler import ASS_PRO
19logger = logging.getLogger(__name__)
21class Solve:
22 def __init__(self, fdm, GetDP_path, geometry_folder, mesh_folder, verbose=True):
23 self.fdm = fdm
24 self.cacdm = fdm.magnet
25 self.GetDP_path = GetDP_path
26 self.solution_folder = os.path.join(os.getcwd())
27 self.magnet_name = fdm.general.magnet_name
28 self.geometry_folder = geometry_folder
29 self.mesh_folder = mesh_folder
30 self.mesh_file = os.path.join(self.mesh_folder, f"{self.magnet_name}.msh")
31 self.pro_file = os.path.join(self.solution_folder, f"{self.magnet_name}.pro")
32 self.regions_file = os.path.join(mesh_folder, f"{self.magnet_name}.regions")
34 self.verbose = verbose
35 self.gu = GmshUtils(self.solution_folder, self.verbose)
36 self.gu.initialize()
38 self.ass_pro = ASS_PRO(os.path.join(self.solution_folder, self.magnet_name))
39 self.regions_model = FilesAndFolders.read_data_from_yaml(self.regions_file, RegionsModel)
40 self.material_properties_model = None
42 self.ed = {} # excitation dictionary
44 gmsh.option.setNumber("General.Terminal", verbose)
46 def read_excitation(self, inputs_folder_path):
47 """
48 Function for reading a CSV file for the 'from_file' excitation case.
50 :param inputs_folder_path: The full path to the folder with input files.
51 :type inputs_folder_path: str
52 """
53 if self.cacdm.solve.source_parameters.source_type == 'piecewise' and self.cacdm.solve.source_parameters.piecewise.source_csv_file:
54 input_file = os.path.join(inputs_folder_path, self.cacdm.solve.source_parameters.piecewise.source_csv_file)
55 print(f'Using excitation from file: {input_file}')
56 df = pd.read_csv(input_file, delimiter=',', engine='python')
57 excitation_time = df['time'].to_numpy(dtype='float').tolist()
58 self.ed['time'] = excitation_time
59 excitation_value = df['value'].to_numpy(dtype='float').tolist()
60 self.ed['value'] = excitation_value
62 def get_solution_parameters_from_yaml(self, inputs_folder_path):
63 """
64 Function for reading material properties from the geometry YAML file.
66 This reads the 'solution' section of the YAML file and stores it in the solution folder.
67 This could also be a place to change the material properties in the future.
69 :param inputs_folder_path: The full path to the folder with input files.
70 :type inputs_folder_path: str
71 """
72 if self.cacdm.geometry.io_settings.load.load_from_yaml:
73 #load the geometry class from the pkl file
74 geom_save_file = os.path.join(self.geometry_folder, f'{self.magnet_name}.pkl')
75 with open(geom_save_file, "rb") as geom_save_file:
76 geometry_class: TwistedStrand = pickle.load(geom_save_file)
78 input_yaml_file = os.path.join(inputs_folder_path, self.cacdm.geometry.io_settings.load.filename)
79 Conductor_dm = FilesAndFolders.read_data_from_yaml(input_yaml_file, geom.Conductor)
80 solution_parameters = Conductor_dm.Solution
82 # The geometry YAML file lists surfaces to exclude from the TI problem by their IDs. Here we convert these IDs to Gmsh physical surface tags.
83 # This is done by comparing the outer boundary points of the surfaces to exclude with the outer boundary points of the matrix partitions.
84 surfaces_excluded_from_TI_tags = []
86 for surface_ID in solution_parameters.Surfaces_excluded_from_TI: # 1) Find the outer boundary points of the surfaces to exclude
87 outer_boundary_points_a = []
88 outer_boundary_curves_a = Conductor_dm.Geometry.Areas[surface_ID].Boundary
89 for curve_ID in outer_boundary_curves_a:
90 curve = Conductor_dm.Geometry.Curves[curve_ID]
91 for point_ID in curve.Points:
92 point = Conductor_dm.Geometry.Points[point_ID]
93 outer_boundary_points_a.append(tuple(point.Coordinates))
95 for matrix_partition in geometry_class.matrix: # 2) Find the outer boundary points of the matrix partitions
96 outer_boundary_points_b = []
97 outer_boundary_curves_b = matrix_partition.boundary_curves
98 if len(outer_boundary_curves_b) == len(outer_boundary_curves_a): # If the number of boundary curves is different, the surfaces are not the same
99 for curve in outer_boundary_curves_b:
100 for point in curve.points:
101 outer_boundary_points_b.append(tuple(point.pos))
103 if np.allclose(sorted(outer_boundary_points_a), sorted(outer_boundary_points_b)): # If the outer boundary points are the same, the surfaces are the same
104 surfaces_excluded_from_TI_tags.append(matrix_partition.physical_surface_tag) # 3) Add the physical surface tag to the list of surfaces to exclude
105 break
107 solution_parameters.Surfaces_excluded_from_TI = surfaces_excluded_from_TI_tags # Replace the surface IDs with the physical surface tags
109 FilesAndFolders.write_data_to_yaml(os.path.join(self.solution_folder, "MaterialProperties.yaml"), solution_parameters.dict())
110 self.material_properties_model = solution_parameters
115 def assemble_pro(self):
116 print("Assembling .pro file")
117 self.ass_pro.assemble_combined_pro(template = self.cacdm.solve.pro_template, rm = self.regions_model, dm = self.fdm, ed=self.ed, mp=self.material_properties_model)
119 def run_getdp(self, solve = True, postOperation = True, gui = False):
121 command = ["-v2", "-verbose", "3"]
122 if solve:
123 command += ["-solve", "MagDyn"]
124 if self.cacdm.solve.formulation_parameters.dynamic_correction:
125 command += ["-pos", "MagDyn", "MagDyn_dynCorr"]
126 else:
127 command += ["-pos", "MagDyn"]
130 startTime = timeit.default_timer()
131 getdpProcess = subprocess.Popen([self.GetDP_path, self.pro_file, "-msh", self.mesh_file] + command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
133 with getdpProcess.stdout:
134 for line in iter(getdpProcess.stdout.readline, b""):
135 line = line.decode("utf-8").rstrip()
136 line = line.split("\r")[-1]
137 if not "Test" in line:
138 if line.startswith("Info"):
139 parsedLine = re.sub(r"Info\s+:\s+", "", line)
140 logger.info(parsedLine)
141 elif line.startswith("Warning"):
142 parsedLine = re.sub(r"Warning\s+:\s+", "", line)
143 logger.warning(parsedLine)
144 elif line.startswith("Error"):
145 parsedLine = re.sub(r"Error\s+:\s+", "", line)
146 logger.error(parsedLine)
147 logger.error("Solving CAC failed.")
148 # raise Exception(parsedLine)
149 elif re.match("##", line):
150 logger.critical(line)
151 else:
152 logger.info(line)
154 simulation_time = timeit.default_timer()-startTime
155 # Save simulation time:
156 if solve:
157 logger.info(f"Solving CAC_1 has finished in {round(simulation_time, 3)} seconds.")
158 with open(os.path.join(self.solution_folder, 'test_temporary', 'simulation_time.txt'), 'w') as file:
159 file.write(str(simulation_time))
162 if gui and ((postOperation and not solve) or (solve and postOperation and self.cacdm.postproc.generate_pos_files)):
163 # gmsh.option.setNumber("Geometry.Volumes", 1)
164 # gmsh.option.setNumber("Geometry.Surfaces", 1)
165 # gmsh.option.setNumber("Geometry.Curves", 1)
166 # gmsh.option.setNumber("Geometry.Points", 0)
167 posFiles = [
168 fileName
169 for fileName in os.listdir(self.solution_folder)
170 if fileName.endswith(".pos")
171 ]
172 for posFile in posFiles:
173 gmsh.open(os.path.join(self.solution_folder, posFile))
174 self.gu.launch_interactive_GUI()
175 else:
176 gmsh.clear()
177 gmsh.finalize()
179 def cleanup(self):
180 """
181 This funtion is used to remove .msh, .pre and .res files from the solution folder, as they may be large and not needed.
182 """
183 magnet_name = self.fdm.general.magnet_name
184 cleanup = self.cacdm.postproc.cleanup
186 if cleanup.remove_res_file:
187 res_file_path = os.path.join(self.solution_folder, f"{magnet_name}.res")
188 if os.path.exists(res_file_path):
189 os.remove(res_file_path)
190 logger.info(f"Removed {magnet_name}.res")
192 if cleanup.remove_pre_file:
193 pre_file_path = os.path.join(self.solution_folder, f"{magnet_name}.pre")
194 if os.path.exists(pre_file_path):
195 os.remove(pre_file_path)
196 logger.info(f"Removed {magnet_name}.pre")
198 if cleanup.remove_msh_file:
199 msh_file_path = os.path.join(self.mesh_folder, f"{magnet_name}.msh")
200 if os.path.exists(msh_file_path):
201 os.remove(msh_file_path)
202 logger.info(f"Removed {magnet_name}.msh")