Coverage for fiqus/getdp_runners/RunGetdpConductorAC_Rutherford.py: 72%
115 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-12-09 14:33 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-12-09 14:33 +0000
1import timeit
2import json
3import logging
4import math
5from enum import Enum
6import operator
7import itertools
8import os
9import pickle
10import subprocess
11import re
12import pandas as pd
14import gmsh
15import numpy as np
17from fiqus.data import RegionsModelFiQuS
18from fiqus.utils.Utils import GmshUtils, FilesAndFolders
19from fiqus.data.RegionsModelFiQuS import RegionsModel
20# import fiqus.data.DataConductorACGeom as geom
22from fiqus.pro_assemblers.ProAssembler import ASS_PRO
24logger = logging.getLogger('FiQuS')
26class Solve:
27 def __init__(self, fdm, GetDP_path, geometry_folder, mesh_folder, verbose=True):
28 self.fdm = fdm
29 self.cacdm = fdm.magnet
30 self.GetDP_path = GetDP_path
31 self.solution_folder = os.path.join(os.getcwd())
32 self.magnet_name = fdm.general.magnet_name
33 self.geometry_folder = geometry_folder
34 self.mesh_folder = mesh_folder
35 self.mesh_file = os.path.join(self.mesh_folder, f"{self.magnet_name}.msh")
36 self.pro_file = os.path.join(self.solution_folder, f"{self.magnet_name}.pro")
37 self.regions_file = os.path.join(mesh_folder, f"{self.magnet_name}.regions")
39 self.verbose = verbose
40 self.gu = GmshUtils(self.solution_folder, self.verbose)
41 self.gu.initialize(verbosity_Gmsh=fdm.run.verbosity_Gmsh)
43 self.ass_pro = ASS_PRO(os.path.join(self.solution_folder, self.magnet_name))
44 self.regions_model = FilesAndFolders.read_data_from_yaml(self.regions_file, RegionsModel)
45 self.material_properties_model = None
47 self.ed = {} # excitation dictionary
49 gmsh.option.setNumber("General.Terminal", verbose)
51 def read_excitation(self, inputs_folder_path):
52 """
53 Function for reading csv for the 'from_file' excitation case
54 :type inputs_folder_path: str
55 """
56 # if self.cacdm.solve.source_parameters.source_type == 'from_file':
57 # input_file = os.path.join(inputs_folder_path, self.cacdm.solve.source_parameters.source_csv_file)
58 # logger.info(f'Getting applied field and transport current from file: {input_file}')
59 # df = pd.read_csv(input_file, delimiter=',', engine='python')
60 # excitation_time = df['time'].to_numpy(dtype='float').tolist()
61 # self.ed['time'] = excitation_time
62 # excitation_b = df['b'].to_numpy(dtype='float').tolist()
63 # self.ed['b'] = excitation_b
64 # excitation_I = df['I'].to_numpy(dtype='float').tolist()
65 # self.ed['I'] = excitation_I
67 if self.cacdm.solve.source_parameters.excitation_coils.enable and self.cacdm.solve.source_parameters.excitation_coils.source_csv_file:
68 input_file = os.path.join(inputs_folder_path, self.cacdm.solve.source_parameters.excitation_coils.source_csv_file)
69 logger.info(f'Getting excitation coils currents from file: {input_file}')
70 df = pd.read_csv(input_file, delimiter=',', engine='python')
72 if( len(df.columns) != len(self.cacdm.geometry.excitation_coils.centers)+1):
73 logger.warning('Number of excitation coils in geometry ('+ str(len(self.cacdm.geometry.excitation_coils.centers))+') and input source file ('+str(len(df.columns))+') not compatible')
75 excitation_time = df['time'].to_numpy(dtype='float').tolist()
76 self.ed['time'] = excitation_time
77 for i in range(1, len(df.columns)):
78 Istr = 'I'+str(i)
79 excitation_value = df[Istr].to_numpy(dtype='float').tolist()
80 self.ed[Istr] = excitation_value
82 # def get_material_properties(self, inputs_folder_path):
83 # """
84 # Function for reading material properties from the geometry YAML file.
85 # This reads the 'solution' section of the YAML file and stores it in the solution folder.
86 # This could also be a place to change the material properties in the future.
87 # """
88 # if self.cacdm.geometry.io_settings.load.load_from_yaml:
89 # input_yaml_file = os.path.join(inputs_folder_path, self.cacdm.geometry.io_settings.load.filename)
90 # Conductor_dm = FilesAndFolders.read_data_from_yaml(input_yaml_file, geom.Conductor)
91 # solution_parameters = Conductor_dm.Solution
92 # FilesAndFolders.write_data_to_yaml(os.path.join(self.solution_folder, "MaterialProperties.yaml"), solution_parameters.dict())
93 # self.material_properties_model = solution_parameters
98 def assemble_pro(self):
99 logger.info("Assembling .pro file")
100 self.ass_pro.assemble_combined_pro(template = self.cacdm.solve.pro_template, rm = self.regions_model, dm = self.fdm, ed=self.ed, mp=self.material_properties_model)
102 def run_getdp(self, solve = True, postOperation = True, gui = False):
104 # f = self.cacdm.solve.source_parameters.frequency
105 # bmax = self.cacdm.solve.source_parameters.applied_field_amplitude
106 # Imax_ratio = self.cacdm.solve.source_parameters.ratio_of_max_imposed_current_amplitude
108 command = ["-v2", "-verbose", "3"]
109 if solve:
110 # command += ["-solve", "js_to_hs_2"] if not self.cacdm.solve.frequency_domain_solver.enable else ["-solve", "MagDyn_freq"] # for debugging purposes
111 command += ["-solve", "MagDyn"] if not self.cacdm.solve.frequency_domain_solver.enable else ["-solve", "MagDyn_freq"]
112 # Solve_only seems to always call postproc, here we only save .pos-files if specified in input file
113 # if (postOperation and not solve) or (solve and postOperation and self.cacdm.postproc.generate_pos_files):
114 # if self.cacdm.solve.formulation_parameters.dynamic_correction:
115 # command += " -pos MagDyn MagDyn_dynCorr"
116 # else:
117 command += ["-pos", "MagDyn"]
119 logger.info(f"Running GetDP with command: {command}")
120 startTime = timeit.default_timer()
121 # subprocess.run(f"{self.GetDP_path} {self.pro_file} {command} -msh {self.mesh_file}")
123 if self.cacdm.solve.general_parameters.noOfMPITasks:
124 mpi_prefix = ["mpiexec", "-np", str(self.cacdm.solve.general_parameters.noOfMPITasks)]
125 else:
126 mpi_prefix = []
128 getdpProcess = subprocess.Popen(mpi_prefix + [self.GetDP_path] + [self.pro_file] + command + ["-msh"] + [self.mesh_file], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
130 with getdpProcess.stdout:
131 for line in iter(getdpProcess.stdout.readline, b""):
132 line = line.decode("utf-8").rstrip()
133 line = line.split("\r")[-1]
134 if not "Test" in line:
135 if line.startswith("Info"):
136 parsedLine = re.sub(r"Info\s+:\s+", "", line)
137 logger.info(parsedLine)
138 elif line.startswith("Warning"):
139 parsedLine = re.sub(r"Warning\s+:\s+", "", line)
140 logger.warning(parsedLine)
141 elif line.startswith("Error"):
142 parsedLine = re.sub(r"Error\s+:\s+", "", line)
143 logger.error(parsedLine)
144 logger.error("Solving CAC failed.")
145 # raise Exception(parsedLine)
146 elif re.match("##", line):
147 logger.critical(line)
148 else:
149 logger.info(line)
151 simulation_time = timeit.default_timer()-startTime
152 # Save simulation time:
153 if solve:
154 logger.info(f"Solving Rutherford has finished in {round(simulation_time, 3)} seconds.")
155 with open(self.solution_folder+f'\\txt_files\\simulation_time.txt', 'w') as file:
156 file.write(str(simulation_time))
159 if gui and ((postOperation and not solve) or (solve and postOperation and self.cacdm.postproc.generate_pos_files)):
160 # gmsh.option.setNumber("Geometry.Volumes", 1)
161 # gmsh.option.setNumber("Geometry.Surfaces", 1)
162 # gmsh.option.setNumber("Geometry.Curves", 1)
163 # gmsh.option.setNumber("Geometry.Points", 0)
164 posFiles = [
165 fileName
166 for fileName in os.listdir(self.solution_folder)
167 if fileName.endswith(".pos")
168 ]
169 for posFile in posFiles:
170 gmsh.open(os.path.join(self.solution_folder, posFile))
171 self.gu.launch_interactive_GUI()
172 else:
173 if gmsh.isInitialized():
174 gmsh.clear()
175 gmsh.finalize()
177 def cleanup(self):
178 """
179 This funtion is used to remove .pre and .res files from the solution folder, as they may be large and not needed.
180 """
181 magnet_name = self.fdm.general.magnet_name
182 cleanup = self.cacdm.postproc.cleanup
184 if cleanup.remove_res_file:
185 res_file_path = os.path.join(self.solution_folder, f"{magnet_name}.res")
186 if os.path.exists(res_file_path):
187 os.remove(res_file_path)
188 logger.info(f"Removed {magnet_name}.res")
190 if cleanup.remove_pre_file:
191 pre_file_path = os.path.join(self.solution_folder, f"{magnet_name}.pre")
192 if os.path.exists(pre_file_path):
193 os.remove(pre_file_path)
194 logger.info(f"Removed {magnet_name}.pre")
196 if cleanup.remove_msh_file:
197 msh_file_path = os.path.join(self.mesh_folder, f"{magnet_name}.msh")
198 if os.path.exists(msh_file_path):
199 os.remove(msh_file_path)
200 logger.info(f"Removed {magnet_name}.msh")