Coverage for fiqus/post_processors/PostProcessConductorAC.py: 9%

616 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-11-03 01:32 +0000

1import os, re 

2import numpy as np 

3import pandas as pd 

4import matplotlib.pyplot as plt 

5 

6try: 

7 import steammaterials # TESTING PURPOSES 

8 

9 steammaterials_flag = False 

10except: 

11 steammaterials_flag = True 

12 

13from scipy import constants, optimize, integrate, signal, interpolate 

14from fiqus.data.DataFiQuSConductorAC_Strand import CACStrandGeometry, CACStrandMesh, CACStrandSolve 

15from fiqus.plotters.PlotPythonConductorAC import BatchPlotPythonConductorAC, PlotPythonConductorAC, PDFreport 

16from fiqus.utils.Utils import FilesAndFolders 

17import ruamel.yaml 

18 

19 

20class SimulationData: 

21 """ 

22 Class used to store and manage data from a single simulation. 

23 

24 This class is responsible for loading and organizing the data from a single simulation. 

25 It stores the data in various attributes and provides methods for retrieving and processing the data. 

26 

27 """ 

28 

29 def __init__(self, model_data_output_path, geometry_name, mesh_name, solution_name, fdm) -> None: 

30 self.model_data_output_path = model_data_output_path # This is the path to the folder where the model output data is stored (e.g. geometries) 

31 self.geometry_name = geometry_name # Name of the geometry folder 

32 self.mesh_name = mesh_name # Name of the mesh folder 

33 self.solution_name = solution_name # Name of the solution folder 

34 self.conductor_name = fdm.magnet.solve.conductor_name 

35 

36 # Organize the folders: 

37 self.geometry_folder = os.path.join(self.model_data_output_path, geometry_name) # Path to the geometry folder 

38 self.mesh_folder = os.path.join(self.geometry_folder, mesh_name) # Path to the mesh folder 

39 self.solution_folder = os.path.join(self.mesh_folder, solution_name) # Path to the solution folder 

40 

41 # hacky solution for now ... pass conductor data through fdm because its not stored in geom yaml 

42 self.conductor = fdm.conductors[self.conductor_name] 

43 # Store the YAML input-files in a data model, fdm: 

44 self.geometry, self.mesh, self.solve = self.retrieve_fiqusDataModel() 

45 # Store losses, simulation time and check if the simulation crashed: 

46 temp_file_path = os.path.join(self.solution_folder, 'text_output') 

47 loss_file = [f for f in os.listdir(temp_file_path) if f.startswith('power') and f.endswith('.txt')][0] 

48 self.power_columns = ['Time', 'FilamentLoss', 'CouplingLoss', 'EddyLoss', 'TotalLoss', 'CouplingLoss_dyn', 'TotalLoss_dyn'] # Only in the case dynamic correction is used, must be changed later 

49 self.power = pd.read_csv(os.path.join(self.solution_folder, 'text_output', loss_file), sep=' ', names=self.power_columns) # Store instantaneous losses as pandas dataframe 

50 # Add a row of zeros at the beginning of the dataframe to account for the initial condition: 

51 self.power = pd.concat([pd.DataFrame({col: 0 for col in self.power_columns}, index=[0]), self.power]).reset_index(drop=True) 

52 if self.solve.formulation_parameters.formulation == "CATI" and self.solve.diffusion_barriers: # Check if dm.solve has the entry for diffusion barriers 

53 if self.solve.diffusion_barriers.enable: # Check if diffusion barriers are enabled 

54 # Load and add the power dissipated in the diffusion barriers (if they are enabled): 

55 self.power_diffusion_barriers = np.loadtxt(os.path.join(temp_file_path, 'power_diffusion_barrier.txt'), skiprows=1) 

56 self.power_diffusion_barriers = np.sum(self.power_diffusion_barriers[:, 1:], axis=1) 

57 self.power['CouplingLoss'] += self.power_diffusion_barriers 

58 self.power['CouplingLoss_dyn'] += self.power_diffusion_barriers 

59 self.power['TotalLoss'] += self.power_diffusion_barriers 

60 self.power['TotalLoss_dyn'] += self.power_diffusion_barriers 

61 self.crash = True if 'crash_report.txt' in os.listdir(temp_file_path) else False 

62 # Store simulation time: 

63 try: 

64 with open(os.path.join(self.solution_folder, 'text_output', 'simulation_time.txt'), 'r') as f: 

65 self.simulation_time = float(f.readline().strip()) 

66 except: 

67 self.simulation_time = None # If the simulation time file does not exist, the simulation has not finished running. 

68 

69 # Store the rest of the post-processing data: 

70 self.time = self.power['Time'] 

71 self.instantaneous_temperature = self.load_standard_data(os.path.join(temp_file_path, 'temperature.txt'), 1, add_initial_zero=True) 

72 self.temperature = self.load_standard_data(os.path.join(temp_file_path, 'temperature.txt'), 2, add_initial_zero=True) 

73 self.I_transport = self.load_standard_data(os.path.join(temp_file_path, 'I_transport.txt'), 1) 

74 self.V_transport = self.load_standard_data(os.path.join(temp_file_path, 'V_transport.txt'), 1) 

75 self.V_transport_unitlen = self.load_standard_data(os.path.join(temp_file_path, 'V_transport_unitlen.txt'), 1) 

76 self.hs_val = self.load_standard_data(os.path.join(temp_file_path, 'hs_val.txt'), 1) 

77 self.magn_fil = self.load_standard_data(os.path.join(temp_file_path, 'magn_fil.txt'), [1, 2, 3]) 

78 self.magn_matrix = self.load_standard_data(os.path.join(temp_file_path, 'magn_matrix.txt'), [1, 2, 3]) 

79 self.I = self.load_standard_data(os.path.join(temp_file_path, 'I.txt'), 1, len(self.time)) 

80 self.V = self.load_standard_data(os.path.join(temp_file_path, 'V.txt'), 1, len(self.time)) 

81 self.I_integral = self.load_standard_data(os.path.join(temp_file_path, 'I_integral.txt'), 1, len(self.time)) 

82 self.I_abs_integral = self.load_standard_data(os.path.join(temp_file_path, 'I_abs_integral.txt'), 1, len(self.time)) 

83 self.magnetic_energy_internal = self.load_standard_data(os.path.join(temp_file_path, 'magnetic_energy_internal.txt'), 1) 

84 self.magnetic_energy_external = self.load_standard_data(os.path.join(temp_file_path, 'magnetic_energy_external.txt'), 1) 

85 self.flux_external = self.load_standard_data(os.path.join(temp_file_path, 'flux_external.txt'), 1) 

86 self.flux_internal = self.load_standard_data(os.path.join(temp_file_path, 'flux_internal.txt'), 1) 

87 self.selffield_dfluxdt = self.load_standard_data(os.path.join(temp_file_path, 'selffield_dfluxdt.txt'), 1) 

88 self.Ip = self.load_standard_data(os.path.join(temp_file_path, 'Ip.txt'), 1, len(self.time)) 

89 self.Vp = self.load_standard_data(os.path.join(temp_file_path, 'Vp.txt'), 1, len(self.time)) 

90 # try to init - otherwise NONE 

91 self.f, self.B_background, self.I_amp, self.T_background = self.get_source_params(verbose=True) 

92 # self.data_impedance = self.compute_data_impedance(verbose=False) 

93 # self.analytic_RL = self.compute_analytic_RL() 

94 # Integrate the losses to obtain the cumulative power and the total power per cycle: 

95 self.cumulative_power, self.total_power_per_cycle = self.integrate_power() # Store cumulative power and total cumulative power per cycle 

96 

97 def export2txt(self, data_names, data_columns, filename): 

98 """ This function can be used to export one or more arrays of simulation data in one txt file (stored in solution folder).""" 

99 np_data = np.array(data_columns).T 

