Coverage for fiqus/post_processors/PostProcessConductorAC.py: 9%
616 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-11-03 01:32 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-11-03 01:32 +0000
1import os, re
2import numpy as np
3import pandas as pd
4import matplotlib.pyplot as plt
6try:
7 import steammaterials # TESTING PURPOSES
9 steammaterials_flag = False
10except:
11 steammaterials_flag = True
13from scipy import constants, optimize, integrate, signal, interpolate
14from fiqus.data.DataFiQuSConductorAC_Strand import CACStrandGeometry, CACStrandMesh, CACStrandSolve
15from fiqus.plotters.PlotPythonConductorAC import BatchPlotPythonConductorAC, PlotPythonConductorAC, PDFreport
16from fiqus.utils.Utils import FilesAndFolders
17import ruamel.yaml
20class SimulationData:
21 """
22 Class used to store and manage data from a single simulation.
24 This class is responsible for loading and organizing the data from a single simulation.
25 It stores the data in various attributes and provides methods for retrieving and processing the data.
27 """
29 def __init__(self, model_data_output_path, geometry_name, mesh_name, solution_name, fdm) -> None:
30 self.model_data_output_path = model_data_output_path # This is the path to the folder where the model output data is stored (e.g. geometries)
31 self.geometry_name = geometry_name # Name of the geometry folder
32 self.mesh_name = mesh_name # Name of the mesh folder
33 self.solution_name = solution_name # Name of the solution folder
34 self.conductor_name = fdm.magnet.solve.conductor_name
36 # Organize the folders:
37 self.geometry_folder = os.path.join(self.model_data_output_path, geometry_name) # Path to the geometry folder
38 self.mesh_folder = os.path.join(self.geometry_folder, mesh_name) # Path to the mesh folder
39 self.solution_folder = os.path.join(self.mesh_folder, solution_name) # Path to the solution folder
41 # hacky solution for now ... pass conductor data through fdm because its not stored in geom yaml
42 self.conductor = fdm.conductors[self.conductor_name]
43 # Store the YAML input-files in a data model, fdm:
44 self.geometry, self.mesh, self.solve = self.retrieve_fiqusDataModel()
45 # Store losses, simulation time and check if the simulation crashed:
46 temp_file_path = os.path.join(self.solution_folder, 'text_output')
47 loss_file = [f for f in os.listdir(temp_file_path) if f.startswith('power') and f.endswith('.txt')][0]
48 self.power_columns = ['Time', 'FilamentLoss', 'CouplingLoss', 'EddyLoss', 'TotalLoss', 'CouplingLoss_dyn', 'TotalLoss_dyn'] # Only in the case dynamic correction is used, must be changed later
49 self.power = pd.read_csv(os.path.join(self.solution_folder, 'text_output', loss_file), sep=' ', names=self.power_columns) # Store instantaneous losses as pandas dataframe
50 # Add a row of zeros at the beginning of the dataframe to account for the initial condition:
51 self.power = pd.concat([pd.DataFrame({col: 0 for col in self.power_columns}, index=[0]), self.power]).reset_index(drop=True)
52 if self.solve.formulation_parameters.formulation == "CATI" and self.solve.diffusion_barriers: # Check if dm.solve has the entry for diffusion barriers
53 if self.solve.diffusion_barriers.enable: # Check if diffusion barriers are enabled
54 # Load and add the power dissipated in the diffusion barriers (if they are enabled):
55 self.power_diffusion_barriers = np.loadtxt(os.path.join(temp_file_path, 'power_diffusion_barrier.txt'), skiprows=1)
56 self.power_diffusion_barriers = np.sum(self.power_diffusion_barriers[:, 1:], axis=1)
57 self.power['CouplingLoss'] += self.power_diffusion_barriers
58 self.power['CouplingLoss_dyn'] += self.power_diffusion_barriers
59 self.power['TotalLoss'] += self.power_diffusion_barriers
60 self.power['TotalLoss_dyn'] += self.power_diffusion_barriers
61 self.crash = True if 'crash_report.txt' in os.listdir(temp_file_path) else False
62 # Store simulation time:
63 try:
64 with open(os.path.join(self.solution_folder, 'text_output', 'simulation_time.txt'), 'r') as f:
65 self.simulation_time = float(f.readline().strip())
66 except:
67 self.simulation_time = None # If the simulation time file does not exist, the simulation has not finished running.
69 # Store the rest of the post-processing data:
70 self.time = self.power['Time']
71 self.instantaneous_temperature = self.load_standard_data(os.path.join(temp_file_path, 'temperature.txt'), 1, add_initial_zero=True)
72 self.temperature = self.load_standard_data(os.path.join(temp_file_path, 'temperature.txt'), 2, add_initial_zero=True)
73 self.I_transport = self.load_standard_data(os.path.join(temp_file_path, 'I_transport.txt'), 1)
74 self.V_transport = self.load_standard_data(os.path.join(temp_file_path, 'V_transport.txt'), 1)
75 self.V_transport_unitlen = self.load_standard_data(os.path.join(temp_file_path, 'V_transport_unitlen.txt'), 1)
76 self.hs_val = self.load_standard_data(os.path.join(temp_file_path, 'hs_val.txt'), 1)
77 self.magn_fil = self.load_standard_data(os.path.join(temp_file_path, 'magn_fil.txt'), [1, 2, 3])
78 self.magn_matrix = self.load_standard_data(os.path.join(temp_file_path, 'magn_matrix.txt'), [1, 2, 3])
79 self.I = self.load_standard_data(os.path.join(temp_file_path, 'I.txt'), 1, len(self.time))
80 self.V = self.load_standard_data(os.path.join(temp_file_path, 'V.txt'), 1, len(self.time))
81 self.I_integral = self.load_standard_data(os.path.join(temp_file_path, 'I_integral.txt'), 1, len(self.time))
82 self.I_abs_integral = self.load_standard_data(os.path.join(temp_file_path, 'I_abs_integral.txt'), 1, len(self.time))
83 self.magnetic_energy_internal = self.load_standard_data(os.path.join(temp_file_path, 'magnetic_energy_internal.txt'), 1)
84 self.magnetic_energy_external = self.load_standard_data(os.path.join(temp_file_path, 'magnetic_energy_external.txt'), 1)
85 self.flux_external = self.load_standard_data(os.path.join(temp_file_path, 'flux_external.txt'), 1)
86 self.flux_internal = self.load_standard_data(os.path.join(temp_file_path, 'flux_internal.txt'), 1)
87 self.selffield_dfluxdt = self.load_standard_data(os.path.join(temp_file_path, 'selffield_dfluxdt.txt'), 1)
88 self.Ip = self.load_standard_data(os.path.join(temp_file_path, 'Ip.txt'), 1, len(self.time))
89 self.Vp = self.load_standard_data(os.path.join(temp_file_path, 'Vp.txt'), 1, len(self.time))
90 # try to init - otherwise NONE
91 self.f, self.B_background, self.I_amp, self.T_background = self.get_source_params(verbose=True)
92 # self.data_impedance = self.compute_data_impedance(verbose=False)
93 # self.analytic_RL = self.compute_analytic_RL()
94 # Integrate the losses to obtain the cumulative power and the total power per cycle:
95 self.cumulative_power, self.total_power_per_cycle = self.integrate_power() # Store cumulative power and total cumulative power per cycle
97 def export2txt(self, data_names, data_columns, filename):
98 """ This function can be used to export one or more arrays of simulation data in one txt file (stored in solution folder)."""
99 np_data = np.array(data_columns).T
100 np.savetxt(os.path.join(self.solution_folder, str(filename) + '.txt'), np_data, header=" ".join(data_names), comments="")
102 def rad_to_deg(self, radiants):
103 return radiants / np.pi * 180
105 def get_source_params(self, verbose=False):
106 source = self.solve.source_parameters.source_type
107 T = self.solve.general_parameters.temperature
108 if source == "rotating":
109 f = self.solve.source_parameters.rotating.frequency
110 B = self.solve.source_parameters.rotating.field_magnitude
111 I = 0
112 elif source == "sine":
113 f = self.solve.source_parameters.sine.frequency
114 I = self.solve.source_parameters.sine.current_amplitude
115 # background field amplitude
116 B = self.solve.source_parameters.sine.superimposed_DC.field_magnitude
117 if verbose: print("Loading Simulation: f=" + "{:.2f}".format(f) + " [Hz]; B_back=" + "{:.2f}".format(B) + " [T]; I=" + "{:.2f}".format(I) + " [A]")
118 else:
119 f = None
120 B = 0
121 I = max(abs(self.I_transport))
123 return f, B, I, T
125 def first_rise(self, current=False):
126 """ returns the time, current and flux on the first penetration interval until the first current peak/saddle is reached """
127 first_extrema_idx = signal.argrelextrema(self.I_transport if current else self.flux_internal, np.greater_equal)[0][0] # first rel extrema
128 return self.time[0:first_extrema_idx], self.I_transport[0:first_extrema_idx], self.flux_internal[0:first_extrema_idx]
130 def halfcycle(self, current=False):
131 """ returns the time, current and flux on the first halfcycle of transport current or flux """
132 data = self.I_transport if current else self.flux_internal
133 zero_crossings = np.where(np.diff(np.sign(data[1:])))[0] + 1
134 return self.time[0:zero_crossings[0]], self.I_transport[0:zero_crossings[0]], self.flux_internal[0:zero_crossings[0]]
136 def load_standard_data(self, file_path, columns, reshape=None, add_initial_zero=False):
137 """
138 There are many output .txt-files with similar format. This function loads the data from one of these files and returns it as a numpy array.
139 If the file does not exist, None is returned without raising an error.
140 """
141 try:
142 data = np.loadtxt(file_path, comments='#', usecols=columns)
143 if reshape:
144 data = data.reshape(-1, reshape).T
145 except IOError:
146 return None
148 if add_initial_zero:
149 if len(data.shape) == 1:
150 data = np.insert(data, 0, 0)
151 else:
152 zeros = np.zeros((1, data.shape[1]))
153 data = np.vstack((zeros, data))
154 return data
156 def retrieve_fiqusDataModel(self):
157 """
158 This function reads the YAML input-files for geometry, mesh and solve and stores them in three dictionaries which are returned.
159 This function is to be called only once, when the object is created.
160 """
161 geometry_dataModel = FilesAndFolders.read_data_from_yaml(os.path.join(self.geometry_folder, 'geometry.yaml'), CACStrandGeometry)
162 mesh_dataModel = FilesAndFolders.read_data_from_yaml(os.path.join(self.mesh_folder, 'mesh.yaml'), CACStrandMesh)
163 # Now that the solve.yaml file contains more than just the solve part, we query only that part here.
164 # solution_dataModel = FilesAndFolders.read_data_from_yaml(os.path.join(self.solution_folder, 'solve.yaml'), CACStrandSolve)
165 with open(os.path.join(self.solution_folder, 'solve.yaml'), 'r') as stream:
166 yaml = ruamel.yaml.YAML(typ='safe', pure=True)
167 yaml_str = yaml.load(stream)
168 solution_dataModel = CACStrandSolve(**yaml_str['solve'])
169 return geometry_dataModel, mesh_dataModel, solution_dataModel
171 def integrate_power(self):
172 """
173 This function integrates the instantaneous power over time to obtain the cumulative power.
174 It also calculates the total cumulative power per cycle.
175 The cumulative power is returned as a pandas dataframe and the total cumulative power per cycle is returned as a dictionary.
176 """
177 find_closest_idx = lambda arr, val: np.abs(arr - val).argmin()
179 t = np.array(self.power['Time'])
180 t_final = t[-1]
181 if self.f:
182 t_init = find_closest_idx(t, t_final - 1 / (self.f))
183 else:
184 t_init = find_closest_idx(t, t_final)
186 cumulative_power = pd.DataFrame(columns=self.power_columns)
187 total_power_per_cycle = {}
189 cumulative_power['Time'] = self.power["Time"]
190 for column in self.power_columns[1:]:
191 cumulative_power[column] = np.insert(integrate.cumulative_trapezoid(self.power[column], t), 0, 0)
192 total_power_per_cycle[column] = (cumulative_power[column].iloc[-1] - cumulative_power[column].iloc[t_init]) # / (np.pi*matrix_radius**2 * loss_factor) # Why do we divide by pi*matrix_radius**2*loss_factor?
194 return cumulative_power, total_power_per_cycle
196 def compute_data_impedance(self, verbose=False):
197 """
198 This function estimates a complex impedance per unit length for the OOP based on sinus fits for the transport voltage and current.
199 If no sinus fit is possible it returns NONE as impedance estimate.
200 """
201 if not self.f:
202 return None
204 I_mean = np.mean(self.I_transport)
205 V_mean = np.mean(self.V_transport_unitlen)
206 I_peak = max(abs(self.I_transport))
207 V_peak = max(abs(self.V_transport_unitlen))
208 # ignore values before this index - tweaking parameter depending on source ramping
209 N_h = round(len(self.I_transport) / 4)
210 # sine fit for current and voltage
211 optimize_func_current = lambda x: x[0] * np.sin(x[1] * self.time[N_h:] + x[2]) + x[3] - self.I_transport[N_h:]
212 optimize_func_voltage = lambda x: x[0] * np.sin(x[1] * self.time[N_h:] + x[2]) + x[3] - self.V_transport_unitlen[N_h:]
213 current_estimate = optimize.leastsq(optimize_func_current, [I_peak - I_mean, self.f, 0, I_mean])[0] # est_amp, est_freq, est_phase, est_mean
214 voltage_estimate = optimize.leastsq(optimize_func_voltage, [V_peak - V_mean, self.f, 0, V_mean])[0]
215 delta_phi = abs(voltage_estimate[2] - current_estimate[2]) % (2 * np.pi)
216 Z_magnitude = V_peak / I_peak # both sinus - no formfactor
217 L_inductance = abs(np.sin(delta_phi) * Z_magnitude) / (2 * np.pi * self.f)
218 R_resistance = abs(np.cos(delta_phi) * Z_magnitude)
219 Z_impedance = R_resistance + 2 * np.pi * self.f * L_inductance * 1j
220 if verbose:
221 print("estimated simulation Impedance: " + str(Z_magnitude) + " Ohm")
222 print(" -> L: " + str(L_inductance) + " Henry")
223 print(" -> R: " + str(R_resistance) + " Ohm")
224 print(" -> Phase: " + str(np.angle(Z_impedance, deg=True)) + " Deg")
225 return Z_impedance
227 def compute_analytic_RL(self):
228 """ This function calculates the analytic impedance parameters R & L per unit length based on reduced telegraphers equations with C, G = 0 """
230 rho_CU = 1.81e-10
231 coil_radius = self.geometry.coil_radius
232 air_radius = self.geometry.air_radius
233 strand_radius = self.conductor.strand.diameter / 2
234 filament_radius = self.conductor.strand.filament_diameter / 2
236 correction_factor = 0.9549 # CATI (needs to be corrected)
237 ell = correction_factor * self.conductor.strand.fil_twist_pitch / 6 # CATI
239 if self.geometry.type == 'strand_only':
240 # single filament in open space
241 inductance_L = constants.mu_0 / (2 * np.pi) * np.log(air_radius / filament_radius) # == coax cable
242 resistance_R = rho_CU / (np.pi * (self.conductor.strand.diameter / 2) ** 2) * ell if self.solve.general_parameters.superconductor_linear else 1e-12
243 elif self.geometry.type == 'coil':
244 # single coil winding closed in infinity
245 inductance_L = constants.mu_0 / np.pi * np.log((coil_radius * 2 - strand_radius) / strand_radius) # + ell * constants.mu_0 / (4 * np.pi)
246 resistance_R = 2 * rho_CU / (np.pi * (self.conductor.strand.diameter / 2) ** 2) * ell if self.solve.general_parameters.superconductor_linear else 1e-12
247 else:
248 return None
250 return [resistance_R, inductance_L]
253class ROHFmodel:
254 """ Base class for a Reduced Order Hysteretic Flux model. Defining a chain of rate dependent cells for flux and loss reconstruction in a homogenized strand model."""
256 def __init__(self, N=None, taus=None, kappas_bar=None, alphas=None, simulation=None, filepath=None, filename=None, spacing_type=None, label=''):
257 # direct parameter initialization
258 self.N = N
259 self.kappas_bar = kappas_bar
260 self.alphas = alphas
261 self.taus = taus
262 # Ambient sueprcon parameters
263 self.Ic_0 = 2978 # [A]
264 self.T_0 = 1.9 # [K]
265 self.Ec_0 = 1e-4 # [V/m]
266 self.n = 60
267 # internal variables
268 self.L0 = constants.mu_0 / (4 * np.pi) # lin. self inductance of cylindric wire
269 # optional info for batch plots
270 self.label = label
271 self.simulation = simulation
272 # initialization shortcuts
273 if filepath and filename:
274 self.import_params(filepath=filepath, filename=filename) # initialize from file
275 elif simulation: # fit params on the fly
276 self.fit_data(simulation=simulation, spacing_type=spacing_type, taus=taus, kappas_bar=kappas_bar, alphas=alphas)
278 def update(self, N, taus, kappas_bar, alphas):
279 self.N = N
280 self.taus = taus
281 self.kappas_bar = kappas_bar
282 self.alphas = alphas
284 def params(self):
285 return np.array([self.alphas, self.kappas_bar, self.taus]).T
287 def export_params(self, filepath, filename, txt_format=False):
288 """Exports the reduced order parameters kappa and alpha as txt file.
290 :param filepath: absolute path to storage location
291 :param filename: name of the file (without .txt ending)
292 """
293 if txt_format:
294 header = "# fitted on " + self.simulation.solution_name + "\nalphas kappas taus" if self.simulation else "# imported params \nalphas kappas taus"
295 np.savetxt(os.path.join(filepath, str(filename) + '.txt'), self.params(), header=header, comments="")
296 else:
297 df = pd.DataFrame(self.params())
298 df.to_csv(os.path.join(filepath, str(filename) + '.csv'), header=['alphas', 'kappas', 'taus'], index=None, float_format='{:e}'.format)
300 def import_params(self, filepath, filename, txt_format=False):
301 """This function reads the reduced order parameters kappa and alphas from a txt file.
303 :param filepath: absolute path to storage location
304 :param filename: name of the file (without .txt ending)
305 """
306 if txt_format:
307 data = np.loadtxt(os.path.join(filepath, filename + ".txt"), delimiter=" ", skiprows=2).T # alphas, kappas, taus
308 else:
309 data = np.array(pd.read_csv(os.path.join(filepath, filename + ".csv"), sep=',', header=1).values).T
310 print(data)
311 self.simulation = None
312 self.name = filename
313 self.update(N=len(data[0]), alphas=data[0], kappas_bar=data[1], taus=data[2])
315 def ohmic_loss(self, I, simulation=None):
316 """ This function calculates the ohmic loss in filaments and matrix of a homogenized conductor.
318 :param I: transport current
319 :param simulation: simulation data for conductor information, defaults to None
320 :return: array of [p_ohm_fil, p_ohm_mat, R_fil, R_mat] for all current steps
321 """
323 if steammaterials_flag or simulation is None:
324 # print("Assuming zero ohmic loss")
325 return np.zeros(len(I)), np.zeros(len(I))
327 # geometry parameters
328 surf_strand = np.pi * (simulation.conductor.strand.diameter / 2) ** 2
329 surf_filaments = simulation.conductor.strand.number_of_filaments * np.pi * (simulation.conductor.strand.filament_diameter / 2) ** 2
330 surf_matrix = surf_strand - surf_filaments
331 # matrix resistivity
332 matpath = os.path.join(os.path.dirname(steammaterials.__file__), 'CFUN')
333 Tspace = np.vstack((self.T_0, 0, simulation.conductor.strand.RRR)) # T, B, RRR
334 cu_space = steammaterials.SteamMaterials("CFUN_rhoCu_NIST_v2", Tspace.shape[0], Tspace.shape[1], matpath)
335 rho_matrix = cu_space.evaluate(Tspace)
336 R_matrix = rho_matrix / surf_matrix
338 # current sharing between matrix (R const) and filaments (power law) https://ieeexplore.ieee.org/document/8970335
339 grid = np.vstack((I / (surf_strand), self.Ec_0 * np.ones(len(I)), self.Ic_0 / (surf_filaments) * np.ones(len(I)), self.n * np.ones(len(I)), rho_matrix * np.ones(len(I)), surf_filaments * np.ones(len(I)), surf_matrix * np.ones(len(I))))
340 sharing_space = steammaterials.SteamMaterials("CFUN_HTS_CurrentSharing_homogenized_v1", grid.shape[0], grid.shape[1], matpath)
341 lambdas = sharing_space.evaluate(grid)
342 # current split associated ohmic losses
343 I_filaments = lambdas * I
344 I_matrix = (np.ones(len(lambdas)) - lambdas) * I
345 p_ohm_filaments = self.Ec_0 * ((abs(I_filaments)) / (self.Ic_0)) ** (self.n) * abs(I_filaments)
346 p_ohm_matrix = R_matrix * I_matrix ** 2
348 return p_ohm_filaments, p_ohm_matrix
350 def fit_scaling(self, simulation):
351 """ This method fits the internal kappa_scaling parameter of the ROHFmodel to a given magnetic background field magnitude for NbTi filaments.
353 :param B_background: magnitude of the magnetic background field in Tesla.
354 """
355 verbose = False
356 # Scaling params
357 B_background = simulation.B_background
358 T_background = simulation.T_background
359 if verbose: print("ROHF scaling: B=" + str(B_background) + " T=" + str(T_background))
361 filament_d = simulation.conductor.strand.filament_diameter
362 strand_surf = np.pi * (simulation.conductor.strand.diameter / 2) ** 2
363 filament_surf = simulation.conductor.strand.number_of_filaments * np.pi * (filament_d / 2) ** 2
364 fill_factor = filament_surf / strand_surf
365 Jc_0 = self.Ic_0 / filament_surf
367 if steammaterials_flag or B_background is None:
368 raise ValueError('cant perform field scaling fit. Need steammaterials and background field amplitude.')
369 matpath = os.path.join(os.path.dirname(steammaterials.__file__), 'CFUN')
371 # Determine ambient field with no background and max current density
372 B_space = np.linspace(0, 14, 5000)
373 numpy2d_Bspace = np.vstack((self.T_0 * np.ones(B_space.shape), B_space, 1 * np.ones(B_space.shape))) # T, B, A
374 nbti_space = steammaterials.SteamMaterials("CFUN_IcNbTi_v1", numpy2d_Bspace.shape[0], numpy2d_Bspace.shape[1], matpath)
375 jc_nbti_space = nbti_space.evaluate(numpy2d_Bspace)
376 ambient_idx = np.abs(jc_nbti_space - Jc_0).argmin()
377 B_ambient = B_space[ambient_idx]
378 if verbose: print(" Ambient field: " + str(B_ambient) + " [T]")
380 # determine absolute field value including a scaled down version of the strand ambient field
381 B_abs = B_background + B_ambient
382 # determine first scaling factor for the magnetic field
383 jc_nbti1 = steammaterials.SteamMaterials("CFUN_IcNbTi_v1", 3, 1, matpath)
384 jc_Babs = jc_nbti1.evaluate((T_background, B_abs, 1))
385 scaling = jc_Babs / Jc_0
386 # determine temperature scaling with finer ambient estimate
387 if T_background != self.T_0:
388 B_abs = float(B_background + scaling * B_ambient)
389 jc_nbti2 = steammaterials.SteamMaterials("CFUN_IcNbTi_v1", 3, 1, matpath)
390 jc_Babs = jc_nbti2.evaluate((T_background, B_abs, 1))
391 scaling = jc_Babs / Jc_0
392 if verbose: print(" Field scaling : " + str(scaling))
393 return scaling, fill_factor
395 def fit_kappas(self, simulation, spacing_type=None, verbose=False):
396 """
397 This method fits the coercivity parameters 'kappas_bar' according to a 'spacing_type' and given number of cells 'self.N'.
398 The kappas are spread on the virgin rise from zero interval of a given strand simulation and can be seen as list of activation levels for the N cells.
400 :param simulation: SimulationData object which the ROHF will get fitted to
401 :param spacing_type: Enforce a specfic spacing of the kappas (i.e. 'linear'), defaults to None
402 :param verbose: print additional information, defaults to False
403 :raises ValueError: Unknown options for spacing_type
404 :return: kappas_bar ndarray of N kappas
405 """
406 _, I_interval, flux_interval = simulation.first_rise()
407 if spacing_type:
408 if spacing_type == "exponential":
409 [a, b], res1 = optimize.curve_fit(lambda x1, a, b: a * np.exp(b * x1), I_interval / max(I_interval), flux_interval / max(flux_interval))
410 kappa_spacing = max(I_interval) * np.flip(np.exp(b) - np.exp(b * np.linspace(0, 1, num=self.N + 1))) / (np.exp(b) - 1)
411 elif spacing_type == "linear":
412 kappa_spacing = np.linspace(min(I_interval), max(I_interval), self.N + 1)
413 elif spacing_type == "invlog":
414 kappa_spacing = np.insert(max(I_interval) * (1.5 - np.geomspace(1, .5, num=self.N + 1)), 0, 0)
415 elif spacing_type == "log":
416 kappa_spacing = np.insert(np.geomspace(I_interval[1], max(I_interval), self.N), 0, 0)
417 else:
418 raise ValueError('Unknown kappa spacing for S_chain fit')
419 # get closest matching indices in I_interval as kappas
420 kappas = I_interval[self.__closest_matches(I_interval, kappa_spacing)]
421 else:
422 # calculate kappas based on minimization of linear interpolation error
423 def max_abs_error(kappas, flux_spline, I_samples):
424 # computes maximum absolute error between rohf and reference simulation on sample points
425 rohf_flux = np.interp(I_samples, kappas, flux_spline(kappas))
426 return max(abs(rohf_flux - flux_spline(I_samples)))
428 # interpolate
429 I_samples = np.linspace(0, max(I_interval), 500)
430 flux_spline = interpolate.CubicSpline(I_interval, flux_interval, extrapolate=True)
431 # minimize on interval
432 cons = (optimize.LinearConstraint([np.concatenate(([1], np.zeros(self.N)))], lb=0, ub=0),
433 optimize.LinearConstraint([np.concatenate((np.zeros(self.N), [1]))], lb=max(I_interval), ub=max(I_interval)))
434 bounds = ((0, max(I_interval)),) * (self.N + 1)
435 kappas = optimize.differential_evolution(max_abs_error, args=(flux_spline, I_samples), x0=np.linspace(0, max(I_interval), self.N + 1),
436 disp=True, constraints=cons, bounds=bounds, tol=1e-8).x
437 # kappas = np.insert(kappas, 1, 0.08*max(I_interval))
438 if verbose:
439 plt.figure()
440 plt.title("kappa distribution")
441 plt.plot(I_interval, flux_interval, '-b')
442 plt.plot(kappas, flux_spline(kappas), 'gX')
443 plt.show()
445 return kappas[:-1]
447 def fit_alphas(self, simulation, kappas_bar, dual=False, verbose=False):
448 """Calculates the cell weights alpha for each kappa by cellwise recursion, fitting the simulation data given through I_interval and flux_interval.
450 :param simulation: SimulationData object which the ROHF will get fitted to
451 :param kappas_bar: coercivity currents on virgin I_interval which are used to define N multicell ROHF
452 :param verbose: display all information, defaults to False
453 :return: kappas_bar, alphas. List of updated kappas and their matching weights. Kappas only change in case of a dual fitting strategy (dual=True)
454 """
455 _, I_interval, flux_interval = simulation.first_rise(current=True)
457 flux_spline = interpolate.CubicSpline(I_interval, flux_interval, extrapolate=True)
458 kappas = np.concatenate((kappas_bar, [I_interval[-1]])) # N + 1 kappas with interval boundary
460 # classic coercivity-weight linkage (undershoot PCloss)
461 alphas = np.zeros(len(kappas_bar))
462 delta_flux = np.diff(flux_spline(kappas))
463 delta_current = np.diff(kappas)
464 for k in range(0, self.N):
465 alphas[k] = 0 if delta_current[k] == 0 else abs(delta_flux[k] / (self.L0 * delta_current[k]) - sum(alphas))
466 if verbose: print(" optimized alphas: " + str(alphas))
468 if False:
469 # tangential coercivity-weight linkage (overshoot PCloss)
470 dflux_dI_spline = flux_spline.derivative(nu=1)
471 fluxes = flux_spline(kappas)
472 m_slopes = dflux_dI_spline(kappas)
473 alphas_tan = np.diff(m_slopes) / self.L0
474 m_slopes[0] = 0
475 b_offsets = np.zeros(self.N + 1)
476 kappas_tan = np.zeros(self.N + 1)
477 for k in range(1, self.N + 1):
478 b_offsets[k] = fluxes[k] - m_slopes[k] * kappas[k]
479 kappas_tan[k] = (b_offsets[k] - b_offsets[k - 1]) / (m_slopes[k - 1] - m_slopes[k])
480 kappas_bar_tan = kappas_tan[1:]
481 if True:
482 plt.figure()
483 plt.plot(I_interval, flux_interval, label='reference')
484 plt.scatter(kappas_bar, fluxes[:-1], label='k-a-linkage kappas')
485 plt.scatter(kappas_bar_tan, np.zeros(len(kappas_bar_tan)), label='new kappas', marker='x')
486 for k in range(0, self.N + 1):
487 plt.plot(I_interval, b_offsets[k] + m_slopes[k] * I_interval)
488 plt.ylim([0, max(flux_interval)])
489 plt.legend()
490 plt.plot()
491 plt.show()
493 return kappas_bar, 0.97 * alphas
495 def fit_data(self, simulation, spacing_type=None, dual=False, taus=None, kappas_bar=None, alphas=None, verbose=False):
496 """Fits the reduced order model params to a given simulation data by linear interpolation between kappas.
498 :param current: transport current of the model
499 :param flux: internal flux of the model
500 :param spacing_type: kappa distribution type (lin, log, invlog, exponential), defaults to None
501 :param verbose: display all information, defaults to False
502 """
503 if self.N is None:
504 if kappas_bar is not None:
505 self.N = len(kappas_bar)
506 elif taus is not None:
507 self.N = len(taus)
508 elif alphas is not None:
509 self.N = len(alphas)
510 else:
511 raise ValueError(f'Undefined number of cells. Cant fit to data.')
513 # fit taus
514 if taus is None:
515 taus = [1.5e-5] + (self.N - 1) * [2e-4]
516 # taus = self.N*[0.0] if tau_sweep is None else self.fit_taus(tau_sweep=tau_sweep, verbose=verbose)
518 # fit kappas
519 if kappas_bar is None:
520 kappas_bar = self.fit_kappas(simulation, spacing_type=spacing_type, verbose=verbose)
522 # fit alphas
523 if alphas is None:
524 kappas_bar, alphas = self.fit_alphas(simulation, kappas_bar, verbose=verbose, dual=dual)
526 # save fitted params
527 self.simulation = simulation
528 self.update(len(taus), taus, kappas_bar, alphas)
530 def __closest_matches(self, dataset, values):
531 return [min(range(len(dataset)), key=lambda i: abs(dataset[i] - value)) for value in values]
533 def __flux_multicells(self, Irev):
534 flux = 0
535 for k in range(self.N):
536 flux += self.alphas[k] * self.L0 * Irev[k]
537 return flux
539 def __g(self, I, Irev_prev, kappa):
540 """ 1Dvector/scalar play model for g (=Irev+Ieddy) with coercivity kappas """
541 diff = I - Irev_prev
542 diff_unit = diff / abs(diff) if diff != 0 else 0
543 if abs(diff) >= kappa or diff * (diff - kappa * diff_unit) > 0:
544 return I - kappa * diff_unit
545 else:
546 return Irev_prev
548 def compute(self, time, I, sim=None, v=0):
549 """ Computing flux and losses on a given interval """
550 if (self.N or self.alphas[0] or self.kappas_bar[0] or self.taus[0]) is None:
551 raise ValueError(f'Undefined parameters. Cant compute solution.')
553 # Background field scaling
554 kappa_scaling, fill_factor = [1.0, 0.5] if sim is None else self.fit_scaling(sim)
556 # INPUT PARAMETERS -----------------------------------------------
557 nbSteps = len(I)
558 rel_tol = 1e-5 # Iterations are needed because of the field-dependent parameters.
559 iter_max = 100
560 weightsum = np.cumsum(self.alphas)
561 # RESOLUTION -----------------------------------------------------
562 flux = np.zeros((nbSteps,))
563 Irev = np.zeros((nbSteps, self.N))
564 Iirr = np.zeros((nbSteps, self.N))
565 Ieddy = np.zeros((nbSteps, self.N))
566 g = np.zeros((nbSteps, self.N))
567 iter_tot = 0
568 # Computation of the associated flux density
569 for i in range(1, nbSteps):
570 # Vector hysteresis
571 flux[i] = flux[i - 1]
572 dt = time[i] - time[i - 1]
573 conv_crit = 2
574 iter = 0
575 # scalar play model
576 while conv_crit > 1 and iter < iter_max:
577 for k in range(self.N):
578 # update g = Irev + Ieddy with "scalar" play model
579 diff = I[i] - Irev[i - 1, k]
580 diff_unit = diff / abs(diff) if diff != 0 else 0
581 # overcritical
582 if abs(I[i]) >= kappa_scaling * self.Ic_0:
583 Irev[i, k] = Irev[i - 1, k] + (1 - fill_factor) * (I[i] - I[i - 1]) / (2 * weightsum[k])
584 Ieddy[i, k] = Ieddy[i - 1, k] + self.taus[k] * (Irev[i - 1, k] - Irev[i, k]) / (dt)
585 Iirr[i, k] = I[i] - Irev[i, k] - Ieddy[i, k]
586 # undercritical
587 else:
588 if abs(diff) >= kappa_scaling * self.kappas_bar[k] or diff * (diff - kappa_scaling * self.kappas_bar[k] * diff_unit) > 0:
589 g[i, k] = I[i] - kappa_scaling * self.kappas_bar[k] * diff_unit
590 else:
591 g[i, k] = Irev[i - 1, k]
592 # g[i,k] = self.__g(I[i], Irev[i-1,k], self.kappas_bar[k])
593 # update irreversible
594 Iirr[i, k] = I[i] - g[i, k]
595 # update eddy
596 Ieddy[i, k] = self.taus[k] / (dt + self.taus[k]) * (g[i, k] - Irev[i - 1, k])
597 # calculate reversible
598 Irev[i, k] = g[i, k] - Ieddy[i, k]
600 flux_new = self.__flux_multicells(Irev[i, :])
601 conv_crit = abs((flux_new - flux[i]) / (flux[i] + 1e-10)) / rel_tol
602 flux[i] = flux_new
603 iter += 1
604 iter_tot += iter
605 if iter == iter_max:
606 print(f"Step {i} reached maximum of {iter} iterations.")
607 if v == 1:
608 print(f'Finished with {iter_tot / nbSteps} iterations per step on average.')
609 # POST-PROCESSING ------------------------------------------------
610 # Computation of the stored and dissipated power quantities
611 p_ohm_fil, p_ohm_mat = self.ohmic_loss(I, simulation=sim)
612 p_rev = 0
613 p_irr = 0
614 p_eddy = 0
615 for k in range(self.N):
616 dfluxdt = self.L0 * np.concatenate(([0], np.diff(Irev[:, k]) / np.diff(time))) # np.gradient(Irev[:,k], time)
617 # Instantaneous power. Stored: \sum_k ( alpha_k * Irev_k * dphi_k/dt ). Dissipated: \sum_k ( alpha_k * Iirr_k * dphi_k/dt )
618 p_rev += self.alphas[k] * Irev[:, k] * dfluxdt # See Appendix B.2. of J. Dular's thesis (https://orbi.uliege.be/handle/2268/298054)
619 p_irr += self.alphas[k] * Iirr[:, k] * dfluxdt
620 p_eddy += self.alphas[k] * Ieddy[:, k] * dfluxdt
622 # Per cycle loss
623 p_fil_cycle = None
624 p_mat_cycle = None
625 if sim and sim.f:
626 t_end = np.array(time)[-1]
627 # Cumulative integration over one cycle
628 x_init = self.__closest_matches(time, [t_end - 1 / (sim.f)])[0]
629 cumul_p_fil = integrate.cumulative_trapezoid(p_irr + p_ohm_fil, time, initial=0)
630 cumul_p_mat = integrate.cumulative_trapezoid(p_eddy + p_ohm_mat, time, initial=0)
631 p_fil_cycle = (cumul_p_fil[-1] - cumul_p_fil[x_init])
632 p_mat_cycle = (cumul_p_mat[-1] - cumul_p_mat[x_init])
634 return flux, p_irr + p_ohm_fil, p_eddy + p_ohm_mat, p_fil_cycle, p_mat_cycle
637class PostProcess:
638 """
639 This class loads and stores the data from a simulation and can apply various postprocessing operations on the data.
640 The simulation data is saved as a SimulationData object.
641 """
643 def __init__(self, fdm, model_data_output_path) -> None:
644 self.fdm = fdm
645 self.model_data_output_path = model_data_output_path
646 self.geometry_name = f"Geometry_{self.fdm.run.geometry}"
647 self.mesh_name = f"Mesh_{self.fdm.run.mesh}"
648 self.solution_name = f"Solution_{self.fdm.run.solution}"
650 self.simulation_data = SimulationData(model_data_output_path, self.geometry_name, self.mesh_name, self.solution_name, self.fdm)
651 self.flux_model = ROHFmodel(N=self.fdm.magnet.postproc.plot_flux.rohf_N,
652 spacing_type=self.fdm.magnet.postproc.plot_flux.rohf_kappa_spacing,
653 simulation=self.simulation_data,
654 filename=self.fdm.magnet.postproc.plot_flux.rohf_file,
655 filepath=os.path.join(self.model_data_output_path, self.geometry_name),
656 ) if self.fdm.magnet.postproc.plot_flux.rohf else None
657 self.plotter = PlotPythonConductorAC(fdm, model_data_output_path, self.simulation_data, self.flux_model)
658 self.report = PDFreport(filepath=model_data_output_path,
659 title=f'{self.fdm.run.geometry}_{self.fdm.run.mesh}_{self.fdm.run.solution}' if self.fdm.magnet.postproc.generate_report else None)
661 def rohf_background(self):
662 """ Test for ROHFmodel background field scaling """
663 if steammaterials_flag or self.flux_model is None:
664 # skip
665 return
666 matpath = os.path.join(os.path.dirname(steammaterials.__file__), 'CFUN')
668 Ic_0 = 2900
669 B_background = self.simulation_data.B_background
671 # analytic self-field on strand boundary
672 CATI_surf = 54 * np.pi * (4.5e-05) ** 2 # 54 filament surface in CATI paper strand
673 Jc_0 = Ic_0 / CATI_surf
674 B_space = np.linspace(0, 14, 5000)
675 # Determine ambient field with no background field and max current density
676 numpy2d_Bspace = np.vstack((1.9 * np.ones(B_space.shape), B_space, 1 * np.ones(B_space.shape))) # T, B, A
677 nbti_space = steammaterials.SteamMaterials("CFUN_IcNbTi_v1", numpy2d_Bspace.shape[0], numpy2d_Bspace.shape[1], matpath)
678 jc_nbti_space = nbti_space.evaluate(numpy2d_Bspace)
679 B_ambient = B_space[np.abs(jc_nbti_space - Jc_0).argmin()]
680 print("B_ambient: " + str(B_ambient) + " [T]")
682 # Determine absolute field value which the strand experiences
683 B_abs = B_background + (B_ambient / B_background) * B_ambient if B_background > B_ambient else B_background + B_ambient
684 # material Jc(B) fit
685 jc_nbti = steammaterials.SteamMaterials("CFUN_IcNbTi_v1", 3, 1, matpath)
686 jc_Babs = jc_nbti.evaluate((1.9, B_abs, 1))[0]
688 # PLOT
689 plt.figure()
690 plt.plot(B_space, jc_nbti_space, label='CFUN_IcNbTi_v1')
691 plt.scatter((B_ambient, B_abs), (Jc_0, jc_Babs), label='Jc(B)', )
692 plt.title('Nb-Ti')
693 plt.grid()
694 plt.legend()
695 plt.xlabel("Field (T)")
696 plt.ylabel("jc (A/m2)")
698 kappa_scaling = jc_Babs / Jc_0
699 print(" Kappa scaling factor: " + str(kappa_scaling))
700 self.flux_model.kappa_scaling = kappa_scaling
701 plt.show()
703 def internal_flux(self):
704 """ Postproc routine for the internal flux and the related ROHF model """
705 self.plotter.plot_transport()
706 self.plotter.plot_flux()
707 self.plotter.plot_power_loss()
708 self.plotter.plot_resistivity()
710 # self.flux_model.export_params(filepath=os.path.join(self.model_data_output_path, self.geometry_name), filename="TESTFILE")
711 # self.flux_model.import_params(filepath=os.path.join(self.model_data_output_path, self.geometry_name), filename="TESTFILE")
713 if self.report and self.flux_model:
714 self.report.add_table('ROHF parameters', self.flux_model.params())
716 def instantaneous_loss(self):
717 print("Total loss: ", self.simulation_data.cumulative_power["TotalLoss"].iloc[-1])
718 print("Total filament loss: ", self.simulation_data.cumulative_power["FilamentLoss"].iloc[-1])
719 print("Total coupling loss: ", self.simulation_data.cumulative_power["CouplingLoss"].iloc[-1])
720 print("Total eddy loss: ", self.simulation_data.cumulative_power["EddyLoss"].iloc[-1])
721 plot_options = self.fdm.magnet.postproc.plot_instantaneous_power
722 self.plotter.plot_instantaneous_power(
723 show=plot_options.show,
724 title=plot_options.title,
725 save_plot=plot_options.save,
726 save_folder_path=os.path.join(self.model_data_output_path, self.geometry_name, self.mesh_name, self.solution_name),
727 save_file_name=plot_options.save_file_name,
728 overwrite=self.fdm.run.overwrite
729 )
731 def plot_impedance(self):
732 """ Plots the impedance estimate and transport quantities """
733 conductor = self.fdm.conductors[self.fdm.magnet.solve.conductor_name]
734 frequency_range = np.logspace(-2, 6, 1000)
736 self.simulation_data.plot_transport()
738 #### VOLTAGE ####
739 # self.simulation_data.plot_phaseshift_voltage(90)
740 # self.simulation_data.plot_voltage_decomp()
742 #### IMPEDANCE ####
743 self.simulation_data.compare_impedance(frequency_range)
746class BatchPostProcess:
747 """
748 This class loads and stores data from multiple simulations (+flux models) and performs operations on them
749 """
751 def __init__(self, fdm, lossMap_gridData_folder, inputs_folder_path, outputs_folder_path) -> None:
752 self.fdm = fdm
753 self.inputs_folder_path = inputs_folder_path # This is the path to the folder where the input data is stored
754 self.model_data_output_path = outputs_folder_path # This is the path to the folder where the model output data is stored (e.g. geometries)
755 self.outputs_folder_path = os.path.join(outputs_folder_path, fdm.magnet.postproc.output_folder) # This is the path to the folder where the postprocessed data is written
756 self.simulation_collection = self.retrieve_simulation_data()
757 self.fluxmodel_collection = self.retrieve_fluxmodel_data()
758 self.plotter = BatchPlotPythonConductorAC(
759 fdm=self.fdm, lossMap_gridData_folder=lossMap_gridData_folder, outputs_folder_path=outputs_folder_path,
760 simulation_collection=self.simulation_collection, fluxmodel_collection=self.fluxmodel_collection
761 )
762 self.report = PDFreport(filepath=outputs_folder_path, title='BatchPostprocReport') if self.fdm.magnet.postproc.generate_report else None
763 self.avg_simulation_time = np.mean([sd.simulation_time for sd in self.simulation_collection])
764 self.total_simulation_time = np.sum([sd.simulation_time for sd in self.simulation_collection])
765 print('Number of simulations considered: ', len(self.simulation_collection))
766 print('Average simulation time: ', self.avg_simulation_time, 's')
767 print('Total simulation time: ', self.total_simulation_time, 's')
769 def get_base_sim(self):
770 """ Find simulation with lowest frequency and highest amplitude"""
771 base_sim = self.simulation_collection[0]
772 for sim in self.simulation_collection[1:]:
773 if sim.f <= base_sim.f and sim.I_amp >= base_sim.I_amp: base_sim = sim
774 return base_sim
776 def rohf_on_grid(self):
777 """ Checks the fit of an existing model to a grid of simulations or fits a new ROHF model to a grid of simulations """
779 optimized_rohf = None
780 if self.fdm.magnet.postproc.batch_postproc.rohf_on_grid.fit_rohf:
782 tausweep_IIC = self.fdm.magnet.postproc.batch_postproc.rohf_on_grid.fit_rohf_tausweep_IIC
783 N = self.fdm.magnet.postproc.batch_postproc.rohf_on_grid.fit_rohf_N
784 print("FITTING ROHF TO SIMULATIONS:")
785 print("-> number of cells N = " + str(N))
786 print("-> tausweep IIC = " + str(tausweep_IIC))
788 taus = self.optimize_taus(tausweep_IIC=tausweep_IIC, N=N)
789 kappas, alphas = self.optimize_kappas(base_taus=taus, N=N)
790 # export resulting ROHF
791 optimized_rohf = ROHFmodel(N=N, kappas_bar=kappas, alphas=alphas, taus=taus)
792 print(optimized_rohf.taus)
793 print(optimized_rohf.alphas)
794 print(optimized_rohf.kappas_bar)
795 self.plotter.plot_flux_time_triplet(flux_model=optimized_rohf, title='Flux in time triplet')
797 optimized_rohf.export_params(self.outputs_folder_path, 'optimized')
799 if self.report:
800 self.report.filename = 'ROHFoptimization.pdf'
801 self.report.add_table('Parameter results', optimized_rohf.params())
803 if self.fdm.magnet.postproc.batch_postproc.rohf_on_grid.produce_error_map:
804 flux_model = optimized_rohf if optimized_rohf is not None else self.fluxmodel_collection[0]
805 self.plotter.create_errorMap(flux_model=flux_model)
807 if self.report:
808 self.report.add_table('Error map parameter', optimized_rohf.params())
810 def optimize_kappas(self, N, base_kappas=None, base_taus=None):
812 base_sim = self.get_base_sim()
813 if base_kappas is None:
814 base_kappas = np.linspace(0, base_sim.I_amp, N + 1)[:-1] # linear spaced over max I range
815 # base_kappas = np.geomspace(1, base_sim.I_amp, N+1)[:-1]-1
816 if base_taus is None:
817 base_taus = np.zeros(N)
818 baseROHF = ROHFmodel(simulation=base_sim, taus=base_taus, kappas_bar=base_kappas)
820 print(" RUNNING KAPPA & WEIGHT OPTIMIZATION ...")
822 # ERROR -------------------------------------------------------------------------------------------------------------------------------
823 def fit_error_kappas(normalized_kappas):
824 # static alpha fit based on recursion of kappas -> piecewise linear fit
825 baseROHF.fit_data(taus=base_taus, simulation=base_sim, kappas_bar=normalized_kappas * base_sim.I_amp)
826 flux_error = 0
827 for simulation in self.simulation_collection:
828 time, I, flux = simulation.halfcycle(current=True)
829 rohf_flux, _, _, _, _ = baseROHF.compute(time, I)
830 flux_error = flux_error + sum(np.diff(time, prepend=0) * np.absolute(flux - rohf_flux))
831 return flux_error
832 # BOUNDARIES --------------------------------------------------------------------------------------------------------------------------
834 bounds = optimize.Bounds(lb=0, ub=1, keep_feasible=True)
835 # OPTIMIZATION 1: KAPPA DISTRIBUTION & WEIGHTS ----------------------------------------------------------------------------------------
836 kappa_optimization = optimize.minimize(fit_error_kappas, method='L-BFGS-B', x0=base_kappas / base_sim.I_amp, bounds=bounds, options={"disp": True})
837 optimized_kappas = np.sort(kappa_optimization.x * base_sim.I_amp)
838 print(" --> OPTIMIZED KAPPAS: " + str(optimized_kappas))
840 # ERROR -------------------------------------------------------------------------------------------------------------------------------
841 def fit_error_alphas(alphas):
842 baseROHF.alphas = alphas
843 flux_error = 0
844 for simulation in self.simulation_collection:
845 time, I, flux = simulation.halfcycle(current=True)
846 rohf_flux, _, _, _, _ = baseROHF.compute(time, I)
847 flux_error = flux_error + sum(abs(np.diff(I, prepend=0)) * np.diff(time, prepend=0) * np.absolute(flux - rohf_flux))
848 return flux_error
849 # BOUNDARIES --------------------------------------------------------------------------------------------------------------------------
851 bounds = optimize.Bounds(lb=0, ub=10, keep_feasible=True)
852 # OPTIMIZATION 2: WEIGHTS ONLY ---------------------------------------------------------------------------------------------------------
853 alpha_optimization = optimize.minimize(fit_error_alphas, method='L-BFGS-B', x0=baseROHF.alphas, bounds=bounds, options={"disp": True})
854 optimized_alphas = alpha_optimization.x
855 print(" --> OPTIMIZED ALPHAS: " + str(optimized_alphas))
857 return [optimized_kappas, optimized_alphas]
859 def optimize_taus(self, tausweep_IIC, N, base_taus=None, base_kappas=None):
861 force_tau1_zero = False
862 base_sim = self.get_base_sim()
864 amplitudes = sorted(list(set([sim.I_amp for sim in self.simulation_collection])))
865 tausweep_I = min(amplitudes, key=lambda x: abs(x - tausweep_IIC * amplitudes[-1])) # get grid current closest to tausweep IIC
866 tau_sweep = [sim for sim in self.simulation_collection if sim.I_amp == tausweep_I] # get asociated sweep
868 freq = sorted(list(set([sim.f for sim in tau_sweep])))
869 if base_kappas is None:
870 base_kappas = np.linspace(0, amplitudes[-1], N + 1)[:-1] # start with linear spaced over max_I range
871 if base_taus is None:
872 # Estimate initial tau values with lowpass approximation
873 loss = [sim.total_power_per_cycle['FilamentLoss'] for sim in tau_sweep]
874 resamp_freq = np.linspace(freq[0], freq[-1], 500)
875 resamp_loss = np.interp(resamp_freq, freq, loss)
876 idx = np.abs(loss - resamp_loss[0] * 0.707).argmin() # -3dB mark -> crit freq
877 tau_star = 1 / (2 * np.pi * resamp_freq[idx]) # f_c of lowpass
878 base_taus = base_kappas / (N * tausweep_I) * tau_star # scale estimate down w. cell level
880 baseROHF = ROHFmodel(N=N, simulation=base_sim, kappas_bar=base_kappas, taus=base_taus)
881 self.plotter.plot_percycle_losses(sims=tau_sweep, flux_model=baseROHF, title='STARTING TAUS')
883 if True:
884 print(" RUNNING TAU OPTIMIZATION ... ")
886 # ERROR -------------------------------------------------------------------------------------------------------------------------------
887 def fit_error(taus):
888 baseROHF.taus = np.concatenate(([0], taus)) if force_tau1_zero else taus
889 error = 0
890 for idx, sim in enumerate(self.simulation_collection):
891 time, I, flux = sim.halfcycle(current=True)
892 flux_rohf, fill_rohf, eddy_rohf, _, _ = baseROHF.compute(time=time, I=I)
893 # errors[idx] = abs(sim.total_power_per_cycle['EddyLoss']-eddy_pc_rohf) + abs(sim.total_power_per_cycle['FilamentLoss']-fill_pc_rohf)
894 error = error + sum(abs((sim.power['EddyLoss'][:len(I)] - eddy_rohf)) / np.mean(sim.power['EddyLoss']) + abs((sim.power['FilamentLoss'][:len(I)] - fill_rohf)) / np.mean(sim.power['FilamentLoss']))
895 return error
896 # BOUNDARIES --------------------------------------------------------------------------------------------------------------------------
898 bounds = optimize.Bounds(lb=0.0, ub=0.1)
899 # OPTIMIZATION ------------------------------------------------------------------------------------------------------------------------
900 optimization = optimize.minimize(fit_error, x0=base_taus, bounds=bounds, options={'disp': True}) # , method='SLSQP'
901 optimized_taus = np.concatenate(([0], optimization.x[1:])) if force_tau1_zero else optimization.x
903 print(" --> OPTIMIZED TAUS: " + str(optimized_taus))
904 baseROHF.fit_data(simulation=base_sim, spacing_type='linear', taus=np.sort(optimized_taus))
905 self.plotter.plot_percycle_losses(sims=tau_sweep, flux_model=baseROHF, title='OPTIMIZED TAUS')
906 # plt.show()
907 return np.sort(optimized_taus)
909 else:
910 return baseROHF.taus
912 def retrieve_simulation_data(self):
913 """
914 This function iterates over the input CSV-file (specifying which simulations to postprocess) and returns a list of SimulationData objects
915 containing all the simulation data. If no CSV-file is specified, the data from the single simulation specified in the input YAML-file is returned.
916 """
918 simulations_csv = self.fdm.magnet.postproc.batch_postproc.simulations_csv
919 if simulations_csv is not None:
920 try:
921 self.simulations_csv = pd.read_csv(os.path.join(self.inputs_folder_path, f'{simulations_csv}.csv')) # Read the csv file with the input data
922 except:
923 raise FileNotFoundError(f'No csv file with the name {simulations_csv}.csv was found in the inputs folder.')
925 if self.simulations_csv is not None:
926 simulationCollection = []
927 for index, row in self.simulations_csv.iterrows():
928 if pd.isna(row['input.run.geometry']) and pd.isna(row['input.run.mesh']) and pd.isna(row['input.run.solution']):
929 continue
930 geometry_name = 'Geometry_' + str(row['input.run.geometry'])
931 mesh_name = 'Mesh_' + str(row['input.run.mesh'])
933 if isinstance(row['input.run.solution'], float) and row['input.run.solution'].is_integer():
934 solution_name = 'Solution_' + str(int(row['input.run.solution']))
935 else:
936 solution_name = 'Solution_' + str(row['input.run.solution'])
938 # Check if the row refers to a valid simulation by checking if the solution folder exists:
939 # solution_folder = os.path.join(os.getcwd(), 'tests', '_outputs', self.fdm.general.magnet_name, geometry_name, mesh_name, solution_name)
940 solution_folder = os.path.join(self.model_data_output_path, geometry_name, mesh_name, solution_name)
941 if os.path.exists(solution_folder): # If the solution folder exists, add the simulation to the simulationCollection
942 sd = SimulationData(self.model_data_output_path, geometry_name, mesh_name, solution_name, self.fdm)
943 if sd.simulation_time is not None: # Only add the simulation if it has finished running (and therefore has written the simulation time to a file)
944 simulationCollection.append(sd)
945 else:
946 simulationCollection = [SimulationData(self.model_data_output_path, 'Geometry_' + self.fdm.run.geometry, 'Mesh_' + self.fdm.run.mesh, 'Solution_' + self.fdm.run.solution)]
948 return self.sort_simulationCollection(self.filter_simulationCollection(simulationCollection))
950 def filter_simulationCollection(self, simulationCollection):
951 """
952 This function is used to filter the simulationCollection based on the filter criterion specified in the yaml input file.
953 An example of a filter criterion is '<<solve.source_parameters.sine.frequency>> == 18', which will disregard all simulations with frequency != 18Hz.
954 """
955 if self.fdm.magnet.postproc.batch_postproc.filter.apply_filter:
956 filter_criterion = self.fdm.magnet.postproc.batch_postproc.filter.filter_criterion
957 class_params = re.findall('<<(.*?)>>', filter_criterion)
958 for cp in class_params:
959 filter_criterion = filter_criterion.replace(f"<<{cp}>>", 'sd.' + cp)
960 filtering_function = eval(f'lambda sd: {filter_criterion}')
961 return list(filter(filtering_function, simulationCollection))
962 else:
963 return simulationCollection
965 def sort_simulationCollection(self, simulationCollection):
966 """
967 This function is used to sort the simulationCollection based on the sort key specified in the yaml input file.
968 An example of a sort key is 'solve.source_parameters.sine.frequency', which will sort the simulations based on frequency.
969 """
970 if self.fdm.magnet.postproc.batch_postproc.sort.apply_sort:
971 sorting_function = eval(f'lambda sd: sd.{self.fdm.magnet.postproc.batch_postproc.sort.sort_key}')
972 return sorted(simulationCollection, key=sorting_function)
973 else:
974 return simulationCollection
976 def retrieve_fluxmodel_data(self):
977 """ Analog function to retrieve_simulation_data but for fluxmodels (Glock-Thesis). """
978 fluxmodels_csv = self.fdm.magnet.postproc.batch_postproc.fluxmodels_csv
979 if fluxmodels_csv is not None:
980 try:
981 self.fluxmodels_csv = pd.read_csv(os.path.join(self.inputs_folder_path, f'{fluxmodels_csv}.csv')) # Read the csv file with the input data
982 except:
983 raise FileNotFoundError(f'No csv file with the name {fluxmodels_csv}.csv was found in the inputs folder.')
985 fluxmodelCollection = []
986 if fluxmodels_csv is not None:
987 for index, row in self.fluxmodels_csv.iterrows():
988 if pd.isna(row['input.run.geometry']) and pd.isna(row['input.run.mesh']) and pd.isna(row['input.run.solution']):
989 continue
990 geometry_name = 'Geometry_' + str(row['input.run.geometry'])
991 parameterfile_path = os.path.join(self.model_data_output_path, geometry_name)
992 parameterfile_name = str(row['input.run.parameterfile'])
993 label_name = str(row['input.run.label'])
995 fluxmodelCollection.append(ROHFmodel(filepath=parameterfile_path, filename=parameterfile_name, label=label_name))
997 return fluxmodelCollection