Coverage for fiqus/getdp_runners/RunGetdpConductorAC_Strand.py: 53%

219 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-11-03 01:32 +0000

1import timeit 

2import logging 

3import os 

4import subprocess 

5import re 

6import pandas as pd 

7import pickle 

8import numpy as np 

9import gmsh 

10 

11from fiqus.utils.Utils import GmshUtils, FilesAndFolders 

12from fiqus.data.RegionsModelFiQuS import RegionsModel 

13import fiqus.data.DataFiQuSConductor as geom 

14from fiqus.geom_generators.GeometryConductorAC_Strand import TwistedStrand 

15 

16from fiqus.pro_assemblers.ProAssembler import ASS_PRO 

17 

18logger = logging.getLogger('FiQuS') 

19 

20 

21class Solve: 

22 def __init__(self, fdm, GetDP_path, geometry_folder, mesh_folder, verbose=True): 

23 self.fdm = fdm 

24 self.cacdm = fdm.magnet 

25 self.GetDP_path = GetDP_path 

26 self.solution_folder = os.path.join(os.getcwd()) 

27 self.magnet_name = fdm.general.magnet_name 

28 self.geometry_folder = geometry_folder 

29 self.mesh_folder = mesh_folder 

30 self.mesh_file = os.path.join(self.mesh_folder, f"{self.magnet_name}.msh") 

31 self.pro_file = os.path.join(self.solution_folder, f"{self.magnet_name}.pro") 

32 self.regions_file = os.path.join(mesh_folder, f"{self.magnet_name}.regions") 

33 

34 self.verbose = verbose 

35 self.gu = GmshUtils(self.solution_folder, self.verbose) 

36 self.gu.initialize(verbosity_Gmsh=fdm.run.verbosity_Gmsh) 

37 

38 self.ass_pro = ASS_PRO(os.path.join(self.solution_folder, self.magnet_name)) 

39 self.regions_model = FilesAndFolders.read_data_from_yaml(self.regions_file, RegionsModel) 

40 self.material_properties_model = None 

41 

42 self.ed = {} # excitation dictionary 

43 

44 gmsh.option.setNumber("General.Terminal", verbose) 

45 

46 def read_excitation(self, inputs_folder_path): 

47 """ 

48 Function for reading a CSV file for the 'from_file' excitation case. 

49 

50 :param inputs_folder_path: The full path to the folder with input files. 

51 :type inputs_folder_path: str 

52 """ 

53 if self.cacdm.solve.source_parameters.source_type == 'piecewise' and self.cacdm.solve.source_parameters.piecewise.source_csv_file: 

54 input_file = os.path.join(inputs_folder_path, self.cacdm.solve.source_parameters.piecewise.source_csv_file) 

55 print(f'Using excitation from file: {input_file}') 

56 df = pd.read_csv(input_file, delimiter=',', engine='python') 

57 excitation_time = df['time'].to_numpy(dtype='float').tolist() 

58 self.ed['time'] = excitation_time 

59 excitation_value = df['value'].to_numpy(dtype='float').tolist() 

60 self.ed['value'] = excitation_value 

61 

62 def get_solution_parameters_from_yaml(self, inputs_folder_path): 

63 """ 

64 Function for reading material properties from the geometry YAML file. 

65 

66 This reads the 'solution' section of the YAML file and stores it in the solution folder. 

67 This could also be a place to change the material properties in the future. 

68 

69 :param inputs_folder_path: The full path to the folder with input files. 

70 :type inputs_folder_path: str 

71 """ 

72 # load the geometry class from the .pkl file 

73 geom_save_file = os.path.join(self.mesh_folder, f'{self.magnet_name}.pkl') 

74 with open(geom_save_file, "rb") as geom_save_file: 

75 geometry_class: TwistedStrand = pickle.load(geom_save_file) 

76 

77 if self.cacdm.geometry.io_settings.load.load_from_yaml: 

78 # If the geometry is loaded from a YAML file, we need to read the solution parameters (material properties and surfaces to exclude from IP-problem) from the YAML file. 

79 input_yaml_file = os.path.join(inputs_folder_path, self.cacdm.geometry.io_settings.load.filename) 

80 Conductor_dm = FilesAndFolders.read_data_from_yaml(input_yaml_file, geom.Conductor) 