100 np.savetxt(os.path.join(self.solution_folder, str(filename) + '.txt'), np_data, header=" ".join(data_names), comments="") 

101 

102 def rad_to_deg(self, radiants): 

103 return radiants / np.pi * 180 

104 

105 def get_source_params(self, verbose=False): 

106 source = self.solve.source_parameters.source_type 

107 T = self.solve.general_parameters.temperature 

108 if source == "rotating": 

109 f = self.solve.source_parameters.rotating.frequency 

110 B = self.solve.source_parameters.rotating.field_magnitude 

111 I = 0 

112 elif source == "sine": 

113 f = self.solve.source_parameters.sine.frequency 

114 I = self.solve.source_parameters.sine.current_amplitude 

115 # background field amplitude 

116 B = self.solve.source_parameters.sine.superimposed_DC.field_magnitude 

117 if verbose: print("Loading Simulation: f=" + "{:.2f}".format(f) + " [Hz]; B_back=" + "{:.2f}".format(B) + " [T]; I=" + "{:.2f}".format(I) + " [A]") 

118 else: 

119 f = None 

120 B = 0 

121 I = max(abs(self.I_transport)) 

122 

123 return f, B, I, T 

124 

125 def first_rise(self, current=False): 

126 """ returns the time, current and flux on the first penetration interval until the first current peak/saddle is reached """ 

127 first_extrema_idx = signal.argrelextrema(self.I_transport if current else self.flux_internal, np.greater_equal)[0][0] # first rel extrema 

128 return self.time[0:first_extrema_idx], self.I_transport[0:first_extrema_idx], self.flux_internal[0:first_extrema_idx] 

129 

130 def halfcycle(self, current=False): 

131 """ returns the time, current and flux on the first halfcycle of transport current or flux """ 

132 data = self.I_transport if current else self.flux_internal 

133 zero_crossings = np.where(np.diff(np.sign(data[1:])))[0] + 1 

134 return self.time[0:zero_crossings[0]], self.I_transport[0:zero_crossings[0]], self.flux_internal[0:zero_crossings[0]] 

135 

136 def load_standard_data(self, file_path, columns, reshape=None, add_initial_zero=False): 

137 """ 

138 There are many output .txt-files with similar format. This function loads the data from one of these files and returns it as a numpy array. 

139 If the file does not exist, None is returned without raising an error. 

140 """ 

141 try: 

142 data = np.loadtxt(file_path, comments='#', usecols=columns) 

143 if reshape: 

144 data = data.reshape(-1, reshape).T 

145 except IOError: 

146 return None 

147 

148 if add_initial_zero: 

149 if len(data.shape) == 1: 

150 data = np.insert(data, 0, 0) 

151 else: 

152 zeros = np.zeros((1, data.shape[1])) 

153 data = np.vstack((zeros, data)) 

154 return data 

155 

156 def retrieve_fiqusDataModel(self): 

157 """ 

158 This function reads the YAML input-files for geometry, mesh and solve and stores them in three dictionaries which are returned. 

159 This function is to be called only once, when the object is created. 

160 """ 

161 geometry_dataModel = FilesAndFolders.read_data_from_yaml(os.path.join(self.geometry_folder, 'geometry.yaml'), CACStrandGeometry) 

162 mesh_dataModel = FilesAndFolders.read_data_from_yaml(os.path.join(self.mesh_folder, 'mesh.yaml'), CACStrandMesh) 

163 # Now that the solve.yaml file contains more than just the solve part, we query only that part here. 

164 # solution_dataModel = FilesAndFolders.read_data_from_yaml(os.path.join(self.solution_folder, 'solve.yaml'), CACStrandSolve) 

165 with open(os.path.join(self.solution_folder, 'solve.yaml'), 'r') as stream: 

166 yaml = ruamel.yaml.YAML(typ='safe', pure=True) 

167 yaml_str = yaml.load(stream) 

168 solution_dataModel = CACStrandSolve(**yaml_str['solve']) 

169 return geometry_dataModel, mesh_dataModel, solution_dataModel 

170 

171 def integrate_power(self): 

172 """ 

173 This function integrates the instantaneous power over time to obtain the cumulative power. 

174 It also calculates the total cumulative power per cycle. 

175 The cumulative power is returned as a pandas dataframe and the total cumulative power per cycle is returned as a dictionary. 

176 """ 

177 find_closest_idx = lambda arr, val: np.abs(arr - val).argmin() 

178 

179 t = np.array(self.power['Time']) 

180 t_final = t[-1] 

181 if self.f: 

182 t_init = find_closest_idx(t, t_final - 1 / (self.f)) 

183 else: 

184 t_init = find_closest_idx(t, t_final) 

185 

186 cumulative_power = pd.DataFrame(columns=self.power_columns) 

187 total_power_per_cycle = {} 

188 

189 cumulative_power['Time'] = self.power["Time"] 

190 for column in self.power_columns[1:]: 

191 cumulative_power[column] = np.insert(integrate.cumulative_trapezoid(self.power[column], t), 0, 0) 

192 total_power_per_cycle[column] = (cumulative_power[column].iloc[-1] - cumulative_power[column].iloc[t_init]) # / (np.pi*matrix_radius**2 * loss_factor) # Why do we divide by pi*matrix_radius**2*loss_factor? 

193 

194 return cumulative_power, total_power_per_cycle 

195 

196 def compute_data_impedance(self, verbose=False): 

197 """ 

198 This function estimates a complex impedance per unit length for the OOP based on sinus fits for the transport voltage and current. 

199 If no sinus fit is possible it returns NONE as impedance estimate. 

200 """ 

201 if not self.f: 

202 return None 

203 

204 I_mean = np.mean(self.I_transport) 

205 V_mean = np.mean(self.V_transport_unitlen) 

206 I_peak = max(abs(self.I_transport)) 

207 V_peak = max(abs(self.V_transport_unitlen)) 

208 # ignore values before this index - tweaking parameter depending on source ramping 

209 N_h = round(len(self.I_transport) / 4) 

210 # sine fit for current and voltage 

211 optimize_func_current = lambda x: x[0] * np.sin(x[1] * self.time[N_h:] + x[2]) + x[3] - self.I_transport[N_h:] 

212 optimize_func_voltage = lambda x: x[0] * np.sin(x[1] * self.time[N_h:] + x[2]) + x[3] - self.V_transport_unitlen[N_h:] 

213 current_estimate = optimize.leastsq(optimize_func_current, [I_peak - I_mean, self.f, 0, I_mean])[0] # est_amp, est_freq, est_phase, est_mean 

214 voltage_estimate = optimize.leastsq(optimize_func_voltage, [V_peak - V_mean, self.f, 0, V_mean])[0] 

215 delta_phi = abs(voltage_estimate[2] - current_estimate[2]) % (2 * np.pi) 

216 Z_magnitude = V_peak / I_peak # both sinus - no formfactor 

217 L_inductance = abs(np.sin(delta_phi) * Z_magnitude) / (2 * np.pi * self.f) 

218 R_resistance = abs(np.cos(delta_phi) * Z_magnitude) 

219 Z_impedance = R_resistance + 2 * np.pi * self.f * L_inductance * 1j 

220 if verbose: 

221 print("estimated simulation Impedance: " + str(Z_magnitude) + " Ohm") 

222 print(" -> L: " + str(L_inductance) + " Henry") 

223 print(" -> R: " + str(R_resistance) + " Ohm") 

224 print(" -> Phase: " + str(np.angle(Z_impedance, deg=True)) + " Deg") 

225 return Z_impedance 

226 

227 def compute_analytic_RL(self): 

228 """ This function calculates the analytic impedance parameters R & L per unit length based on reduced telegraphers equations with C, G = 0 """ 

229 

230 rho_CU = 1.81e-10 

231 coil_radius = self.geometry.coil_radius 

232 air_radius = self.geometry.air_radius 

