Coverage for fiqus/getdp_runners/RunGetdpConductorAC_Strand.py: 53%
219 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-11-03 01:32 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-11-03 01:32 +0000
1import timeit
2import logging
3import os
4import subprocess
5import re
6import pandas as pd
7import pickle
8import numpy as np
9import gmsh
11from fiqus.utils.Utils import GmshUtils, FilesAndFolders
12from fiqus.data.RegionsModelFiQuS import RegionsModel
13import fiqus.data.DataFiQuSConductor as geom
14from fiqus.geom_generators.GeometryConductorAC_Strand import TwistedStrand
16from fiqus.pro_assemblers.ProAssembler import ASS_PRO
18logger = logging.getLogger('FiQuS')
21class Solve:
22 def __init__(self, fdm, GetDP_path, geometry_folder, mesh_folder, verbose=True):
23 self.fdm = fdm
24 self.cacdm = fdm.magnet
25 self.GetDP_path = GetDP_path
26 self.solution_folder = os.path.join(os.getcwd())
27 self.magnet_name = fdm.general.magnet_name
28 self.geometry_folder = geometry_folder
29 self.mesh_folder = mesh_folder
30 self.mesh_file = os.path.join(self.mesh_folder, f"{self.magnet_name}.msh")
31 self.pro_file = os.path.join(self.solution_folder, f"{self.magnet_name}.pro")
32 self.regions_file = os.path.join(mesh_folder, f"{self.magnet_name}.regions")
34 self.verbose = verbose
35 self.gu = GmshUtils(self.solution_folder, self.verbose)
36 self.gu.initialize(verbosity_Gmsh=fdm.run.verbosity_Gmsh)
38 self.ass_pro = ASS_PRO(os.path.join(self.solution_folder, self.magnet_name))
39 self.regions_model = FilesAndFolders.read_data_from_yaml(self.regions_file, RegionsModel)
40 self.material_properties_model = None
42 self.ed = {} # excitation dictionary
44 gmsh.option.setNumber("General.Terminal", verbose)
46 def read_excitation(self, inputs_folder_path):
47 """
48 Function for reading a CSV file for the 'from_file' excitation case.
50 :param inputs_folder_path: The full path to the folder with input files.
51 :type inputs_folder_path: str
52 """
53 if self.cacdm.solve.source_parameters.source_type == 'piecewise' and self.cacdm.solve.source_parameters.piecewise.source_csv_file:
54 input_file = os.path.join(inputs_folder_path, self.cacdm.solve.source_parameters.piecewise.source_csv_file)
55 print(f'Using excitation from file: {input_file}')
56 df = pd.read_csv(input_file, delimiter=',', engine='python')
57 excitation_time = df['time'].to_numpy(dtype='float').tolist()
58 self.ed['time'] = excitation_time
59 excitation_value = df['value'].to_numpy(dtype='float').tolist()
60 self.ed['value'] = excitation_value
62 def get_solution_parameters_from_yaml(self, inputs_folder_path):
63 """
64 Function for reading material properties from the geometry YAML file.
66 This reads the 'solution' section of the YAML file and stores it in the solution folder.
67 This could also be a place to change the material properties in the future.
69 :param inputs_folder_path: The full path to the folder with input files.
70 :type inputs_folder_path: str
71 """
72 # load the geometry class from the .pkl file
73 geom_save_file = os.path.join(self.mesh_folder, f'{self.magnet_name}.pkl')
74 with open(geom_save_file, "rb") as geom_save_file:
75 geometry_class: TwistedStrand = pickle.load(geom_save_file)
77 if self.cacdm.geometry.io_settings.load.load_from_yaml:
78 # If the geometry is loaded from a YAML file, we need to read the solution parameters (material properties and surfaces to exclude from IP-problem) from the YAML file.
79 input_yaml_file = os.path.join(inputs_folder_path, self.cacdm.geometry.io_settings.load.filename)
80 Conductor_dm = FilesAndFolders.read_data_from_yaml(input_yaml_file, geom.Conductor)
81 solution_parameters = Conductor_dm.Solution
83 # The geometry YAML file lists surfaces to exclude from the TI problem by their IDs. Here we convert these IDs to Gmsh physical surface tags.
84 # This is done by comparing the outer boundary points of the surfaces to exclude with the outer boundary points of the matrix partitions.
85 surfaces_excluded_from_TI_tags = []
87 for surface_ID in solution_parameters.Surfaces_excluded_from_TI: # 1) Find the outer boundary points of the surfaces to exclude
88 outer_boundary_points_a = []
89 outer_boundary_curves_a = Conductor_dm.Geometry.Areas[surface_ID].Boundary
90 for curve_ID in outer_boundary_curves_a:
91 curve = Conductor_dm.Geometry.Curves[curve_ID]
92 for point_ID in curve.Points:
93 point = Conductor_dm.Geometry.Points[point_ID]
94 outer_boundary_points_a.append(tuple(point.Coordinates))
96 for matrix_partition in geometry_class.matrix: # 2) Find the outer boundary points of the matrix partitions
97 outer_boundary_points_b = []
98 outer_boundary_curves_b = matrix_partition.boundary_curves
99 if len(outer_boundary_curves_b) == len(outer_boundary_curves_a): # If the number of boundary curves is different, the surfaces are not the same
100 for curve in outer_boundary_curves_b:
101 for point in curve.points:
102 outer_boundary_points_b.append(tuple(point.pos))
104 if np.allclose(sorted(outer_boundary_points_a), sorted(outer_boundary_points_b)): # If the outer boundary points are the same, the surfaces are the same
105 surfaces_excluded_from_TI_tags.append(matrix_partition.physical_surface_tag) # 3) Add the physical surface tag to the list of surfaces to exclude
106 break
108 solution_parameters.Surfaces_excluded_from_TI = surfaces_excluded_from_TI_tags # Replace the surface IDs with the physical surface tags
110 else:
111 # If the geometry is not loaded from a YAML file, we initialize the solution parameters with an empty model, in which we may store the resistances of the diffusion barriers.
112 solution_parameters = geom.SolutionParameters()
114 if self.cacdm.solve.diffusion_barriers.enable:
115 # If the diffusion barriers are enabled, we either read the resistances of the barriers from the geometry YAML file or calculate them based on the material properties specified in the input YAML file.
116 if self.cacdm.geometry.io_settings.load.load_from_yaml and self.cacdm.solve.diffusion_barriers.load_data_from_yaml:
117 # If the diffusion barriers are enabled and the geometry YAML file provides information about the barriers, we can read the resistances from the YAML file.
119 # If the file provides information about the diffusion barriers we determine the resistance associated with the barrier from: Material, Circumferences, Thicknesses and periodicity length, ell.
120 # The code below loops trough all surfaces provided in the geometry file and determines which filaments each surface corresponds to.
121 # The resistances of the barriers are then calculated and stored in the solution parameters data structure which is accessed by the .pro template.
122 # The resistances are sorted by the physical surface tag of the surface to ensure that the order is consistent with the order of the surfaces in the .regions file.
124 # 1) Loop through all the areas in the geometry file
125 filament_barrier_resistances = []
126 filament_barrier_areas = []
127 for area_ID, area in Conductor_dm.Geometry.Areas.items():
128 # 2) If the area contains a BoundaryThickness, we continue
129 # print(f'Area ID: {area_ID}, area: {area}')
130 if area.BoundaryThickness:
131 # 3) We find the physical surface tag of the area by comparing the outer boundary points of the area with the outer boundary points of all surfaces
132 # surfaces = set(geometry_class.matrix + sum(geometry_class.filaments, [])) # All surfaces in the geometry
133 surfaces = set(sum(geometry_class.filaments, [])) # All surfaces in the geometry (only filaments are currently considered for diffusion barriers)
134 outer_boundary_points_a = []
135 outer_boundary_curves_a = area.Boundary
136 for curve_ID in outer_boundary_curves_a:
137 curve = Conductor_dm.Geometry.Curves[curve_ID]
138 for point_ID in curve.Points:
139 point = Conductor_dm.Geometry.Points[point_ID]
140 outer_boundary_points_a.append(tuple(point.Coordinates))
142 for surface in surfaces:
143 outer_boundary_points_b = []
144 outer_boundary_curves_b = surface.boundary_curves
145 if len(outer_boundary_curves_b) == len(outer_boundary_curves_a):
146 for curve in outer_boundary_curves_b:
147 for point in curve.points:
148 outer_boundary_points_b.append(tuple(point.pos))
150 if np.allclose(sorted(outer_boundary_points_a), sorted(outer_boundary_points_b)):
151 # 4) If the outer boundary points are the same we have found the surface corresponding to the area
152 # We calculate the resistance of the barrier and add it to the list of filament resistances
153 if self.cacdm.solve.formulation_parameters.two_ell_periodicity:
154 correctionFactor = 0.827
155 ell = 2 * correctionFactor * self.fdm.conductors[self.cacdm.solve.conductor_name].strand.fil_twist_pitch / 6
156 else:
157 correctionFactor = 0.9549
158 ell = correctionFactor * self.fdm.conductors[self.cacdm.solve.conductor_name].strand.fil_twist_pitch / 6
159 # If the .yaml contains information about the material of the diffusion barrier, use the given material property for this material...
160 if area.BoundaryMaterial:
161 material = Conductor_dm.Solution.Materials[area.BoundaryMaterial]
162 if material.Resistivity:
163 resistivity = float(material.Resistivity)
164 else:
165 resistivity = self.cacdm.solve.diffusion_barriers.resistivity
166 # ... otherwise, use the single (unique) value given in the model .yaml
167 else:
168 resistivity = self.cacdm.solve.diffusion_barriers.resistivity
169 circumference = surface.get_circumference()
170 thickness = area.BoundaryThickness
172 resistance = float(resistivity * thickness / (ell * circumference)) # R = rho * L / A. A is chosen as the area of the surface of the filament over the periodicity length
173 filament_barrier_resistances.append((surface.physical_surface_tag, resistance)) # Add the resistance to the list of filament resistances (along with the physical surface tag)
174 filament_barrier_areas.append(float(thickness * circumference))
176 surfaces = surfaces - {surface} # Remove the surface from the set of surfaces to speed up the search
177 break
179 filament_barrier_resistances.sort(key=lambda x: x[0]) # Sort the resistances by the physical surface tags to ensure that the order is consistent with the order of the surfaces in the .regions file
181 solution_parameters.DiffusionBarriers.FilamentResistances = [resistance for _, resistance in filament_barrier_resistances]
182 solution_parameters.DiffusionBarriers.DiffusionBarrierAreas = filament_barrier_areas
184 else:
185 # If the diffusion barriers are enabled but should not be read from any geometry YAML file, we need to calculate the resistances of the barriers according to the material properties specified in the YAML file.
186 # In this case we will also write a MaterialProperties.yaml file to the solution folder, but only with the resistances of the barriers.
187 # Note that we do this for each filament individually, as the resistances of the barriers may differ between filaments if their circumferences are different.
188 # The steps are the following:
189 # 1) First we get the circumference of each filament.
190 # 2) Then we calculate the resistance of the barrier for each filament, using the resistivity of the material, the thickness of the barrier, and the periodicity length, ell.
191 # 3) We then write the resistances to the MaterialProperties.yaml file.
193 if self.cacdm.solve.formulation_parameters.two_ell_periodicity:
194 correctionFactor = 0.827
195 ell = 2 * correctionFactor * self.fdm.conductors[self.cacdm.solve.conductor_name].strand.fil_twist_pitch / 6
196 else:
197 correctionFactor = 0.9549
198 ell = correctionFactor * self.fdm.conductors[self.cacdm.solve.conductor_name].strand.fil_twist_pitch / 6
199 barrier_resistivity = self.cacdm.solve.diffusion_barriers.resistivity
200 barrier_thickness = self.cacdm.solve.diffusion_barriers.thickness
202 filament_barrier_resistances = [] # List to store the resistances of the barriers
203 filament_barrier_areas = [] # List to store the surface areas of the barriers
204 for filament in sum(geometry_class.filaments, []): # Loop through all filaments
205 circumference = filament.get_circumference() # The circumference of the filament
206 resistance = float(barrier_resistivity * barrier_thickness / (ell * circumference)) # R = rho * L / A. A is determined as the area of the surface of the filament over the periodicity length
207 filament_barrier_resistances.append(resistance) # Add the resistance to the list of resistances
208 filament_barrier_areas.append(float(barrier_thickness * circumference)) # Add the surface area to the list of resistances
210 solution_parameters.DiffusionBarriers.FilamentResistances = filament_barrier_resistances # Add the resistances to the solution parameters
211 solution_parameters.DiffusionBarriers.DiffusionBarrierAreas = filament_barrier_areas # Add the surface areas to the solution parameters
213 if self.cacdm.solve.global_diffusion_barrier.enable:
214 # 1) Retrieve the tag of the correct matrix partition, it should be the only one with filaments inside.
215 # Here, we just take the partition with more than one internal boundary: it should be the correct one.
216 for matrix_partition in geometry_class.matrix:
217 # print(matrix_partition.inner_curve_loop_tags)
218 if len(matrix_partition.inner_curve_loop_tags) > 1:
219 tag = matrix_partition.physical_boundary_tag
220 surface_tag = matrix_partition.physical_surface_tag
221 # print(tag)
222 break
223 solution_parameters.GlobalDiffusionBarrier.RegionTag = tag
224 solution_parameters.GlobalDiffusionBarrier.InternalRegionTag = surface_tag
225 # 2) Compute the contact resistivity based on either the information in the .yaml input for geometry, or the data in the .yaml input for the main model
226 if self.cacdm.geometry.io_settings.load.load_from_yaml and self.cacdm.solve.global_diffusion_barrier.load_data_from_yaml:
227 # Find the corresponding surface from the .yaml input for geometry (the one with many inner surfaces)
228 for area_ID, area in Conductor_dm.Geometry.Areas.items():
229 if len(area.InnerBoundaries) > 1: # Same as above, we take the surface area which has more than one internal boundary, whose boundary should be the global diffusion barrier
230 thickness = area.BoundaryThickness
231 material = Conductor_dm.Solution.Materials[area.BoundaryMaterial]
232 if material.Resistivity:
233 resistivity = float(material.Resistivity)
234 else:
235 resistivity = self.cacdm.solve.global_diffusion_barrier.resistivity
236 solution_parameters.GlobalDiffusionBarrier.ContactResistivity = resistivity * thickness
237 else:
238 solution_parameters.GlobalDiffusionBarrier.ContactResistivity = self.cacdm.solve.global_diffusion_barrier.resistivity * self.cacdm.solve.global_diffusion_barrier.thickness
240 if self.cacdm.geometry.io_settings.load.load_from_yaml or self.cacdm.solve.diffusion_barriers.enable or self.cacdm.solve.global_diffusion_barrier.enable:
241 # If the geometry is loaded from a YAML file or the diffusion barriers are enabled, we need to write these solution parameters to a data-structure in the solution folder, which can be read by the template.
242 FilesAndFolders.write_data_to_yaml(os.path.join(self.solution_folder, "MaterialProperties.yaml"), solution_parameters.model_dump())
243 self.material_properties_model = solution_parameters
245 if self.cacdm.geometry.type == 'periodic_square' and not (self.cacdm.solve.source_parameters.sine.current_amplitude == 0.0):
246 raise ValueError(
247 f"FiQuS does not support periodic_square geometry type with non zero current amplitude!"
248 )
250 def assemble_pro(self):
251 print("Assembling .pro file")
252 self.ass_pro.assemble_combined_pro(template=self.cacdm.solve.pro_template, rm=self.regions_model, dm=self.fdm, ed=self.ed, mp=self.material_properties_model)
254 def run_getdp(self, solve=True, postOperation=True, gui=False):
256 command = ["-v2", "-verbose", str(self.fdm.run.verbosity_GetDP), "-mat_mumps_icntl_14", "100"]
257 if solve:
258 command += ["-solve", "MagDyn"]
259 if self.cacdm.solve.formulation_parameters.formulation == "CATI" and self.cacdm.solve.formulation_parameters.dynamic_correction:
260 command += ["-pos", "MagDyn", "MagDyn_dynCorr"]
261 else:
262 command += ["-pos", "MagDyn"]
264 if self.cacdm.solve.general_parameters.noOfMPITasks:
265 mpi_prefix = ["mpiexec", "-np", str(self.cacdm.solve.general_parameters.noOfMPITasks)]
266 else:
267 mpi_prefix = []
269 startTime = timeit.default_timer()
271 getdpProcess = subprocess.Popen(mpi_prefix + [self.GetDP_path, self.pro_file, "-msh", self.mesh_file] + command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
273 with getdpProcess.stdout:
274 for line in iter(getdpProcess.stdout.readline, b""):
275 line = line.decode("utf-8").rstrip()
276 line = line.split("\r")[-1]
277 if not "Test" in line:
278 if line.startswith("Info"):
279 parsedLine = re.sub(r"Info\s+:\s+", "", line)
280 logger.info(parsedLine)
281 elif line.startswith("Warning"):
282 parsedLine = re.sub(r"Warning\s+:\s+", "", line)
283 logger.warning(parsedLine)
284 elif line.startswith("Error"):
285 parsedLine = re.sub(r"Error\s+:\s+", "", line)
286 logger.error(parsedLine)
287 logger.error("Solving CAC failed.")
288 # raise Exception(parsedLine)
289 elif re.match("##", line):
290 logger.critical(line)
291 else:
292 logger.info(line)
294 simulation_time = timeit.default_timer() - startTime
295 # Save simulation time:
296 if solve:
297 logger.info(f"Solving CAC_1 has finished in {round(simulation_time, 3)} seconds.")
298 with open(os.path.join(self.solution_folder, 'text_output', 'simulation_time.txt'), 'w') as file:
299 file.write(str(simulation_time))
301 if gui and ((postOperation and not solve) or (solve and postOperation and self.cacdm.postproc.pos_files.quantities is not [])):
302 # gmsh.option.setNumber("Geometry.Volumes", 1)
303 # gmsh.option.setNumber("Geometry.Surfaces", 1)
304 # gmsh.option.setNumber("Geometry.Curves", 1)
305 # gmsh.option.setNumber("Geometry.Points", 0)
306 posFiles = [
307 fileName
308 for fileName in os.listdir(self.solution_folder)
309 if fileName.endswith(".pos")
310 ]
311 for posFile in posFiles:
312 gmsh.open(os.path.join(self.solution_folder, posFile))
313 self.gu.launch_interactive_GUI()
314 else:
315 if gmsh.isInitialized():
316 gmsh.clear()
317 gmsh.finalize()
319 def cleanup(self):
320 """
321 This function is used to remove .msh, .pre and .res files from the solution folder, as they may be large and not needed.
322 """
323 magnet_name = self.fdm.general.magnet_name
324 cleanup = self.cacdm.postproc.cleanup
326 if cleanup.remove_res_file:
327 res_file_path = os.path.join(self.solution_folder, f"{magnet_name}.res")
328 if os.path.exists(res_file_path):
329 os.remove(res_file_path)
330 logger.info(f"Removed {magnet_name}.res")
332 if cleanup.remove_pre_file:
333 pre_file_path = os.path.join(self.solution_folder, f"{magnet_name}.pre")
334 if os.path.exists(pre_file_path):
335 os.remove(pre_file_path)
336 logger.info(f"Removed {magnet_name}.pre")
338 if cleanup.remove_msh_file:
339 msh_file_path = os.path.join(self.mesh_folder, f"{magnet_name}.msh")
340 if os.path.exists(msh_file_path):
341 os.remove(msh_file_path)
342 logger.info(f"Removed {magnet_name}.msh")