81 solution_parameters = Conductor_dm.Solution 

82 

83 # The geometry YAML file lists surfaces to exclude from the TI problem by their IDs. Here we convert these IDs to Gmsh physical surface tags. 

84 # This is done by comparing the outer boundary points of the surfaces to exclude with the outer boundary points of the matrix partitions. 

85 surfaces_excluded_from_TI_tags = [] 

86 

87 for surface_ID in solution_parameters.Surfaces_excluded_from_TI: # 1) Find the outer boundary points of the surfaces to exclude 

88 outer_boundary_points_a = [] 

89 outer_boundary_curves_a = Conductor_dm.Geometry.Areas[surface_ID].Boundary 

90 for curve_ID in outer_boundary_curves_a: 

91 curve = Conductor_dm.Geometry.Curves[curve_ID] 

92 for point_ID in curve.Points: 

93 point = Conductor_dm.Geometry.Points[point_ID] 

94 outer_boundary_points_a.append(tuple(point.Coordinates)) 

95 

96 for matrix_partition in geometry_class.matrix: # 2) Find the outer boundary points of the matrix partitions 

97 outer_boundary_points_b = [] 

98 outer_boundary_curves_b = matrix_partition.boundary_curves 

99 if len(outer_boundary_curves_b) == len(outer_boundary_curves_a): # If the number of boundary curves is different, the surfaces are not the same 

100 for curve in outer_boundary_curves_b: 

101 for point in curve.points: 

102 outer_boundary_points_b.append(tuple(point.pos)) 

103 

104 if np.allclose(sorted(outer_boundary_points_a), sorted(outer_boundary_points_b)): # If the outer boundary points are the same, the surfaces are the same 

105 surfaces_excluded_from_TI_tags.append(matrix_partition.physical_surface_tag) # 3) Add the physical surface tag to the list of surfaces to exclude 

106 break 

107 

108 solution_parameters.Surfaces_excluded_from_TI = surfaces_excluded_from_TI_tags # Replace the surface IDs with the physical surface tags 

109 

110 else: 

111 # If the geometry is not loaded from a YAML file, we initialize the solution parameters with an empty model, in which we may store the resistances of the diffusion barriers. 

112 solution_parameters = geom.SolutionParameters() 

113 

114 if self.cacdm.solve.diffusion_barriers.enable: 

115 # If the diffusion barriers are enabled, we either read the resistances of the barriers from the geometry YAML file or calculate them based on the material properties specified in the input YAML file. 

116 if self.cacdm.geometry.io_settings.load.load_from_yaml and self.cacdm.solve.diffusion_barriers.load_data_from_yaml: 

117 # If the diffusion barriers are enabled and the geometry YAML file provides information about the barriers, we can read the resistances from the YAML file. 

118 

119 # If the file provides information about the diffusion barriers we determine the resistance associated with the barrier from: Material, Circumferences, Thicknesses and periodicity length, ell. 

120 # The code below loops trough all surfaces provided in the geometry file and determines which filaments each surface corresponds to. 

121 # The resistances of the barriers are then calculated and stored in the solution parameters data structure which is accessed by the .pro template. 

122 # The resistances are sorted by the physical surface tag of the surface to ensure that the order is consistent with the order of the surfaces in the .regions file. 

123 

124 # 1) Loop through all the areas in the geometry file 

125 filament_barrier_resistances = [] 

126 filament_barrier_areas = [] 

127 for area_ID, area in Conductor_dm.Geometry.Areas.items(): 

128 # 2) If the area contains a BoundaryThickness, we continue 

129 # print(f'Area ID: {area_ID}, area: {area}') 

130 if area.BoundaryThickness: 

131 # 3) We find the physical surface tag of the area by comparing the outer boundary points of the area with the outer boundary points of all surfaces 

132 # surfaces = set(geometry_class.matrix + sum(geometry_class.filaments, [])) # All surfaces in the geometry 

133 surfaces = set(sum(geometry_class.filaments, [])) # All surfaces in the geometry (only filaments are currently considered for diffusion barriers) 

134 outer_boundary_points_a = [] 

135 outer_boundary_curves_a = area.Boundary 

136 for curve_ID in outer_boundary_curves_a: 

137 curve = Conductor_dm.Geometry.Curves[curve_ID] 