233 strand_radius = self.conductor.strand.diameter / 2 

234 filament_radius = self.conductor.strand.filament_diameter / 2 

235 

236 correction_factor = 0.9549 # CATI (needs to be corrected) 

237 ell = correction_factor * self.conductor.strand.fil_twist_pitch / 6 # CATI 

238 

239 if self.geometry.type == 'strand_only': 

240 # single filament in open space 

241 inductance_L = constants.mu_0 / (2 * np.pi) * np.log(air_radius / filament_radius) # == coax cable 

242 resistance_R = rho_CU / (np.pi * (self.conductor.strand.diameter / 2) ** 2) * ell if self.solve.general_parameters.superconductor_linear else 1e-12 

243 elif self.geometry.type == 'coil': 

244 # single coil winding closed in infinity 

245 inductance_L = constants.mu_0 / np.pi * np.log((coil_radius * 2 - strand_radius) / strand_radius) # + ell * constants.mu_0 / (4 * np.pi) 

246 resistance_R = 2 * rho_CU / (np.pi * (self.conductor.strand.diameter / 2) ** 2) * ell if self.solve.general_parameters.superconductor_linear else 1e-12 

247 else: 

248 return None 

249 

250 return [resistance_R, inductance_L] 

251 

252 

253class ROHFmodel: 

254 """ Base class for a Reduced Order Hysteretic Flux model. Defining a chain of rate dependent cells for flux and loss reconstruction in a homogenized strand model.""" 

255 

256 def __init__(self, N=None, taus=None, kappas_bar=None, alphas=None, simulation=None, filepath=None, filename=None, spacing_type=None, label=''): 

257 # direct parameter initialization 

258 self.N = N 

259 self.kappas_bar = kappas_bar 

260 self.alphas = alphas 

261 self.taus = taus 

262 # Ambient sueprcon parameters 

263 self.Ic_0 = 2978 # [A] 

264 self.T_0 = 1.9 # [K] 

265 self.Ec_0 = 1e-4 # [V/m] 

266 self.n = 60 

267 # internal variables 

268 self.L0 = constants.mu_0 / (4 * np.pi) # lin. self inductance of cylindric wire 

269 # optional info for batch plots 

270 self.label = label 

271 self.simulation = simulation 

272 # initialization shortcuts 

273 if filepath and filename: 

274 self.import_params(filepath=filepath, filename=filename) # initialize from file 

275 elif simulation: # fit params on the fly 

276 self.fit_data(simulation=simulation, spacing_type=spacing_type, taus=taus, kappas_bar=kappas_bar, alphas=alphas) 

277 

278 def update(self, N, taus, kappas_bar, alphas): 

279 self.N = N 

280 self.taus = taus 

281 self.kappas_bar = kappas_bar 

282 self.alphas = alphas 

283 

284 def params(self): 

285 return np.array([self.alphas, self.kappas_bar, self.taus]).T 

286 

287 def export_params(self, filepath, filename, txt_format=False): 

288 """Exports the reduced order parameters kappa and alpha as txt file. 

289 

290 :param filepath: absolute path to storage location 

291 :param filename: name of the file (without .txt ending) 

292 """ 

293 if txt_format: 

294 header = "# fitted on " + self.simulation.solution_name + "\nalphas kappas taus" if self.simulation else "# imported params \nalphas kappas taus" 

295 np.savetxt(os.path.join(filepath, str(filename) + '.txt'), self.params(), header=header, comments="") 

296 else: 

297 df = pd.DataFrame(self.params()) 

298 df.to_csv(os.path.join(filepath, str(filename) + '.csv'), header=['alphas', 'kappas', 'taus'], index=None, float_format='{:e}'.format) 

299 

300 def import_params(self, filepath, filename, txt_format=False): 

301 """This function reads the reduced order parameters kappa and alphas from a txt file. 

302 

303 :param filepath: absolute path to storage location 

304 :param filename: name of the file (without .txt ending) 

305 """ 

306 if txt_format: 

307 data = np.loadtxt(os.path.join(filepath, filename + ".txt"), delimiter=" ", skiprows=2).T # alphas, kappas, taus 

308 else: 

309 data = np.array(pd.read_csv(os.path.join(filepath, filename + ".csv"), sep=',', header=1).values).T 

310 print(data) 

311 self.simulation = None 

312 self.name = filename 

313 self.update(N=len(data[0]), alphas=data[0], kappas_bar=data[1], taus=data[2]) 

314 

315 def ohmic_loss(self, I, simulation=None): 

316 """ This function calculates the ohmic loss in filaments and matrix of a homogenized conductor. 

317 

318 :param I: transport current 

319 :param simulation: simulation data for conductor information, defaults to None 

320 :return: array of [p_ohm_fil, p_ohm_mat, R_fil, R_mat] for all current steps 

321 """ 

322 

323 if steammaterials_flag or simulation is None: 

324 # print("Assuming zero ohmic loss") 

325 return np.zeros(len(I)), np.zeros(len(I)) 

326 

327 # geometry parameters 

328 surf_strand = np.pi * (simulation.conductor.strand.diameter / 2) ** 2 

329 surf_filaments = simulation.conductor.strand.number_of_filaments * np.pi * (simulation.conductor.strand.filament_diameter / 2) ** 2 

330 surf_matrix = surf_strand - surf_filaments 

331 # matrix resistivity 

332 matpath = os.path.join(os.path.dirname(steammaterials.__file__), 'CFUN') 

333 Tspace = np.vstack((self.T_0, 0, simulation.conductor.strand.RRR)) # T, B, RRR 

334 cu_space = steammaterials.SteamMaterials("CFUN_rhoCu_NIST_v2", Tspace.shape[0], Tspace.shape[1], matpath) 

335 rho_matrix = cu_space.evaluate(Tspace) 

336 R_matrix = rho_matrix / surf_matrix 

337 

338 # current sharing between matrix (R const) and filaments (power law) https://ieeexplore.ieee.org/document/8970335 

339 grid = np.vstack((I / (surf_strand), self.Ec_0 * np.ones(len(I)), self.Ic_0 / (surf_filaments) * np.ones(len(I)), self.n * np.ones(len(I)), rho_matrix * np.ones(len(I)), surf_filaments * np.ones(len(I)), surf_matrix * np.ones(len(I)))) 

340 sharing_space = steammaterials.SteamMaterials("CFUN_HTS_CurrentSharing_homogenized_v1", grid.shape[0], grid.shape[1], matpath) 

341 lambdas = sharing_space.evaluate(grid) 

342 # current split associated ohmic losses 

343 I_filaments = lambdas * I 

344 I_matrix = (np.ones(len(lambdas)) - lambdas) * I 

345 p_ohm_filaments = self.Ec_0 * ((abs(I_filaments)) / (self.Ic_0)) ** (self.n) * abs(I_filaments) 

346 p_ohm_matrix = R_matrix * I_matrix ** 2 

347 

348 return p_ohm_filaments, p_ohm_matrix 

349 

350 def fit_scaling(self, simulation): 

351 """ This method fits the internal kappa_scaling parameter of the ROHFmodel to a given magnetic background field magnitude for NbTi filaments. 

352 

353 :param B_background: magnitude of the magnetic background field in Tesla. 

354 """ 

355 verbose = False 

356 # Scaling params 

357 B_background = simulation.B_background 

358 T_background = simulation.T_background 

359 if verbose: print("ROHF scaling: B=" + str(B_background) + " T=" + str(T_background)) 

360 

361 filament_d = simulation.conductor.strand.filament_diameter 

362 strand_surf = np.pi * (simulation.conductor.strand.diameter / 2) ** 2 

363 filament_surf = simulation.conductor.strand.number_of_filaments * np.pi * (filament_d / 2) ** 2 

364 fill_factor = filament_surf / strand_surf 

365 Jc_0 = self.Ic_0 / filament_surf 

366 

