Coverage for fiqus/getdp_runners/RunGetdpConductorAC_Strand.py: 80%

134 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2024-09-01 03:24 +0200

1import timeit 

2import logging 

3import os 

4import subprocess 

5import re 

6import pandas as pd 

7import pickle 

8import numpy as np 

9 

10import gmsh 

11 

12from fiqus.utils.Utils import GmshUtils, FilesAndFolders 

13from fiqus.data.RegionsModelFiQuS import RegionsModel 

14import fiqus.data.DataFiQuSConductor as geom 

15from fiqus.geom_generators.GeometryConductorAC_Strand import TwistedStrand 

16 

17from fiqus.pro_assemblers.ProAssembler import ASS_PRO 

18 

logger = logging.getLogger(__name__)


class Solve:
    """
    Assemble the GetDP .pro file of a conductor AC strand model and run GetDP on it.

    The solution folder is the current working directory at construction time; the
    mesh and the regions file are read from ``mesh_folder``.
    """

    def __init__(self, fdm, GetDP_path, geometry_folder, mesh_folder, verbose=True):
        """
        Store paths and settings, initialize Gmsh and load the regions model.

        :param fdm: FiQuS data model; ``fdm.magnet`` and ``fdm.general.magnet_name`` are read here.
        :param GetDP_path: Path to the GetDP executable that ``run_getdp`` launches.
        :param geometry_folder: Folder containing the pickled geometry (``<magnet_name>.pkl``).
        :param mesh_folder: Folder containing ``<magnet_name>.msh`` and ``<magnet_name>.regions``.
        :param verbose: Forwarded to GmshUtils and to the Gmsh "General.Terminal" option.
        """
        self.fdm = fdm
        self.cacdm = fdm.magnet
        self.GetDP_path = GetDP_path
        self.solution_folder = os.getcwd()
        self.magnet_name = fdm.general.magnet_name
        self.geometry_folder = geometry_folder
        self.mesh_folder = mesh_folder
        self.mesh_file = os.path.join(self.mesh_folder, f"{self.magnet_name}.msh")
        self.pro_file = os.path.join(self.solution_folder, f"{self.magnet_name}.pro")
        self.regions_file = os.path.join(mesh_folder, f"{self.magnet_name}.regions")

        self.verbose = verbose
        self.gu = GmshUtils(self.solution_folder, self.verbose)
        self.gu.initialize()

        self.ass_pro = ASS_PRO(os.path.join(self.solution_folder, self.magnet_name))
        self.regions_model = FilesAndFolders.read_data_from_yaml(self.regions_file, RegionsModel)
        self.material_properties_model = None  # set by get_solution_parameters_from_yaml

        self.ed = {}  # excitation dictionary

        gmsh.option.setNumber("General.Terminal", verbose)

    def read_excitation(self, inputs_folder_path):
        """
        Function for reading a CSV file for the 'from_file' excitation case.

        Nothing is read unless the source type is 'piecewise' and a CSV file name is set.
        The 'time' and 'value' columns are stored as lists of floats in ``self.ed``.

        :param inputs_folder_path: The full path to the folder with input files.
        :type inputs_folder_path: str
        """
        source = self.cacdm.solve.source_parameters
        if source.source_type != 'piecewise' or not source.piecewise.source_csv_file:
            return

        input_file = os.path.join(inputs_folder_path, source.piecewise.source_csv_file)
        logger.info(f'Using excitation from file: {input_file}')
        df = pd.read_csv(input_file, delimiter=',', engine='python')
        for column in ('time', 'value'):
            self.ed[column] = df[column].to_numpy(dtype='float').tolist()

    def get_solution_parameters_from_yaml(self, inputs_folder_path):
        """
        Function for reading material properties from the geometry YAML file.

        This reads the 'Solution' section of the YAML file, replaces the IDs of the surfaces
        excluded from the TI problem by Gmsh physical surface tags, writes the result to
        'MaterialProperties.yaml' in the solution folder and keeps it in
        ``self.material_properties_model``. Nothing happens unless the geometry was loaded from YAML.
        This could also be a place to change the material properties in the future.

        :param inputs_folder_path: The full path to the folder with input files.
        :type inputs_folder_path: str
        """
        load_settings = self.cacdm.geometry.io_settings.load
        if not load_settings.load_from_yaml:
            return

        # Load the geometry class from the pkl file.
        # NOTE(review): pickle.load can execute arbitrary code; this is only safe as long as
        # the geometry folder holds files written by the geometry step itself.
        geom_save_path = os.path.join(self.geometry_folder, f'{self.magnet_name}.pkl')
        with open(geom_save_path, "rb") as geom_save_file:
            geometry_class: TwistedStrand = pickle.load(geom_save_file)

        input_yaml_file = os.path.join(inputs_folder_path, load_settings.filename)
        Conductor_dm = FilesAndFolders.read_data_from_yaml(input_yaml_file, geom.Conductor)
        solution_parameters = Conductor_dm.Solution

        # The geometry YAML file lists surfaces to exclude from the TI problem by their IDs. Here we convert these IDs to Gmsh physical surface tags.
        # This is done by comparing the outer boundary points of the surfaces to exclude with the outer boundary points of the matrix partitions.
        surfaces_excluded_from_TI_tags = []

        for surface_ID in solution_parameters.Surfaces_excluded_from_TI:
            # 1) Find the outer boundary points of the surface to exclude
            outer_boundary_curves_a = Conductor_dm.Geometry.Areas[surface_ID].Boundary
            outer_boundary_points_a = np.asarray(sorted(
                tuple(Conductor_dm.Geometry.Points[point_ID].Coordinates)
                for curve_ID in outer_boundary_curves_a
                for point_ID in Conductor_dm.Geometry.Curves[curve_ID].Points
            ), dtype=float)

            for matrix_partition in geometry_class.matrix:
                # 2) Find the outer boundary points of the matrix partition
                outer_boundary_curves_b = matrix_partition.boundary_curves
                if len(outer_boundary_curves_b) != len(outer_boundary_curves_a):
                    continue  # If the number of boundary curves is different, the surfaces are not the same
                outer_boundary_points_b = np.asarray(sorted(
                    tuple(point.pos)
                    for curve in outer_boundary_curves_b
                    for point in curve.points
                ), dtype=float)

                # Compare shapes first: np.allclose raises on arrays that cannot be broadcast
                # together, and a different number of points simply means "not the same surface".
                if (outer_boundary_points_a.shape == outer_boundary_points_b.shape
                        and np.allclose(outer_boundary_points_a, outer_boundary_points_b)):
                    # 3) Add the physical surface tag to the list of surfaces to exclude
                    surfaces_excluded_from_TI_tags.append(matrix_partition.physical_surface_tag)
                    break

        solution_parameters.Surfaces_excluded_from_TI = surfaces_excluded_from_TI_tags  # Replace the surface IDs with the physical surface tags

        FilesAndFolders.write_data_to_yaml(os.path.join(self.solution_folder, "MaterialProperties.yaml"), solution_parameters.dict())
        self.material_properties_model = solution_parameters

    def assemble_pro(self):
        """
        Assemble the combined .pro file from the template, the regions model, the data model,
        the excitation dictionary and the material properties.
        """
        logger.info("Assembling .pro file")
        self.ass_pro.assemble_combined_pro(template=self.cacdm.solve.pro_template, rm=self.regions_model, dm=self.fdm, ed=self.ed, mp=self.material_properties_model)

    def run_getdp(self, solve=True, postOperation=True, gui=False):
        """
        Run GetDP on the .pro and mesh files and forward its output to the logger.

        :param solve: If True, the "MagDyn" resolution is solved and the simulation time is saved.
        :param postOperation: Only used to decide whether the GUI is opened (see the note in the body).
        :param gui: If True and the conditions in the body are met, the .pos files of the solution
            folder are opened in the interactive Gmsh GUI; otherwise Gmsh is cleared and finalized.
        """
        command = ["-v2", "-verbose", "3"]
        if solve:
            command += ["-solve", "MagDyn"]
        # NOTE(review): the post-operation arguments are added regardless of `postOperation`;
        # kept as is, confirm whether that flag is meant to gate them.
        if self.cacdm.solve.formulation_parameters.dynamic_correction:
            command += ["-pos", "MagDyn", "MagDyn_dynCorr"]
        else:
            command += ["-pos", "MagDyn"]

        startTime = timeit.default_timer()
        getdpProcess = subprocess.Popen([self.GetDP_path, self.pro_file, "-msh", self.mesh_file] + command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

        with getdpProcess.stdout:
            for line in iter(getdpProcess.stdout.readline, b""):
                self._log_getdp_line(line)

        # Reap the child process so that it does not linger and its exit status is known.
        return_code = getdpProcess.wait()
        if return_code != 0:
            logger.warning(f"GetDP exited with return code {return_code}.")

        simulation_time = timeit.default_timer() - startTime
        # Save simulation time:
        if solve:
            logger.info(f"Solving CAC_1 has finished in {round(simulation_time, 3)} seconds.")
            self._write_simulation_time(simulation_time)

        if gui and ((postOperation and not solve) or (solve and postOperation and self.cacdm.postproc.generate_pos_files)):
            posFiles = [
                fileName
                for fileName in os.listdir(self.solution_folder)
                if fileName.endswith(".pos")
            ]
            for posFile in posFiles:
                gmsh.open(os.path.join(self.solution_folder, posFile))
            self.gu.launch_interactive_GUI()
        else:
            gmsh.clear()
            gmsh.finalize()

    @staticmethod
    def _log_getdp_line(raw_line):
        """Decode one line of GetDP output and log it at the level matching its prefix."""
        # Undecodable bytes are replaced so that odd solver output cannot abort the run.
        line = raw_line.decode("utf-8", errors="replace").rstrip()
        line = line.split("\r")[-1]  # keep only the text after the last carriage return
        if "Test" in line:
            return  # lines containing "Test" are not logged

        if line.startswith("Info"):
            logger.info(re.sub(r"Info\s+:\s+", "", line))
        elif line.startswith("Warning"):
            logger.warning(re.sub(r"Warning\s+:\s+", "", line))
        elif line.startswith("Error"):
            # Errors are logged, not raised, so the remaining output is still read.
            logger.error(re.sub(r"Error\s+:\s+", "", line))
            logger.error("Solving CAC failed.")
        elif line.startswith("##"):
            logger.critical(line)
        else:
            logger.info(line)

    def _write_simulation_time(self, simulation_time):
        """Write the simulation time to 'test_temporary/simulation_time.txt' in the solution folder."""
        output_folder = os.path.join(self.solution_folder, 'test_temporary')
        os.makedirs(output_folder, exist_ok=True)  # the folder may not exist yet
        with open(os.path.join(output_folder, 'simulation_time.txt'), 'w') as file:
            file.write(str(simulation_time))

    def cleanup(self):
        """
        This function is used to remove .msh, .pre and .res files, as they may be large and not needed.

        The .res and .pre files are looked up in the solution folder, the .msh file in the mesh
        folder. Each file is only removed when the corresponding flag of the postproc cleanup
        settings is set and the file exists.
        """
        cleanup = self.cacdm.postproc.cleanup
        candidates = (
            (cleanup.remove_res_file, self.solution_folder, "res"),
            (cleanup.remove_pre_file, self.solution_folder, "pre"),
            (cleanup.remove_msh_file, self.mesh_folder, "msh"),
        )
        for enabled, folder, extension in candidates:
            if not enabled:
                continue
            file_name = f"{self.magnet_name}.{extension}"
            file_path = os.path.join(folder, file_name)
            if os.path.exists(file_path):
                os.remove(file_path)
                logger.info(f"Removed {file_name}")