138 for point_ID in curve.Points: 

139 point = Conductor_dm.Geometry.Points[point_ID] 

140 outer_boundary_points_a.append(tuple(point.Coordinates)) 

141 

142 for surface in surfaces: 

143 outer_boundary_points_b = [] 

144 outer_boundary_curves_b = surface.boundary_curves 

145 if len(outer_boundary_curves_b) == len(outer_boundary_curves_a): 

146 for curve in outer_boundary_curves_b: 

147 for point in curve.points: 

148 outer_boundary_points_b.append(tuple(point.pos)) 

149 

150 if np.allclose(sorted(outer_boundary_points_a), sorted(outer_boundary_points_b)): 

151 # 4) If the outer boundary points are the same we have found the surface corresponding to the area 

152 # We calculate the resistance of the barrier and add it to the list of filament resistances 

153 if self.cacdm.solve.formulation_parameters.two_ell_periodicity: 

154 correctionFactor = 0.827 

155 ell = 2 * correctionFactor * self.fdm.conductors[self.cacdm.solve.conductor_name].strand.fil_twist_pitch / 6 

156 else: 

157 correctionFactor = 0.9549 

158 ell = correctionFactor * self.fdm.conductors[self.cacdm.solve.conductor_name].strand.fil_twist_pitch / 6 

159 # If the .yaml contains information about the material of the diffusion barrier, use the given material property for this material... 

160 if area.BoundaryMaterial: 

161 material = Conductor_dm.Solution.Materials[area.BoundaryMaterial] 

162 if material.Resistivity: 

163 resistivity = float(material.Resistivity) 

164 else: 

165 resistivity = self.cacdm.solve.diffusion_barriers.resistivity 

166 # ... otherwise, use the single (unique) value given in the model .yaml 

167 else: 

168 resistivity = self.cacdm.solve.diffusion_barriers.resistivity 

169 circumference = surface.get_circumference() 

170 thickness = area.BoundaryThickness 

171 

172 resistance = float(resistivity * thickness / (ell * circumference)) # R = rho * L / A. A is chosen as the area of the surface of the filament over the periodicity length 

173 filament_barrier_resistances.append((surface.physical_surface_tag, resistance)) # Add the resistance to the list of filament resistances (along with the physical surface tag) 

174 filament_barrier_areas.append(float(thickness * circumference)) 

175 

176 surfaces = surfaces - {surface} # Remove the surface from the set of surfaces to speed up the search 

177 break 

178 

179 filament_barrier_resistances.sort(key=lambda x: x[0]) # Sort the resistances by the physical surface tags to ensure that the order is consistent with the order of the surfaces in the .regions file 

180 

181 solution_parameters.DiffusionBarriers.FilamentResistances = [resistance for _, resistance in filament_barrier_resistances] 

182 solution_parameters.DiffusionBarriers.DiffusionBarrierAreas = filament_barrier_areas 

183 

184 else: 

185 # If the diffusion barriers are enabled but should not be read from any geometry YAML file, we need to calculate the resistances of the barriers according to the material properties specified in the YAML file. 

186 # In this case we will also write a MaterialProperties.yaml file to the solution folder, but only with the resistances of the barriers. 

187 # Note that we do this for each filament individually, as the resistances of the barriers may differ between filaments if their circumferences are different. 

188 # The steps are the following: 

189 # 1) First we get the circumference of each filament. 

190 # 2) Then we calculate the resistance of the barrier for each filament, using the resistivity of the material, the thickness of the barrier, and the periodicity length, ell. 

191 # 3) We then write the resistances to the MaterialProperties.yaml file. 

192 

193 if self.cacdm.solve.formulation_parameters.two_ell_periodicity: 

194 correctionFactor = 0.827 

195 ell = 2 * correctionFactor * self.fdm.conductors[self.cacdm.solve.conductor_name].strand.fil_twist_pitch / 6 

196 else: 

197 correctionFactor = 0.9549 

198 ell = correctionFactor * self.fdm.conductors[self.cacdm.solve.conductor_name].strand.fil_twist_pitch / 6 

199 barrier_resistivity = self.cacdm.solve.diffusion_barriers.resistivity 

200 barrier_thickness = self.cacdm.solve.diffusion_barriers.thickness 

201 