367 if steammaterials_flag or B_background is None: 

368 raise ValueError('cant perform field scaling fit. Need steammaterials and background field amplitude.') 

369 matpath = os.path.join(os.path.dirname(steammaterials.__file__), 'CFUN') 

370 

371 # Determine ambient field with no background and max current density 

372 B_space = np.linspace(0, 14, 5000) 

373 numpy2d_Bspace = np.vstack((self.T_0 * np.ones(B_space.shape), B_space, 1 * np.ones(B_space.shape))) # T, B, A 

374 nbti_space = steammaterials.SteamMaterials("CFUN_IcNbTi_v1", numpy2d_Bspace.shape[0], numpy2d_Bspace.shape[1], matpath) 

375 jc_nbti_space = nbti_space.evaluate(numpy2d_Bspace) 

376 ambient_idx = np.abs(jc_nbti_space - Jc_0).argmin() 

377 B_ambient = B_space[ambient_idx] 

378 if verbose: print(" Ambient field: " + str(B_ambient) + " [T]") 

379 

380 # determine absolute field value including a scaled down version of the strand ambient field 

381 B_abs = B_background + B_ambient 

382 # determine first scaling factor for the magnetic field 

383 jc_nbti1 = steammaterials.SteamMaterials("CFUN_IcNbTi_v1", 3, 1, matpath) 

384 jc_Babs = jc_nbti1.evaluate((T_background, B_abs, 1)) 

385 scaling = jc_Babs / Jc_0 

386 # determine temperature scaling with finer ambient estimate 

387 if T_background != self.T_0: 

388 B_abs = float(B_background + scaling * B_ambient) 

389 jc_nbti2 = steammaterials.SteamMaterials("CFUN_IcNbTi_v1", 3, 1, matpath) 

390 jc_Babs = jc_nbti2.evaluate((T_background, B_abs, 1)) 

391 scaling = jc_Babs / Jc_0 

392 if verbose: print(" Field scaling : " + str(scaling)) 

393 return scaling, fill_factor 

394 

395 def fit_kappas(self, simulation, spacing_type=None, verbose=False): 

396 """ 

397 This method fits the coercivity parameters 'kappas_bar' according to a 'spacing_type' and given number of cells 'self.N'. 

398 The kappas are spread on the virgin rise from zero interval of a given strand simulation and can be seen as list of activation levels for the N cells. 

399 

400 :param simulation: SimulationData object which the ROHF will get fitted to 

401 :param spacing_type: Enforce a specfic spacing of the kappas (i.e. 'linear'), defaults to None 

402 :param verbose: print additional information, defaults to False 

403 :raises ValueError: Unknown options for spacing_type 

404 :return: kappas_bar ndarray of N kappas 

405 """ 

406 _, I_interval, flux_interval = simulation.first_rise() 

407 if spacing_type: 

408 if spacing_type == "exponential": 

409 [a, b], res1 = optimize.curve_fit(lambda x1, a, b: a * np.exp(b * x1), I_interval / max(I_interval), flux_interval / max(flux_interval)) 

410 kappa_spacing = max(I_interval) * np.flip(np.exp(b) - np.exp(b * np.linspace(0, 1, num=self.N + 1))) / (np.exp(b) - 1) 

411 elif spacing_type == "linear": 

412 kappa_spacing = np.linspace(min(I_interval), max(I_interval), self.N + 1) 

413 elif spacing_type == "invlog": 

414 kappa_spacing = np.insert(max(I_interval) * (1.5 - np.geomspace(1, .5, num=self.N + 1)), 0, 0) 

415 elif spacing_type == "log": 

416 kappa_spacing = np.insert(np.geomspace(I_interval[1], max(I_interval), self.N), 0, 0) 

417 else: 

418 raise ValueError('Unknown kappa spacing for S_chain fit') 

419 # get closest matching indices in I_interval as kappas 

420 kappas = I_interval[self.__closest_matches(I_interval, kappa_spacing)] 

421 else: 

422 # calculate kappas based on minimization of linear interpolation error 

423 def max_abs_error(kappas, flux_spline, I_samples): 

424 # computes maximum absolute error between rohf and reference simulation on sample points 

425 rohf_flux = np.interp(I_samples, kappas, flux_spline(kappas)) 

426 return max(abs(rohf_flux - flux_spline(I_samples))) 

427 

428 # interpolate 

429 I_samples = np.linspace(0, max(I_interval), 500) 

430 flux_spline = interpolate.CubicSpline(I_interval, flux_interval, extrapolate=True) 

431 # minimize on interval 

432 cons = (optimize.LinearConstraint([np.concatenate(([1], np.zeros(self.N)))], lb=0, ub=0), 

433 optimize.LinearConstraint([np.concatenate((np.zeros(self.N), [1]))], lb=max(I_interval), ub=max(I_interval))) 

434 bounds = ((0, max(I_interval)),) * (self.N + 1) 

435 kappas = optimize.differential_evolution(max_abs_error, args=(flux_spline, I_samples), x0=np.linspace(0, max(I_interval), self.N + 1), 

436 disp=True, constraints=cons, bounds=bounds, tol=1e-8).x 

437 # kappas = np.insert(kappas, 1, 0.08*max(I_interval)) 

438 if verbose: 

439 plt.figure() 

440 plt.title("kappa distribution") 

441 plt.plot(I_interval, flux_interval, '-b') 

442 plt.plot(kappas, flux_spline(kappas), 'gX') 

443 plt.show() 

444 

445 return kappas[:-1] 

446 

447 def fit_alphas(self, simulation, kappas_bar, dual=False, verbose=False): 

448 """Calculates the cell weights alpha for each kappa by cellwise recursion, fitting the simulation data given through I_interval and flux_interval. 

449 

450 :param simulation: SimulationData object which the ROHF will get fitted to 

451 :param kappas_bar: coercivity currents on virgin I_interval which are used to define N multicell ROHF 

452 :param verbose: display all information, defaults to False 

453 :return: kappas_bar, alphas. List of updated kappas and their matching weights. Kappas only change in case of a dual fitting strategy (dual=True) 

454 """ 

455 _, I_interval, flux_interval = simulation.first_rise(current=True) 

456 

457 flux_spline = interpolate.CubicSpline(I_interval, flux_interval, extrapolate=True) 

458 kappas = np.concatenate((kappas_bar, [I_interval[-1]])) # N + 1 kappas with interval boundary 

459 

460 # classic coercivity-weight linkage (undershoot PCloss) 

461 alphas = np.zeros(len(kappas_bar)) 

462 delta_flux = np.diff(flux_spline(kappas)) 

463 delta_current = np.diff(kappas) 

464 for k in range(0, self.N): 

465 alphas[k] = 0 if delta_current[k] == 0 else abs(delta_flux[k] / (self.L0 * delta_current[k]) - sum(alphas)) 

466 if verbose: print(" optimized alphas: " + str(alphas)) 

467 

468 if False: 

469 # tangential coercivity-weight linkage (overshoot PCloss) 

470 dflux_dI_spline = flux_spline.derivative(nu=1) 

471 fluxes = flux_spline(kappas) 

472 m_slopes = dflux_dI_spline(kappas) 

473 alphas_tan = np.diff(m_slopes) / self.L0 

474 m_slopes[0] = 0 

475 b_offsets = np.zeros(self.N + 1) 

476 kappas_tan = np.zeros(self.N + 1) 

477 for k in range(1, self.N + 1): 

478 b_offsets[k] = fluxes[k] - m_slopes[k] * kappas[k] 

479 kappas_tan[k] = (b_offsets[k] - b_offsets[k - 1]) / (m_slopes[k - 1] - m_slopes[k]) 

480 kappas_bar_tan = kappas_tan[1:] 

481 if True: 

482 plt.figure() 

483 plt.plot(I_interval, flux_interval, label='reference') 

484 plt.scatter(kappas_bar, fluxes[:-1], label='k-a-linkage kappas') 

485 plt.scatter(kappas_bar_tan, np.zeros(len(kappas_bar_tan)), label='new kappas', marker='x') 

486 for k in range(0, self.N + 1): 

487 plt.plot(I_interval, b_offsets[k] + m_slopes[k] * I_interval) 

488 plt.ylim([0, max(flux_interval)]) 

489 plt.legend() 

490 plt.plot() 

491 plt.show() 

492 

493 return kappas_bar, 0.97 * alphas 

494 

495 def fit_data(self, simulation, spacing_type=None, dual=False, taus=None, kappas_bar=None, alphas=None, verbose=False): 

496 """Fits the reduced order model params to a given simulation data by linear interpolation between kappas. 

497 

498 :param current: transport current of the model 

499 :param flux: internal flux of the model 

500 :param spacing_type: kappa distribution type (lin, log, invlog, exponential), defaults to None 

501 :param verbose: display all information, defaults to False 

502 """ 

503 if self.N is None: 

504 if kappas_bar is not None: 

505 self.N = len(kappas_bar) 

506 elif taus is not None: 

507 self.N = len(taus) 

508 elif alphas is not None: 

509 self.N = len(alphas) 

510 else: 

511 raise ValueError(f'Undefined number of cells. Cant fit to data.') 

512 

513 # fit taus 

514 if taus is None: 

515 taus = [1.5e-5] + (self.N - 1) * [2e-4] 

516 # taus = self.N*[0.0] if tau_sweep is None else self.fit_taus(tau_sweep=tau_sweep, verbose=verbose) 

517 

518 # fit kappas 

519 if kappas_bar is None: 

520 kappas_bar = self.fit_kappas(simulation, spacing_type=spacing_type, verbose=verbose) 

521 

522 # fit alphas 

523 if alphas is None: 

524 kappas_bar, alphas = self.fit_alphas(simulation, kappas_bar, verbose=verbose, dual=dual) 

525 

526 # save fitted params 

527 self.simulation = simulation 

528 self.update(len(taus), taus, kappas_bar, alphas) 

529 

530 def __closest_matches(self, dataset, values): 

531 return [min(range(len(dataset)), key=lambda i: abs(dataset[i] - value)) for value in values] 

532 

533 def __flux_multicells(self, Irev): 

534 flux = 0 

535 for k in range(self.N): 

536 flux += self.alphas[k] * self.L0 * Irev[k] 

537 return flux 

538 

539 def __g(self, I, Irev_prev, kappa): 

540 """ 1Dvector/scalar play model for g (=Irev+Ieddy) with coercivity kappas """ 

541 diff = I - Irev_prev 

542 diff_unit = diff / abs(diff) if diff != 0 else 0 

543 if abs(diff) >= kappa or diff * (diff - kappa * diff_unit) > 0: 

544 return I - kappa * diff_unit 

545 else: 

546 return Irev_prev 

547 

548 def compute(self, time, I, sim=None, v=0): 

549 """ Computing flux and losses on a given interval """ 

550 if (self.N or self.alphas[0] or self.kappas_bar[0] or self.taus[0]) is None: 

551 raise ValueError(f'Undefined parameters. Cant compute solution.') 

552 

553 # Background field scaling 

554 kappa_scaling, fill_factor = [1.0, 0.5] if sim is None else self.fit_scaling(sim) 

555 

556 # INPUT PARAMETERS ----------------------------------------------- 

557 nbSteps = len(I) 

558 rel_tol = 1e-5 # Iterations are needed because of the field-dependent parameters. 

559 iter_max = 100 

560 weightsum = np.cumsum(self.alphas) 

561 # RESOLUTION ----------------------------------------------------- 

562 flux = np.zeros((nbSteps,)) 

563 Irev = np.zeros((nbSteps, self.N)) 

564 Iirr = np.zeros((nbSteps, self.N)) 

565 Ieddy = np.zeros((nbSteps, self.N)) 

566 g = np.zeros((nbSteps, self.N)) 

567 iter_tot = 0 

568 # Computation of the associated flux density 

569 for i in range(1, nbSteps): 

570 # Vector hysteresis 

571 flux[i] = flux[i - 1] 

572 dt = time[i] - time[i - 1] 

573 conv_crit = 2 

574 iter = 0 

575 # scalar play model 

576 while conv_crit > 1 and iter < iter_max: 

577 for k in range(self.N): 

578 # update g = Irev + Ieddy with "scalar" play model 

579 diff = I[i] - Irev[i - 1, k] 

580 diff_unit = diff / abs(diff) if diff != 0 else 0 

581 # overcritical 

582 if abs(I[i]) >= kappa_scaling * self.Ic_0: 

583 Irev[i, k] = Irev[i - 1, k] + (1 - fill_factor) * (I[i] - I[i - 1]) / (2 * weightsum[k]) 

584 Ieddy[i, k] = Ieddy[i - 1, k] + self.taus[k] * (Irev[i - 1, k] - Irev[i, k]) / (dt) 

585 Iirr[i, k] = I[i] - Irev[i, k] - Ieddy[i, k] 

586 # undercritical 

587 else: 

588 if abs(diff) >= kappa_scaling * self.kappas_bar[k] or diff * (diff - kappa_scaling * self.kappas_bar[k] * diff_unit) > 0: 

589 g[i, k] = I[i] - kappa_scaling * self.kappas_bar[k] * diff_unit 

590 else: 

591 g[i, k] = Irev[i - 1, k] 

592 # g[i,k] = self.__g(I[i], Irev[i-1,k], self.kappas_bar[k]) 

593 # update irreversible 

594 Iirr[i, k] = I[i] - g[i, k] 

595 # update eddy 

596 Ieddy[i, k] = self.taus[k] / (dt + self.taus[k]) * (g[i, k] - Irev[i - 1, k]) 

597 # calculate reversible 

598 Irev[i, k] = g[i, k] - Ieddy[i, k] 

599 

600 flux_new = self.__flux_multicells(Irev[i, :]) 

601 conv_crit = abs((flux_new - flux[i]) / (flux[i] + 1e-10)) / rel_tol 

602 flux[i] = flux_new 

603 iter += 1 

604 iter_tot += iter 

605 if iter == iter_max: 

606 print(f"Step {i} reached maximum of {iter} iterations.") 

607 if v == 1: 

608 print(f'Finished with {iter_tot / nbSteps} iterations per step on average.') 

609 # POST-PROCESSING ------------------------------------------------ 

610 # Computation of the stored and dissipated power quantities 

611 p_ohm_fil, p_ohm_mat = self.ohmic_loss(I, simulation=sim) 

612 p_rev = 0 

613 p_irr = 0 

614 p_eddy = 0 

615 for k in range(self.N): 

616 dfluxdt = self.L0 * np.concatenate(([0], np.diff(Irev[:, k]) / np.diff(time))) # np.gradient(Irev[:,k], time) 

617 # Instantaneous power. Stored: \sum_k ( alpha_k * Irev_k * dphi_k/dt ). Dissipated: \sum_k ( alpha_k * Iirr_k * dphi_k/dt ) 

618 p_rev += self.alphas[k] * Irev[:, k] * dfluxdt # See Appendix B.2. of J. Dular's thesis (https://orbi.uliege.be/handle/2268/298054) 

619 p_irr += self.alphas[k] * Iirr[:, k] * dfluxdt 

620 p_eddy += self.alphas[k] * Ieddy[:, k] * dfluxdt 

621 

622 # Per cycle loss 

623 p_fil_cycle = None 

624 p_mat_cycle = None 

625 if sim and sim.f: 

626 t_end = np.array(time)[-1] 

627 # Cumulative integration over one cycle 

628 x_init = self.__closest_matches(time, [t_end - 1 / (sim.f)])[0] 

629 cumul_p_fil = integrate.cumulative_trapezoid(p_irr + p_ohm_fil, time, initial=0) 