202 filament_barrier_resistances = [] # List to store the resistances of the barriers 

203 filament_barrier_areas = [] # List to store the surface areas of the barriers 

204 for filament in sum(geometry_class.filaments, []): # Loop through all filaments 

205 circumference = filament.get_circumference() # The circumference of the filament 

206 resistance = float(barrier_resistivity * barrier_thickness / (ell * circumference)) # R = rho * L / A. A is determined as the area of the surface of the filament over the periodicity length 

207 filament_barrier_resistances.append(resistance) # Add the resistance to the list of resistances 

208 filament_barrier_areas.append(float(barrier_thickness * circumference)) # Add the surface area to the list of resistances 

209 

210 solution_parameters.DiffusionBarriers.FilamentResistances = filament_barrier_resistances # Add the resistances to the solution parameters 

211 solution_parameters.DiffusionBarriers.DiffusionBarrierAreas = filament_barrier_areas # Add the surface areas to the solution parameters 

212 

213 if self.cacdm.solve.global_diffusion_barrier.enable: 

214 # 1) Retrieve the tag of the correct matrix partition, it should be the only one with filaments inside. 

215 # Here, we just take the partition with more than one internal boundary: it should be the correct one. 

216 for matrix_partition in geometry_class.matrix: 

217 # print(matrix_partition.inner_curve_loop_tags) 

218 if len(matrix_partition.inner_curve_loop_tags) > 1: 

219 tag = matrix_partition.physical_boundary_tag 

220 surface_tag = matrix_partition.physical_surface_tag 

221 # print(tag) 

222 break 

223 solution_parameters.GlobalDiffusionBarrier.RegionTag = tag 

224 solution_parameters.GlobalDiffusionBarrier.InternalRegionTag = surface_tag 

225 # 2) Compute the contact resistivity based on either the information in the .yaml input for geometry, or the data in the .yaml input for the main model 

226 if self.cacdm.geometry.io_settings.load.load_from_yaml and self.cacdm.solve.global_diffusion_barrier.load_data_from_yaml: 

227 # Find the corresponding surface from the .yaml input for geometry (the one with many inner surfaces) 

228 for area_ID, area in Conductor_dm.Geometry.Areas.items(): 

229 if len(area.InnerBoundaries) > 1: # Same as above, we take the surface area which has more than one internal boundary, whose boundary should be the global diffusion barrier 

230 thickness = area.BoundaryThickness 

231 material = Conductor_dm.Solution.Materials[area.BoundaryMaterial] 

232 if material.Resistivity: 

233 resistivity = float(material.Resistivity) 

234 else: 

235 resistivity = self.cacdm.solve.global_diffusion_barrier.resistivity 

236 solution_parameters.GlobalDiffusionBarrier.ContactResistivity = resistivity * thickness 

237 else: 

238 solution_parameters.GlobalDiffusionBarrier.ContactResistivity = self.cacdm.solve.global_diffusion_barrier.resistivity * self.cacdm.solve.global_diffusion_barrier.thickness 

239 

240 if self.cacdm.geometry.io_settings.load.load_from_yaml or self.cacdm.solve.diffusion_barriers.enable or self.cacdm.solve.global_diffusion_barrier.enable: 

241 # If the geometry is loaded from a YAML file or the diffusion barriers are enabled, we need to write these solution parameters to a data-structure in the solution folder, which can be read by the template. 

242 FilesAndFolders.write_data_to_yaml(os.path.join(self.solution_folder, "MaterialProperties.yaml"), solution_parameters.model_dump()) 

243 self.material_properties_model = solution_parameters 

244 

245 if self.cacdm.geometry.type == 'periodic_square' and not (self.cacdm.solve.source_parameters.sine.current_amplitude == 0.0): 

246 raise ValueError( 

247 f"FiQuS does not support periodic_square geometry type with non zero current amplitude!" 

248 ) 

249 

250 def assemble_pro(self): 

251 print("Assembling .pro file") 

252 self.ass_pro.assemble_combined_pro(template=self.cacdm.solve.pro_template, rm=self.regions_model, dm=self.fdm, ed=self.ed, mp=self.material_properties_model) 

253 

254 def run_getdp(self, solve=True, postOperation=True, gui=False): 

255 