630 cumul_p_mat = integrate.cumulative_trapezoid(p_eddy + p_ohm_mat, time, initial=0) 

631 p_fil_cycle = (cumul_p_fil[-1] - cumul_p_fil[x_init]) 

632 p_mat_cycle = (cumul_p_mat[-1] - cumul_p_mat[x_init]) 

633 

634 return flux, p_irr + p_ohm_fil, p_eddy + p_ohm_mat, p_fil_cycle, p_mat_cycle 

635 

636 

637class PostProcess: 

638 """ 

639 This class loads and stores the data from a simulation and can apply various postprocessing operations on the data. 

640 The simulation data is saved as a SimulationData object. 

641 """ 

642 

643 def __init__(self, fdm, model_data_output_path) -> None: 

644 self.fdm = fdm 

645 self.model_data_output_path = model_data_output_path 

646 self.geometry_name = f"Geometry_{self.fdm.run.geometry}" 

647 self.mesh_name = f"Mesh_{self.fdm.run.mesh}" 

648 self.solution_name = f"Solution_{self.fdm.run.solution}" 

649 

650 self.simulation_data = SimulationData(model_data_output_path, self.geometry_name, self.mesh_name, self.solution_name, self.fdm) 

651 self.flux_model = ROHFmodel(N=self.fdm.magnet.postproc.plot_flux.rohf_N, 

652 spacing_type=self.fdm.magnet.postproc.plot_flux.rohf_kappa_spacing, 

653 simulation=self.simulation_data, 

654 filename=self.fdm.magnet.postproc.plot_flux.rohf_file, 

655 filepath=os.path.join(self.model_data_output_path, self.geometry_name), 

656 ) if self.fdm.magnet.postproc.plot_flux.rohf else None 

657 self.plotter = PlotPythonConductorAC(fdm, model_data_output_path, self.simulation_data, self.flux_model) 

658 self.report = PDFreport(filepath=model_data_output_path, 

659 title=f'{self.fdm.run.geometry}_{self.fdm.run.mesh}_{self.fdm.run.solution}' if self.fdm.magnet.postproc.generate_report else None) 

660 

661 def rohf_background(self): 

662 """ Test for ROHFmodel background field scaling """ 

663 if steammaterials_flag or self.flux_model is None: 

664 # skip 

665 return 

666 matpath = os.path.join(os.path.dirname(steammaterials.__file__), 'CFUN') 

667 

668 Ic_0 = 2900 

669 B_background = self.simulation_data.B_background 

670 

671 # analytic self-field on strand boundary 

672 CATI_surf = 54 * np.pi * (4.5e-05) ** 2 # 54 filament surface in CATI paper strand 

673 Jc_0 = Ic_0 / CATI_surf 

674 B_space = np.linspace(0, 14, 5000) 

675 # Determine ambient field with no background field and max current density 

676 numpy2d_Bspace = np.vstack((1.9 * np.ones(B_space.shape), B_space, 1 * np.ones(B_space.shape))) # T, B, A 

677 nbti_space = steammaterials.SteamMaterials("CFUN_IcNbTi_v1", numpy2d_Bspace.shape[0], numpy2d_Bspace.shape[1], matpath) 

678 jc_nbti_space = nbti_space.evaluate(numpy2d_Bspace) 

679 B_ambient = B_space[np.abs(jc_nbti_space - Jc_0).argmin()] 

680 print("B_ambient: " + str(B_ambient) + " [T]") 

681 

682 # Determine absolute field value which the strand experiences 

683 B_abs = B_background + (B_ambient / B_background) * B_ambient if B_background > B_ambient else B_background + B_ambient 

684 # material Jc(B) fit 

685 jc_nbti = steammaterials.SteamMaterials("CFUN_IcNbTi_v1", 3, 1, matpath) 

686 jc_Babs = jc_nbti.evaluate((1.9, B_abs, 1))[0] 

687 

688 # PLOT 

689 plt.figure() 

690 plt.plot(B_space, jc_nbti_space, label='CFUN_IcNbTi_v1') 

691 plt.scatter((B_ambient, B_abs), (Jc_0, jc_Babs), label='Jc(B)', ) 

692 plt.title('Nb-Ti') 

693 plt.grid() 

694 plt.legend() 

695 plt.xlabel("Field (T)") 

696 plt.ylabel("jc (A/m2)") 

697 

698 kappa_scaling = jc_Babs / Jc_0 

699 print(" Kappa scaling factor: " + str(kappa_scaling)) 

700 self.flux_model.kappa_scaling = kappa_scaling 

701 plt.show() 

702 

703 def internal_flux(self): 

704 """ Postproc routine for the internal flux and the related ROHF model """ 

705 self.plotter.plot_transport() 

706 self.plotter.plot_flux() 

707 self.plotter.plot_power_loss() 

708 self.plotter.plot_resistivity() 

709 

710 # self.flux_model.export_params(filepath=os.path.join(self.model_data_output_path, self.geometry_name), filename="TESTFILE") 

711 # self.flux_model.import_params(filepath=os.path.join(self.model_data_output_path, self.geometry_name), filename="TESTFILE") 

712 

713 if self.report and self.flux_model: 

714 self.report.add_table('ROHF parameters', self.flux_model.params()) 

715 

716 def instantaneous_loss(self): 

717 print("Total loss: ", self.simulation_data.cumulative_power["TotalLoss"].iloc[-1]) 

718 print("Total filament loss: ", self.simulation_data.cumulative_power["FilamentLoss"].iloc[-1]) 

719 print("Total coupling loss: ", self.simulation_data.cumulative_power["CouplingLoss"].iloc[-1]) 

720 print("Total eddy loss: ", self.simulation_data.cumulative_power["EddyLoss"].iloc[-1]) 

721 plot_options = self.fdm.magnet.postproc.plot_instantaneous_power 

722 self.plotter.plot_instantaneous_power( 

723 show=plot_options.show, 

724 title=plot_options.title, 

725 save_plot=plot_options.save, 

726 save_folder_path=os.path.join(self.model_data_output_path, self.geometry_name, self.mesh_name, self.solution_name), 

727 save_file_name=plot_options.save_file_name, 

728 overwrite=self.fdm.run.overwrite 

729 ) 

730 

731 def plot_impedance(self): 

732 """ Plots the impedance estimate and transport quantities """ 

733 conductor = self.fdm.conductors[self.fdm.magnet.solve.conductor_name] 

734 frequency_range = np.logspace(-2, 6, 1000) 

735 

736 self.simulation_data.plot_transport() 

737 

738 #### VOLTAGE #### 

739 # self.simulation_data.plot_phaseshift_voltage(90) 

740 # self.simulation_data.plot_voltage_decomp() 

741 

742 #### IMPEDANCE #### 

743 self.simulation_data.compare_impedance(frequency_range) 

744 

745 

746class BatchPostProcess: 

747 """ 

748 This class loads and stores data from multiple simulations (+flux models) and performs operations on them 

749 """ 

750 

751 def __init__(self, fdm, lossMap_gridData_folder, inputs_folder_path, outputs_folder_path) -> None: 

752 self.fdm = fdm 

753 self.inputs_folder_path = inputs_folder_path # This is the path to the folder where the input data is stored 

754 self.model_data_output_path = outputs_folder_path # This is the path to the folder where the model output data is stored (e.g. geometries) 

755 self.outputs_folder_path = os.path.join(outputs_folder_path, fdm.magnet.postproc.output_folder) # This is the path to the folder where the postprocessed data is written 

756 self.simulation_collection = self.retrieve_simulation_data() 

757 self.fluxmodel_collection = self.retrieve_fluxmodel_data() 

758 self.plotter = BatchPlotPythonConductorAC( 

759 fdm=self.fdm, lossMap_gridData_folder=lossMap_gridData_folder, outputs_folder_path=outputs_folder_path, 

760 simulation_collection=self.simulation_collection, fluxmodel_collection=self.fluxmodel_collection 

761 ) 