256 command = ["-v2", "-verbose", str(self.fdm.run.verbosity_GetDP), "-mat_mumps_icntl_14", "100"] 

257 if solve: 

258 command += ["-solve", "MagDyn"] 

259 if self.cacdm.solve.formulation_parameters.formulation == "CATI" and self.cacdm.solve.formulation_parameters.dynamic_correction: 

260 command += ["-pos", "MagDyn", "MagDyn_dynCorr"] 

261 else: 

262 command += ["-pos", "MagDyn"] 

263 

264 if self.cacdm.solve.general_parameters.noOfMPITasks: 

265 mpi_prefix = ["mpiexec", "-np", str(self.cacdm.solve.general_parameters.noOfMPITasks)] 

266 else: 

267 mpi_prefix = [] 

268 

269 startTime = timeit.default_timer() 

270 

271 getdpProcess = subprocess.Popen(mpi_prefix + [self.GetDP_path, self.pro_file, "-msh", self.mesh_file] + command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) 

272 

273 with getdpProcess.stdout: 

274 for line in iter(getdpProcess.stdout.readline, b""): 

275 line = line.decode("utf-8").rstrip() 

276 line = line.split("\r")[-1] 

277 if not "Test" in line: 

278 if line.startswith("Info"): 

279 parsedLine = re.sub(r"Info\s+:\s+", "", line) 

280 logger.info(parsedLine) 

281 elif line.startswith("Warning"): 

282 parsedLine = re.sub(r"Warning\s+:\s+", "", line) 

283 logger.warning(parsedLine) 

284 elif line.startswith("Error"): 

285 parsedLine = re.sub(r"Error\s+:\s+", "", line) 

286 logger.error(parsedLine) 

287 logger.error("Solving CAC failed.") 

288 # raise Exception(parsedLine) 

289 elif re.match("##", line): 

290 logger.critical(line) 

291 else: 

292 logger.info(line) 

293 

294 simulation_time = timeit.default_timer() - startTime 

295 # Save simulation time: 

296 if solve: 

297 logger.info(f"Solving CAC_1 has finished in {round(simulation_time, 3)} seconds.") 

298 with open(os.path.join(self.solution_folder, 'text_output', 'simulation_time.txt'), 'w') as file: 

299 file.write(str(simulation_time)) 

300 

301 if gui and ((postOperation and not solve) or (solve and postOperation and self.cacdm.postproc.pos_files.quantities is not [])): 

302 # gmsh.option.setNumber("Geometry.Volumes", 1) 

303 # gmsh.option.setNumber("Geometry.Surfaces", 1) 

304 # gmsh.option.setNumber("Geometry.Curves", 1) 

305 # gmsh.option.setNumber("Geometry.Points", 0) 

306 posFiles = [ 

307 fileName 

308 for fileName in os.listdir(self.solution_folder) 

309 if fileName.endswith(".pos") 

310 ] 

311 for posFile in posFiles: 

312 gmsh.open(os.path.join(self.solution_folder, posFile)) 

313 self.gu.launch_interactive_GUI() 

314 else: 

315 if gmsh.isInitialized(): 

316 gmsh.clear() 

317 gmsh.finalize() 

318 

319 def cleanup(self): 

320 """ 

321 This function is used to remove .msh, .pre and .res files from the solution folder, as they may be large and not needed. 

322 """ 

323 magnet_name = self.fdm.general.magnet_name 

324 cleanup = self.cacdm.postproc.cleanup 

325 

326 if cleanup.remove_res_file: 

327 res_file_path = os.path.join(self.solution_folder, f"{magnet_name}.res") 

328 if os.path.exists(res_file_path): 

329 os.remove(res_file_path) 

330 logger.info(f"Removed {magnet_name}.res") 

331 

332 if cleanup.remove_pre_file: 

333 pre_file_path = os.path.join(self.solution_folder, f"{magnet_name}.pre") 

334 if os.path.exists(pre_file_path): 

335 os.remove(pre_file_path) 

336 logger.info(f"Removed {magnet_name}.pre") 

337 

338 if cleanup.remove_msh_file: 

339 msh_file_path = os.path.join(self.mesh_folder, f"{magnet_name}.msh") 

340 if os.path.exists(msh_file_path): 

341 os.remove(msh_file_path) 

342 logger.info(f"Removed {magnet_name}.msh")