762 self.report = PDFreport(filepath=outputs_folder_path, title='BatchPostprocReport') if self.fdm.magnet.postproc.generate_report else None 

763 self.avg_simulation_time = np.mean([sd.simulation_time for sd in self.simulation_collection]) 

764 self.total_simulation_time = np.sum([sd.simulation_time for sd in self.simulation_collection]) 

765 print('Number of simulations considered: ', len(self.simulation_collection)) 

766 print('Average simulation time: ', self.avg_simulation_time, 's') 

767 print('Total simulation time: ', self.total_simulation_time, 's') 

768 

769 def get_base_sim(self): 

770 """ Find simulation with lowest frequency and highest amplitude""" 

771 base_sim = self.simulation_collection[0] 

772 for sim in self.simulation_collection[1:]: 

773 if sim.f <= base_sim.f and sim.I_amp >= base_sim.I_amp: base_sim = sim 

774 return base_sim 

775 

776 def rohf_on_grid(self): 

777 """ Checks the fit of an existing model to a grid of simulations or fits a new ROHF model to a grid of simulations """ 

778 

779 optimized_rohf = None 

780 if self.fdm.magnet.postproc.batch_postproc.rohf_on_grid.fit_rohf: 

781 

782 tausweep_IIC = self.fdm.magnet.postproc.batch_postproc.rohf_on_grid.fit_rohf_tausweep_IIC 

783 N = self.fdm.magnet.postproc.batch_postproc.rohf_on_grid.fit_rohf_N 

784 print("FITTING ROHF TO SIMULATIONS:") 

785 print("-> number of cells N = " + str(N)) 

786 print("-> tausweep IIC = " + str(tausweep_IIC)) 

787 

788 taus = self.optimize_taus(tausweep_IIC=tausweep_IIC, N=N) 

789 kappas, alphas = self.optimize_kappas(base_taus=taus, N=N) 

790 # export resulting ROHF 

791 optimized_rohf = ROHFmodel(N=N, kappas_bar=kappas, alphas=alphas, taus=taus) 

792 print(optimized_rohf.taus) 

793 print(optimized_rohf.alphas) 

794 print(optimized_rohf.kappas_bar) 

795 self.plotter.plot_flux_time_triplet(flux_model=optimized_rohf, title='Flux in time triplet') 

796 

797 optimized_rohf.export_params(self.outputs_folder_path, 'optimized') 

798 

799 if self.report: 

800 self.report.filename = 'ROHFoptimization.pdf' 

801 self.report.add_table('Parameter results', optimized_rohf.params()) 

802 

803 if self.fdm.magnet.postproc.batch_postproc.rohf_on_grid.produce_error_map: 

804 flux_model = optimized_rohf if optimized_rohf is not None else self.fluxmodel_collection[0] 

805 self.plotter.create_errorMap(flux_model=flux_model) 

806 

807 if self.report: 

808 self.report.add_table('Error map parameter', optimized_rohf.params()) 

809 

810 def optimize_kappas(self, N, base_kappas=None, base_taus=None): 

811 

812 base_sim = self.get_base_sim() 

813 if base_kappas is None: 

814 base_kappas = np.linspace(0, base_sim.I_amp, N + 1)[:-1] # linear spaced over max I range 

815 # base_kappas = np.geomspace(1, base_sim.I_amp, N+1)[:-1]-1 

816 if base_taus is None: 

817 base_taus = np.zeros(N) 

818 baseROHF = ROHFmodel(simulation=base_sim, taus=base_taus, kappas_bar=base_kappas) 

819 

820 print(" RUNNING KAPPA & WEIGHT OPTIMIZATION ...") 

821 

822 # ERROR ------------------------------------------------------------------------------------------------------------------------------- 

823 def fit_error_kappas(normalized_kappas): 

824 # static alpha fit based on recursion of kappas -> piecewise linear fit 

825 baseROHF.fit_data(taus=base_taus, simulation=base_sim, kappas_bar=normalized_kappas * base_sim.I_amp) 

826 flux_error = 0 

827 for simulation in self.simulation_collection: 

828 time, I, flux = simulation.halfcycle(current=True) 

829 rohf_flux, _, _, _, _ = baseROHF.compute(time, I) 

830 flux_error = flux_error + sum(np.diff(time, prepend=0) * np.absolute(flux - rohf_flux)) 

831 return flux_error 

832 # BOUNDARIES -------------------------------------------------------------------------------------------------------------------------- 

833 

834 bounds = optimize.Bounds(lb=0, ub=1, keep_feasible=True) 

835 # OPTIMIZATION 1: KAPPA DISTRIBUTION & WEIGHTS ---------------------------------------------------------------------------------------- 

836 kappa_optimization = optimize.minimize(fit_error_kappas, method='L-BFGS-B', x0=base_kappas / base_sim.I_amp, bounds=bounds, options={"disp": True}) 

837 optimized_kappas = np.sort(kappa_optimization.x * base_sim.I_amp) 

838 print(" --> OPTIMIZED KAPPAS: " + str(optimized_kappas)) 

839 

840 # ERROR ------------------------------------------------------------------------------------------------------------------------------- 

841 def fit_error_alphas(alphas): 

842 baseROHF.alphas = alphas 

843 flux_error = 0 

844 for simulation in self.simulation_collection: 

845 time, I, flux = simulation.halfcycle(current=True) 

846 rohf_flux, _, _, _, _ = baseROHF.compute(time, I) 

847 flux_error = flux_error + sum(abs(np.diff(I, prepend=0)) * np.diff(time, prepend=0) * np.absolute(flux - rohf_flux)) 

848 return flux_error 

849 # BOUNDARIES -------------------------------------------------------------------------------------------------------------------------- 

850 

851 bounds = optimize.Bounds(lb=0, ub=10, keep_feasible=True) 

852 # OPTIMIZATION 2: WEIGHTS ONLY --------------------------------------------------------------------------------------------------------- 

853 alpha_optimization = optimize.minimize(fit_error_alphas, method='L-BFGS-B', x0=baseROHF.alphas, bounds=bounds, options={"disp": True}) 

854 optimized_alphas = alpha_optimization.x 

855 print(" --> OPTIMIZED ALPHAS: " + str(optimized_alphas)) 

856 

857 return [optimized_kappas, optimized_alphas] 

858 

859 def optimize_taus(self, tausweep_IIC, N, base_taus=None, base_kappas=None): 

860 

861 force_tau1_zero = False 

862 base_sim = self.get_base_sim() 

863 

864 amplitudes = sorted(list(set([sim.I_amp for sim in self.simulation_collection]))) 

865 tausweep_I = min(amplitudes, key=lambda x: abs(x - tausweep_IIC * amplitudes[-1])) # get grid current closest to tausweep IIC 

866 tau_sweep = [sim for sim in self.simulation_collection if sim.I_amp == tausweep_I] # get asociated sweep 

867 

868 freq = sorted(list(set([sim.f for sim in tau_sweep]))) 

869 if base_kappas is None: 

870 base_kappas = np.linspace(0, amplitudes[-1], N + 1)[:-1] # start with linear spaced over max_I range 

871 if base_taus is None: 

872 # Estimate initial tau values with lowpass approximation 

873 loss = [sim.total_power_per_cycle['FilamentLoss'] for sim in tau_sweep] 

874 resamp_freq = np.linspace(freq[0], freq[-1], 500) 

875 resamp_loss = np.interp(resamp_freq, freq, loss) 

876 idx = np.abs(loss - resamp_loss[0] * 0.707).argmin() # -3dB mark -> crit freq 

877 tau_star = 1 / (2 * np.pi * resamp_freq[idx]) # f_c of lowpass 

878 base_taus = base_kappas / (N * tausweep_I) * tau_star # scale estimate down w. cell level 

879 

880 baseROHF = ROHFmodel(N=N, simulation=base_sim, kappas_bar=base_kappas, taus=base_taus) 

881 self.plotter.plot_percycle_losses(sims=tau_sweep, flux_model=baseROHF, title='STARTING TAUS') 

882 

883 if True: 

884 print(" RUNNING TAU OPTIMIZATION ... ") 

885 

886 # ERROR ------------------------------------------------------------------------------------------------------------------------------- 

887 def fit_error(taus): 

888 baseROHF.taus = np.concatenate(([0], taus)) if force_tau1_zero else taus 

889 error = 0 

890 for idx, sim in enumerate(self.simulation_collection): 

891 time, I, flux = sim.halfcycle(current=True) 

892 flux_rohf, fill_rohf, eddy_rohf, _, _ = baseROHF.compute(time=time, I=I) 

893 # errors[idx] = abs(sim.total_power_per_cycle['EddyLoss']-eddy_pc_rohf) + abs(sim.total_power_per_cycle['FilamentLoss']-fill_pc_rohf) 

894 error = error + sum(abs((sim.power['EddyLoss'][:len(I)] - eddy_rohf)) / np.mean(sim.power['EddyLoss']) + abs((sim.power['FilamentLoss'][:len(I)] - fill_rohf)) / np.mean(sim.power['FilamentLoss'])) 

895 return error 

896 # BOUNDARIES -------------------------------------------------------------------------------------------------------------------------- 

897 

898 bounds = optimize.Bounds(lb=0.0, ub=0.1) 

899 # OPTIMIZATION ------------------------------------------------------------------------------------------------------------------------ 

900 optimization = optimize.minimize(fit_error, x0=base_taus, bounds=bounds, options={'disp': True}) # , method='SLSQP' 

901 optimized_taus = np.concatenate(([0], optimization.x[1:])) if force_tau1_zero else optimization.x 

902 

903 print(" --> OPTIMIZED TAUS: " + str(optimized_taus)) 

904 baseROHF.fit_data(simulation=base_sim, spacing_type='linear', taus=np.sort(optimized_taus)) 

905 self.plotter.plot_percycle_losses(sims=tau_sweep, flux_model=baseROHF, title='OPTIMIZED TAUS') 

906 # plt.show() 

907 return np.sort(optimized_taus) 

908 

909 else: 

910 return baseROHF.taus 

911 

912 def retrieve_simulation_data(self): 

913 """ 

914 This function iterates over the input CSV-file (specifying which simulations to postprocess) and returns a list of SimulationData objects 

915 containing all the simulation data. If no CSV-file is specified, the data from the single simulation specified in the input YAML-file is returned. 

916 """ 

917 

918 simulations_csv = self.fdm.magnet.postproc.batch_postproc.simulations_csv 

919 if simulations_csv is not None: 

920 try: 

921 self.simulations_csv = pd.read_csv(os.path.join(self.inputs_folder_path, f'{simulations_csv}.csv')) # Read the csv file with the input data 

922 except: 

923 raise FileNotFoundError(f'No csv file with the name {simulations_csv}.csv was found in the inputs folder.') 

924 

925 if self.simulations_csv is not None: 

926 simulationCollection = [] 

927 for index, row in self.simulations_csv.iterrows(): 

928 if pd.isna(row['input.run.geometry']) and pd.isna(row['input.run.mesh']) and pd.isna(row['input.run.solution']): 

929 continue 

930 geometry_name = 'Geometry_' + str(row['input.run.geometry']) 

931 mesh_name = 'Mesh_' + str(row['input.run.mesh']) 

932 

933 if isinstance(row['input.run.solution'], float) and row['input.run.solution'].is_integer(): 

934 solution_name = 'Solution_' + str(int(row['input.run.solution'])) 

935 else: 

936 solution_name = 'Solution_' + str(row['input.run.solution']) 

937 

938 # Check if the row refers to a valid simulation by checking if the solution folder exists: 

939 # solution_folder = os.path.join(os.getcwd(), 'tests', '_outputs', self.fdm.general.magnet_name, geometry_name, mesh_name, solution_name) 

940 solution_folder = os.path.join(self.model_data_output_path, geometry_name, mesh_name, solution_name) 

941 if os.path.exists(solution_folder): # If the solution folder exists, add the simulation to the simulationCollection 

942 sd = SimulationData(self.model_data_output_path, geometry_name, mesh_name, solution_name, self.fdm) 

943 if sd.simulation_time is not None: # Only add the simulation if it has finished running (and therefore has written the simulation time to a file) 

944 simulationCollection.append(sd) 

945 else: 

946 simulationCollection = [SimulationData(self.model_data_output_path, 'Geometry_' + self.fdm.run.geometry, 'Mesh_' + self.fdm.run.mesh, 'Solution_' + self.fdm.run.solution)] 

947 

948 return self.sort_simulationCollection(self.filter_simulationCollection(simulationCollection)) 

949 

950 def filter_simulationCollection(self, simulationCollection): 

951 """ 

952 This function is used to filter the simulationCollection based on the filter criterion specified in the yaml input file. 

953 An example of a filter criterion is '<<solve.source_parameters.sine.frequency>> == 18', which will disregard all simulations with frequency != 18Hz. 

954 """ 

955 if self.fdm.magnet.postproc.batch_postproc.filter.apply_filter: 

956 filter_criterion = self.fdm.magnet.postproc.batch_postproc.filter.filter_criterion 

957 class_params = re.findall('<<(.*?)>>', filter_criterion) 

958 for cp in class_params: 

959 filter_criterion = filter_criterion.replace(f"<<{cp}>>", 'sd.' + cp) 

960 filtering_function = eval(f'lambda sd: {filter_criterion}') 

961 return list(filter(filtering_function, simulationCollection)) 

962 else: 

963 return simulationCollection 

964 

965 def sort_simulationCollection(self, simulationCollection): 

966 """ 

967 This function is used to sort the simulationCollection based on the sort key specified in the yaml input file. 

968 An example of a sort key is 'solve.source_parameters.sine.frequency', which will sort the simulations based on frequency. 

969 """ 

970 if self.fdm.magnet.postproc.batch_postproc.sort.apply_sort: 

971 sorting_function = eval(f'lambda sd: sd.{self.fdm.magnet.postproc.batch_postproc.sort.sort_key}') 

972 return sorted(simulationCollection, key=sorting_function) 

973 else: 

974 return simulationCollection 

975 

976 def retrieve_fluxmodel_data(self): 

977 """ Analog function to retrieve_simulation_data but for fluxmodels (Glock-Thesis). """ 

978 fluxmodels_csv = self.fdm.magnet.postproc.batch_postproc.fluxmodels_csv 

979 if fluxmodels_csv is not None: 

980 try: 

981 self.fluxmodels_csv = pd.read_csv(os.path.join(self.inputs_folder_path, f'{fluxmodels_csv}.csv')) # Read the csv file with the input data 

982 except: 

983 raise FileNotFoundError(f'No csv file with the name {fluxmodels_csv}.csv was found in the inputs folder.') 

984 

985 fluxmodelCollection = [] 

986 if fluxmodels_csv is not None: 

987 for index, row in self.fluxmodels_csv.iterrows(): 

988 if pd.isna(row['input.run.geometry']) and pd.isna(row['input.run.mesh']) and pd.isna(row['input.run.solution']): 

989 continue 

990 geometry_name = 'Geometry_' + str(row['input.run.geometry']) 

991 parameterfile_path = os.path.join(self.model_data_output_path, geometry_name) 

992 parameterfile_name = str(row['input.run.parameterfile']) 

993 label_name = str(row['input.run.label']) 

994 

995 fluxmodelCollection.append(ROHFmodel(filepath=parameterfile_path, filename=parameterfile_name, label=label_name)) 

996 

997 return fluxmodelCollection