Source code for jaxfluids.input_reader
#*------------------------------------------------------------------------------*
#* JAX-FLUIDS - *
#* *
#* A fully-differentiable CFD solver for compressible two-phase flows. *
#* Copyright (C) 2022 Deniz A. Bezgin, Aaron B. Buhendwa, Nikolaus A. Adams *
#* *
#* This program is free software: you can redistribute it and/or modify *
#* it under the terms of the GNU General Public License as published by *
#* the Free Software Foundation, either version 3 of the License, or *
#* (at your option) any later version. *
#* *
#* This program is distributed in the hope that it will be useful, *
#* but WITHOUT ANY WARRANTY; without even the implied warranty of *
#* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
#* GNU General Public License for more details. *
#* *
#* You should have received a copy of the GNU General Public License *
#* along with this program. If not, see <https://www.gnu.org/licenses/>. *
#* *
#*------------------------------------------------------------------------------*
#* *
#* CONTACT *
#* *
#* deniz.bezgin@tum.de // aaron.buhendwa@tum.de // nikolaus.adams@tum.de *
#* *
#*------------------------------------------------------------------------------*
#* *
#* Munich, April 15th, 2022 *
#* *
#*------------------------------------------------------------------------------*
import os
import types
from typing import Dict, Union, Tuple
import numpy as np
import jax.numpy as jnp
import json
from jaxfluids.solvers.riemann_solvers import DICT_RIEMANN_SOLVER, DICT_SIGNAL_SPEEDS
from jaxfluids.stencils import DICT_DERIVATIVE_REINITIALIZATION, DICT_DERIVATIVE_LEVELSET_ADVECTION, DICT_SPATIAL_RECONSTRUCTION, DICT_DERIVATIVE_QUANTITY_EXTENDER, DICT_DERIVATIVE_FACE, DICT_FIRST_DERIVATIVE_CENTER, DICT_CENTRAL_RECONSTRUCTION
from jaxfluids.time_integration import DICT_TIME_INTEGRATION
from jaxfluids.materials import DICT_MATERIAL
[docs]
class InputReader:
""" The InputReader class to reads a case setup
and a numerical setup for setting up a JAX-FLUIDS simulation. Case setup
and numerical setup can be provided as either a path to a json file or
as a preloaded dictionary.
"""
def __init__(self, case_setup: Union[str, Dict], numerical_setup: Union[str, Dict]) -> None:
# READ CASE SETUP AND NUMERICAL_SETUP
if type(case_setup) == str:
self.case_setup = json.load(open(case_setup))
elif type(case_setup) == dict:
self.case_setup = case_setup
if type(numerical_setup) == str:
self.numerical_setup = json.load(open(numerical_setup))
elif type(numerical_setup) == dict:
self.numerical_setup = numerical_setup
# GENERAL
self.case_name = self.case_setup["general"]["case_name"]
self.end_time = self.case_setup["general"]["end_time"]
self.save_path = self.case_setup["general"]["save_path"]
self.save_dt = self.case_setup["general"]["save_dt"]
# LEVELSET INTERFACE INTERACTION TYPE
if "levelset" in self.numerical_setup.keys():
self.levelset_type = self.numerical_setup["levelset"]["interface_interaction"]
else:
self.levelset_type = None
# DOMAIN
self.nx = self.case_setup["domain"]["x"]["cells"]
self.ny = self.case_setup["domain"]["y"]["cells"]
self.nz = self.case_setup["domain"]["z"]["cells"]
self.nh_conservatives = self.numerical_setup["conservatives"]["halo_cells"]
self.nh_geometry = self.numerical_setup["levelset"]["halo_cells"] if self.levelset_type != None else None
self.number_of_cells = jnp.array([self.nx, self.ny, self.nz])
self.dim = sum([1 if n > 1 else 0 for n in self.number_of_cells])
self.domain_size = { "x": self.case_setup["domain"]["x"]["range"],
"y": self.case_setup["domain"]["y"]["range"],
"z": self.case_setup["domain"]["z"]["range"] }
self.axis = np.array(["x", "y", "z"])
self.active_axis = []
self.inactive_axis = []
self.active_axis_indices = []
self.inactive_axis_indices = []
for i, axis in enumerate(["x", "y", "z"]):
self.active_axis.append( axis ) if self.number_of_cells[i] > 1 else None
self.inactive_axis.append( axis ) if self.number_of_cells[i] == 1 else None
self.active_axis_indices.append( i ) if self.number_of_cells[i] > 1 else None
self.inactive_axis_indices.append( i ) if self.number_of_cells[i] == 1 else None
# BOUNDARIES - SYMMETRY, PERIODIC, WALL, DIRICHLET, NEUMANN, INACTIVE
self.boundary_location_types = self.case_setup["boundary_condition"]["types"]
self.location_to_wall_velocity = { "east": {"v": 0.0, "w": 0.0}, "west": {"v": 0.0, "w": 0.0},
"north": {"u": 0.0, "w": 0.0}, "south": {"u": 0.0, "w": 0.0},
"top": {"u": 0.0, "v": 0.0}, "bottom": {"u": 0.0, "v": 0.0} }
self.wall_velocity_functions = dict((location, self.location_to_wall_velocity[location]) for location in ["east", "west", "north", "south", "top", "bottom"])
self.dirichlet_functions = dict((location, {"rho": 1.0, "u": 0.0, "v": 0.0, "w": 0.0, "p": 1.0}) for location in ["east", "west", "north", "south", "top", "bottom"])
self.neumann_functions = dict((location, {"rho": 0.0, "u": 0.0, "v": 0.0, "w": 0.0, "p": 0.0}) for location in ["east", "west", "north", "south", "top", "bottom"])
# TODO SPLIT UP ALL OF THIS
for location in ["east", "west", "north", "south", "top", "bottom"]:
# TRANSFORM WALL VELOCITY FUNCTIONS
if "wall_velocity_functions" in self.case_setup["boundary_condition"].keys():
if location in self.case_setup["boundary_condition"]["wall_velocity_functions"].keys():
wall_velocity_functions = self.case_setup["boundary_condition"]["wall_velocity_functions"][location]
# MULTIPLE BOUNDARY TYPES AT LOCATION
if type(wall_velocity_functions) == list:
self.wall_velocity_functions[location] = []
for functions in wall_velocity_functions:
wall_velocity_dict = {}
for velocity in functions.keys():
wall_velocity_dict[velocity] = eval(functions[velocity]) if type(functions[velocity]) == str else functions[velocity]
self.wall_velocity_functions[location].append(wall_velocity_dict)
# SINGLE BOUNDARY TYPE AT LOCATION
else:
self.wall_velocity_functions[location] = {}
for velocity in wall_velocity_functions.keys():
if type(wall_velocity_functions[velocity]) == str:
self.wall_velocity_functions[location][velocity] = eval(wall_velocity_functions[velocity])
else:
self.wall_velocity_functions[location][velocity] = wall_velocity_functions[velocity]
# TRANSFORM DIRICHLET FUNCTIONS
if "dirichlet_functions" in self.case_setup["boundary_condition"].keys():
if location in self.case_setup["boundary_condition"]["dirichlet_functions"].keys():
dirichlet_functions = self.case_setup["boundary_condition"]["dirichlet_functions"][location]
# MULTIPLE BOUNDARY TYPES AT LOCATION
if type(dirichlet_functions) == list:
self.dirichlet_functions[location] = []
for functions in dirichlet_functions:
prime_dict = {}
for prime in ["rho", "u", "v", "w", "p"]:
prime_dict[prime] = eval(functions[prime]) if type(functions[prime]) == str else functions[prime]
self.dirichlet_functions[location].append(prime_dict)
# SINGLE BOUNDARY TYPE AT LOCATION
else:
for prime in ["rho", "u", "v", "w", "p"]:
if type(dirichlet_functions[prime]) == str:
self.dirichlet_functions[location][prime] = eval(dirichlet_functions[prime])
else:
self.dirichlet_functions[location][prime] = dirichlet_functions[prime]
# TRANSFORM NEUMANN FUNCTIONS
if "neumann_functions" in self.case_setup["boundary_condition"].keys():
if location in self.case_setup["boundary_condition"]["neumann_functions"].keys():
neumann_functions = self.case_setup["boundary_condition"]["neumann_functions"][location]
# MULTIPLE BOUNDARY TYPES AT LOCATION
if type(neumann_functions) == list:
self.neumann_functions[location] = []
for functions in neumann_functions:
prime_dict = {}
for prime in ["rho", "u", "v", "w", "p"]:
prime_dict[prime] = eval(functions[prime]) if type(functions[prime]) == str else functions[prime]
self.neumann_functions[location].append(prime_dict)
# SINGLE BOUNDARY TYPE AT LOCATION
else:
for prime in ["rho", "u", "v", "w", "p"]:
if type(neumann_functions[prime]) == str:
self.neumann_functions[location][prime] = eval(neumann_functions[prime])
else:
self.neumann_functions[location][prime] = neumann_functions[prime]
# INITIAL CONDITION FROM RESTART FILE
self.restart_flag = self.case_setup["restart"]["flag"] if "restart" in self.case_setup.keys() else False
self.restart_file_path = self.case_setup["restart"]["file_path"] if "restart" in self.case_setup.keys() else None
# TURBULENT INITIAL CONDITION
self.is_turb_init = True if "turb_init_params" in self.case_setup["initial_condition"].keys() else False
self.turb_init_params = self.case_setup["initial_condition"]["turb_init_params"] if "turb_init_params" in self.case_setup["initial_condition"].keys() else None
assert not (self.is_turb_init and self.levelset_type != None), "Turbulent initialization not possible with active levelset."
# INITIAL CONDITION FROM CASE SETUP FILE - ONLY USED IF RESTART AND TURBULENT INITIAL CONDITION ARE FALSE
self.initial_condition_from_case_setup = False
if self.is_turb_init == False and self.restart_flag == False:
self.initial_condition_from_case_setup = True
# INITIAL PRIMITIVE VARIABLES
if self.levelset_type != None:
assert "primes" in self.case_setup["initial_condition"].keys(), "No primes in initial condition"
if self.levelset_type == "FLUID-FLUID":
self.initial_condition = {}
for fluid in ["positive", "negative"]:
self.initial_condition[fluid] = dict((prime, eval(value)) if type(value) == str else (prime, value) for prime, value in self.case_setup["initial_condition"]["primes"][fluid].items())
elif self.levelset_type in ["FLUID-SOLID-STATIC", "FLUID-SOLID-DYNAMIC"]:
self.initial_condition = dict((prime, eval(value)) if type(value) == str else (prime, value) for prime, value in self.case_setup["initial_condition"]["primes"].items())
else:
self.initial_condition = dict((prime, eval(value)) if type(value) == str else (prime, value) for prime, value in self.case_setup["initial_condition"].items())
# INITIAL LEVELSET
if self.levelset_type != None:
assert "levelset" in self.case_setup["initial_condition"].keys(), "Levelset is active, however, no initial levelset is provided."
if type(self.case_setup["initial_condition"]["levelset"]) == str:
self.initial_levelset = eval(self.case_setup["initial_condition"]["levelset"])
elif type(self.case_setup["initial_condition"]["levelset"] == list):
self.initial_levelset = []
for i, levelset_object in enumerate(self.case_setup["initial_condition"]["levelset"]):
temp = {}
temp["shape"] = levelset_object["shape"]
temp["parameters"] = levelset_object["parameters"]
if type(levelset_object["bounding_domain"]) == str:
temp["bounding_domain"] = eval(levelset_object["bounding_domain"])
else:
assert False, "Wrong type for bounding_domain of solid levelset object %d. Must be lambda (str)." % (i)
self.initial_levelset.append(temp)
else:
assert False, "Type of initial_levelset must be lambda (str) or a list of solid levelset objects."
else:
self.initial_levelset = None
# SOLID INTERFACE VELOCITY
if self.levelset_type == "FLUID-SOLID-DYNAMIC":
assert "solid_interface_velocity" in self.case_setup.keys(), "Levelset is FLUID-SOLID-DYNAMIC, however, no solid interface velocity is provided."
if type(self.case_setup["solid_interface_velocity"]) == str:
self.solid_interface_velocity = eval(self.case_setup["solid_interface_velocity"])
elif type(self.case_setup["solid_interface_velocity"] == list):
self.solid_interface_velocity = []
for i, velocity_object in enumerate(self.case_setup["solid_interface_velocity"]):
temp = {}
if type(velocity_object["function"]) == str:
temp["function"] = eval(velocity_object["function"])
else:
assert False, "Wrong type for function of solid velocity object %d. Must be lambda (str)." % (i)
if type(velocity_object["bounding_domain"]) == str:
temp["bounding_domain"] = eval(velocity_object["bounding_domain"])
else:
assert False, "Wrong type for bounding_domain of solid velocity object %d. Must be lambda (str)." % (i)
self.solid_interface_velocity.append(temp)
else:
assert False, "Type of solid_levelset must be lambda (str) or a list of solid velocity objects."
else:
self.solid_interface_velocity = None
# PHYSICAL PARAMETERS
self.gravity = jnp.array(self.case_setup["gravity"]) if "gravity" in self.case_setup.keys() else jnp.array([0.0, 0.0, 0.0])
# MATERIAL PROPERTIES
self.material_properties = {}
if self.levelset_type == "FLUID-FLUID":
for fluid in ["positive", "negative"]:
self.material_properties[fluid] = {}
self.material_properties[fluid]["type"] = self.case_setup["material_properties"][fluid]["type"]
self.material_properties[fluid]["dynamic_viscosity"] = eval(self.case_setup["material_properties"][fluid]["dynamic_viscosity"]) if type(self.case_setup["material_properties"][fluid]["dynamic_viscosity"]) == str and self.case_setup["material_properties"][fluid]["dynamic_viscosity"] != "Sutherland" else self.case_setup["material_properties"][fluid]["dynamic_viscosity"]
self.material_properties[fluid]["sutherland_parameters"] = self.case_setup["material_properties"][fluid]["sutherland_parameters"] if "sutherland_parameters" in self.case_setup["material_properties"][fluid].keys() else None
self.material_properties[fluid]["bulk_viscosity"] = self.case_setup["material_properties"][fluid]["bulk_viscosity"]
self.material_properties[fluid]["thermal_conductivity"] = eval(self.case_setup["material_properties"][fluid]["thermal_conductivity"]) if type(self.case_setup["material_properties"][fluid]["thermal_conductivity"]) == str and self.case_setup["material_properties"][fluid]["thermal_conductivity"] != "Prandtl" else self.case_setup["material_properties"][fluid]["thermal_conductivity"]
self.material_properties[fluid]["prandtl_number"] = self.case_setup["material_properties"][fluid]["prandtl_number"] if "prandtl_number" in self.case_setup["material_properties"][fluid].keys() else None
self.material_properties[fluid]["specific_heat_ratio"] = self.case_setup["material_properties"][fluid]["specific_heat_ratio"]
self.material_properties[fluid]["specific_gas_constant"] = self.case_setup["material_properties"][fluid]["specific_gas_constant"]
self.material_properties[fluid]["background_pressure"] = self.case_setup["material_properties"][fluid]["background_pressure"] if "background_pressure" in self.case_setup["material_properties"][fluid].keys() else None
self.material_properties["pairing"] = {}
self.material_properties["pairing"]["surface_tension_coefficient"] = 0.0
if "pairing" in self.case_setup["material_properties"].keys():
if "surface_tension_coefficient" in self.case_setup["material_properties"]["pairing"].keys():
self.material_properties["pairing"]["surface_tension_coefficient"] = self.case_setup["material_properties"]["pairing"]["surface_tension_coefficient"]
else:
self.material_properties = {}
self.material_properties["type"] = self.case_setup["material_properties"]["type"]
self.material_properties["dynamic_viscosity"] = eval(self.case_setup["material_properties"]["dynamic_viscosity"]) if type(self.case_setup["material_properties"]["dynamic_viscosity"]) == str and self.case_setup["material_properties"]["dynamic_viscosity"] != "Sutherland" else self.case_setup["material_properties"]["dynamic_viscosity"]
self.material_properties["sutherland_parameters"] = self.case_setup["material_properties"]["sutherland_parameters"] if "sutherland_parameters" in self.case_setup["material_properties"].keys() else None
self.material_properties["bulk_viscosity"] = self.case_setup["material_properties"]["bulk_viscosity"]
self.material_properties["thermal_conductivity"] = eval(self.case_setup["material_properties"]["thermal_conductivity"]) if type(self.case_setup["material_properties"]["thermal_conductivity"]) == str and self.case_setup["material_properties"]["thermal_conductivity"] != "Prandtl" else self.case_setup["material_properties"]["thermal_conductivity"]
self.material_properties["prandtl_number"] = self.case_setup["material_properties"]["prandtl_number"] if "prandtl_number" in self.case_setup["material_properties"].keys() else None
self.material_properties["specific_heat_ratio"] = self.case_setup["material_properties"]["specific_heat_ratio"]
self.material_properties["specific_gas_constant"] = self.case_setup["material_properties"]["specific_gas_constant"]
self.material_properties["background_pressure"] = self.case_setup["material_properties"]["background_pressure"] if "background_pressure" in self.case_setup["material_properties"].keys() else None
# NONDIMENSIONALIZATION PARAMETERS
self.nondimensionalization_parameters = {}
self.nondimensionalization_parameters["density_reference"] = self.case_setup["nondimensionalization_parameters"]["density_reference"]
self.nondimensionalization_parameters["length_reference"] = self.case_setup["nondimensionalization_parameters"]["length_reference"]
self.nondimensionalization_parameters["velocity_reference"] = self.case_setup["nondimensionalization_parameters"]["velocity_reference"]
self.nondimensionalization_parameters["temperature_reference"] = self.case_setup["nondimensionalization_parameters"]["temperature_reference"]
# FORCINGS
if "forcings" in self.case_setup.keys():
self.mass_flow_target = eval(self.case_setup["forcings"]["mass_flow_target"]) if "mass_flow_target" in self.case_setup["forcings"].keys() and type(self.case_setup["forcings"]["mass_flow_target"]) == str else self.case_setup["forcings"]["mass_flow_target"] if "mass_flow_target" in self.case_setup["forcings"].keys() else None
self.mass_flow_direction = self.case_setup["forcings"]["mass_flow_direction"] if "mass_flow_direction" in self.case_setup["forcings"].keys() else None
self.temperature_target = eval(self.case_setup["forcings"]["temperature_target"]) if "temperature_target" in self.case_setup["forcings"].keys() and type(self.case_setup["forcings"]["temperature_target"]) == str else self.case_setup["forcings"]["temperature_target"] if "temperature_target" in self.case_setup["forcings"].keys() else None
else:
self.mass_flow_target = None
self.mass_flow_direction = None
self.temperature_target = None
# NUMERICAL SETUP DEFAULT VALUES
self.numerical_setup["output"]["logging"] = "INFO" if not "logging" in self.numerical_setup["output"].keys() else self.numerical_setup["output"]["logging"]
if "active_forcings" in self.numerical_setup.keys():
self.numerical_setup["active_forcings"]["is_mass_flow_forcing"] = False if not "is_mass_flow_forcing" in self.numerical_setup["active_forcings"].keys() else self.numerical_setup["active_forcings"]["is_mass_flow_forcing"]
self.numerical_setup["active_forcings"]["is_temperature_forcing"] = False if not "is_temperature_forcing" in self.numerical_setup["active_forcings"].keys() else self.numerical_setup["active_forcings"]["is_temperature_forcing"]
self.numerical_setup["active_forcings"]["is_turb_hit_forcing"] = False if not "is_turb_hit_forcing" in self.numerical_setup["active_forcings"].keys() else self.numerical_setup["active_forcings"]["is_turb_hit_forcing"]
else:
self.numerical_setup["active_forcings"] = {}
self.numerical_setup["active_forcings"]["is_mass_flow_forcing"] = False
self.numerical_setup["active_forcings"]["is_temperature_forcing"] = False
self.numerical_setup["active_forcings"]["is_turb_hit_forcing"] = False
self.active_forcings = jnp.array([self.numerical_setup["active_forcings"][forcing] for forcing in self.numerical_setup["active_forcings"]]).any()
self.numerical_setup["output"]["is_double_precision_compute"] = True if not "is_double_precision_compute" in self.numerical_setup["output"].keys() else self.numerical_setup["output"]["is_double_precision_compute"]
self.numerical_setup["output"]["is_double_precision_output"] = True if not "is_double_precision_output" in self.numerical_setup["output"].keys() else self.numerical_setup["output"]["is_double_precision_output"]
self.numerical_setup["output"]["is_xdmf"] = False if not "is_xdmf" in self.numerical_setup["output"].keys() else self.numerical_setup["output"]["is_xdmf"]
self.numerical_setup["output"]["derivative_stencil"] = "DC4" if not "derivative_stencil" in self.numerical_setup["output"].keys() else self.numerical_setup["output"]["derivative_stencil"]
self.numerical_setup["output"]["quantities"] = {"primes": ["density", "velocityX", "velocityY", "velocityZ", "pressure", "temperature"]} if not "quantities" in self.numerical_setup["output"].keys() else self.numerical_setup["output"]["quantities"]
# AVAILABLE OUTPUT QUANTITIES
self.available_quantities = {
"primes" : ["density", "velocity", "velocityX", "velocityY", "velocityZ", "pressure", "temperature"],
"cons": ["mass", "momentum", "momentumX", "momentumY", "momentumZ", "energy"],
"levelset": ["levelset", "volume_fraction", "mask_real", "normal", "interface_pressure", "interface_velocity"],
"real_fluid": [ "density", "velocity", "velocityX", "velocityY", "velocityZ", "pressure", "temperature",
"mass", "momentum", "momentumX", "momentumY", "momentumZ", "energy" ],
"miscellaneous": ["mach_number", "schlieren", "absolute_velocity", "vorticity", "absolute_vorticity"],
}
# THOROUGH SANITY CHECK
self.sanity_check()
[docs]
def info(self) -> Tuple[Dict, Dict]:
numerical_setup_dict = {}
for key0, item0 in self.numerical_setup.items():
if isinstance(item0, dict):
key0_ = str(key0).replace("_", " ").upper()
numerical_setup_dict[key0_] = {}
for key1, item1 in item0.items():
key1_ = str(key1).replace("_", " ").upper()
if isinstance(item1, dict):
numerical_setup_dict[key0_][key1_] = {}
for key2, item2 in item1.items():
key2_ = str(key2).replace("_", " ").upper()
numerical_setup_dict[key0_][key1_][key2_] = item2
else:
numerical_setup_dict[key0_][key1_] = item1
case_setup_dict = {}
for key0, item0 in self.case_setup.items():
if isinstance(item0, dict):
key0_ = str(key0).replace("_", " ").upper()
case_setup_dict[key0_] = {}
for key1, item1 in item0.items():
key1_ = str(key1).replace("_", " ").upper()
if isinstance(item1, dict):
case_setup_dict[key0_][key1_] = {}
for key2, item2 in item1.items():
key2_ = str(key2).replace("_", " ").upper()
if isinstance(item2, dict):
case_setup_dict[key0_][key1_][key2_] = {}
for key3, item3 in item2.items():
key3_ = str(key3).replace("_", " ").upper()
case_setup_dict[key0_][key1_][key2_][key3_] = item3
else:
case_setup_dict[key0_][key1_][key2_] = item2
else:
case_setup_dict[key0_][key1_] = item1
return numerical_setup_dict, case_setup_dict
[docs]
def sanity_check(self) -> None:
"""Checks if the case setup and numerical setup is consistent.
"""
# DOMAIN
for i in self.active_axis_indices:
assert self.number_of_cells[i] >= self.nh_conservatives, "Number of cells in %d direction must be >= halo cells"
if self.levelset_type != None:
assert self.nh_conservatives > self.nh_geometry, "halo cells for conservatives must be > halo cells for geometry"
# TIME INTEGRATOR
assert self.numerical_setup["conservatives"]["time_integration"]["integrator"] in DICT_TIME_INTEGRATION.keys(), "Time integrator %s not implemented. Choose from %s." %(self.numerical_setup["conservatives"]["time_integration"]["time_integrator"], ", ".join(DICT_TIME_INTEGRATION.keys()))
assert type(self.numerical_setup["conservatives"]["time_integration"]["CFL"]) in [float, np.float32, np.float64, jnp.float32, jnp.float64], "CFL type must be float."
assert self.numerical_setup["conservatives"]["time_integration"]["CFL"] > 0.0, "CFL number must be > 0.0."
if "fixed_timestep" in self.numerical_setup["conservatives"]["time_integration"].keys():
assert type(self.numerical_setup["conservatives"]["time_integration"]["fixed_timestep"]) in [float, np.float32, np.float64, jnp.float32, jnp.float64], "CFL type must be float."
assert self.numerical_setup["conservatives"]["time_integration"]["fixed_timestep"] > 0.0, "Fixed timestep must be > 0.0."
# CONVECTIVE SOLVER
assert self.numerical_setup["conservatives"]["convective_fluxes"]["convective_solver"] in ["GODUNOV", "FLUX-SPLITTING", "ALDM"], "Convective solver %s not implemented. Choose from GODUNOV, FLUX-SPLITTING, ALDM." %(self.numerical_setup["conservatives"]["convective_fluxes"]["convective_solver"])
# GODUNOV
if self.numerical_setup["conservatives"]["convective_fluxes"]["convective_solver"] == "GODUNOV":
assert self.numerical_setup["conservatives"]["convective_fluxes"]["riemann_solver"] in DICT_RIEMANN_SOLVER.keys(), "Riemann solver %s not implemented for GODUNOV convective solver. Choose from %s." %(self.numerical_setup["conservatives"]["convective_fluxes"]["riemann_solver"], ", ".join(DICT_RIEMANN_SOLVER.keys()))
assert self.numerical_setup["conservatives"]["convective_fluxes"]["signal_speed"] in DICT_SIGNAL_SPEEDS.keys(), "Signal speed %s not implemented for GODUNOV convective solver. Choose from %s." %(self.numerical_setup["conservatives"]["convective_fluxes"]["signal_speed"], ", ".join(DICT_SIGNAL_SPEEDS.keys()))
assert self.numerical_setup["conservatives"]["convective_fluxes"]["reconstruction_var"] in ["PRIMITIVE", "CONSERVATIVE", "CHAR-PRIMITIVE", "CHAR-CONSERVATIVE"], "Reconstruction var %s not implemented. Choose from %s." %(self.numerical_setup["conservatives"]["convective_fluxes"]["reconstruction_var"], ", ".join(["PRIMITIVE", "CONSERVATIVE", "CHAR-PRIMITIVE", "CHAR-CONSERVATIIVE"]))
# FLUX SPLITTING
elif self.numerical_setup["conservatives"]["convective_fluxes"]["convective_solver"] == "FLUX-SPLITTING":
assert self.numerical_setup["conservatives"]["convective_fluxes"]["flux_splitting"] in ["ROE", "CLLF", "LLF", "GLF"], "Flux splitting %s not implemented for FLUX-SPLITTING convective solver. Choose from %s." %(self.numerical_setup["conservatives"]["convective_fluxes"]["riemann_solver"], ", ".join(["ROE", "CLLF", "LLF", "GLF"]))
# SPATIAL RECONSTRUCTOR
if self.numerical_setup["conservatives"]["convective_fluxes"]["convective_solver"] not in ["ALDM"]:
assert self.numerical_setup["conservatives"]["convective_fluxes"]["spatial_reconstructor"] in DICT_SPATIAL_RECONSTRUCTION.keys(), "Spatial reconstruction %s not implemented. Choose from %s." %(self.numerical_setup["conservatives"]["convective_fluxes"]["spatial_reconstructor"], ", ".join(DICT_SPATIAL_RECONSTRUCTION))
# CHECK DERIVATIVE STENCILS
if self.numerical_setup["active_physics"]["is_viscous_flux"] or self.numerical_setup["active_physics"]["is_heat_flux"]:
assert self.numerical_setup["conservatives"]["dissipative_fluxes"]["derivative_stencil_face"] in DICT_DERIVATIVE_FACE.keys(), "Derivative stencil %s for cell-face derivative not implemented. Choose from %s." % (self.numerical_setup["conservatives"]["dissipative_fluxes"]["derivative_stencil_face"], ", ".join(DICT_DERIVATIVE_FACE))
assert self.numerical_setup["conservatives"]["dissipative_fluxes"]["derivative_stencil_center"] in DICT_FIRST_DERIVATIVE_CENTER.keys(), "Derivative stencil %s for cell-center derivative not implemented. Choose from %s." %(self.numerical_setup["conservatives"]["dissipative_fluxes"]["derivative_stencil_center"], ", ".join(DICT_FIRST_DERIVATIVE_CENTER))
assert self.numerical_setup["conservatives"]["dissipative_fluxes"]["reconstruction_stencil"] in DICT_CENTRAL_RECONSTRUCTION.keys(), "Central reconstruction %s not implemented. Choose from %s." %(self.numerical_setup["conservatives"]["dissipative_fluxes"]["reconstruction_stencil"], ", ".join(DICT_CENTRAL_RECONSTRUCTION))
assert self.numerical_setup["output"]["derivative_stencil"] in DICT_FIRST_DERIVATIVE_CENTER.keys(), "Derivative stencil %s for outut writer not implemented. Choose from %s." %(self.numerical_setup["output"]["derivative_stencil"], ", ".join(DICT_FIRST_DERIVATIVE_CENTER))
# CELLS AND DOMAIN LENGTH CHECK
for i, n in enumerate(self.number_of_cells):
if n < 1:
assert False, "Number of cells in %s direction is smaller than 1" % self.axis[i]
for key, item in self.domain_size.items():
assert item[0] < item[1], "Domain bound %s_max is smaller than %s_min." % (key, key)
# DIMENSION CHECK
assert self.dim == len([n for n in self.number_of_cells if n > 1]), "Dimension and number of cells do not match"
# GENERAL BOUNDARY TYPE CHECK
for i in range(2 if self.levelset_type != None else 1):
quantity = ["primes", "levelset"][i]
boundary_location_types = self.boundary_location_types[quantity] if self.levelset_type != None else self.boundary_location_types
for location_types in boundary_location_types.items():
location = location_types[0]
types_and_ranges = location_types[1]
# MULTIPLE BOUNDARIES AT SAME LOCATION - TYPE AND DIMENSION CHECK
if type(types_and_ranges) == list:
assert quantity != "levelset", "Multiple boundary types at same location for levelset not implemented."
assert self.dim == 2, "Multiple boundary types at same location only implemented in 2D."
b_types = types_and_ranges[0]
for b_type in b_types:
assert b_type in ["symmetry", "periodic", "dirichlet", "neumann", "wall", "inactive"], "Boundary type %s does not exist." % b_type
assert b_type != "periodic", "It is not possible to use boundary type periodic at a location with multiple types."
else:
b_type = location_types[1]
# GENERAL TYPE CHECK
if quantity == "primes":
assert b_type in ["symmetry", "periodic", "dirichlet", "neumann", "wall", "inactive"], "Boundary type %s does not exist for primes" % b_type
elif quantity == "levelset":
assert b_type in ["symmetry", "periodic", "neumann", "inactive"], "Boundary type %s does not exist for levelset" % b_type
# PERIODIC AT OPPOSITE LOCATION CHECK
if b_type == "periodic":
if location in ["east", "west"]:
assert boundary_location_types["east"] == boundary_location_types["west"], "%s %s boundary is periodic, however the opposite boundary is not." % (quantity, location)
if location in ["north", "south"]:
assert boundary_location_types["north"] == boundary_location_types["south"], "%s %s boundary is periodic, however the opposite boundary is not." % (quantity, location)
if location in ["top", "bottom"]:
assert boundary_location_types["top"] == boundary_location_types["bottom"], "%s %s boundary is periodic, however the opposite boundary is not." % (quantity, location)
# INACTIVE AXIS BOUNDARY TYPE CHECK
for axis in self.inactive_axis:
if axis == "x":
for location in ["east", "west"]:
if boundary_location_types[location] != "inactive":
assert False, "Axis %s is inactive, however the boundary type for the %s boundary is %s. Change to inactive" % (axis, location, boundary_location_types[location])
if axis == "y":
for location in ["north", "south"]:
if boundary_location_types[location] != "inactive":
assert False, "Axis %s is inactive, however the boundary type for the %s boundary is %s. Change to inactive" % (axis, location, boundary_location_types[location])
if axis == "z":
for location in ["top", "bottom"]:
if boundary_location_types[location] != "inactive":
assert False, "Axis %s is inactive, however the boundary type for the %s boundary is %s. Change to inactive" % (axis, location, boundary_location_types[location])
# ACTIVE AXIS BOUNDARY TYPE CHECK
for axis in self.active_axis:
if axis == "x":
for location in ["east", "west"]:
if boundary_location_types[location] == "inactive":
assert False, "Axis %s is active, however the boundary type for the %s boundary is inactive." % (axis, location)
if axis == "y":
for location in ["north", "south"]:
if boundary_location_types[location] == "inactive":
assert False, "Axis %s is active, however the boundary type for the %s boundary is inactive." % (axis, location)
if axis == "z":
for location in ["top", "bottom"]:
if boundary_location_types[location] == "inactive":
assert False, "Axis %s is active, however the boundary type for the %s boundary is inactive." % (axis, location)
boundary_types_location = self.boundary_location_types["primes"] if self.levelset_type != None else self.boundary_location_types
# WALL VELOCITY FUNCTION CHECK
for location in self.wall_velocity_functions:
wall_velocity_functions = self.wall_velocity_functions[location]
if type(wall_velocity_functions) == list:
assert type(boundary_types_location[location]) == list, "Amount of wall velocity functions and wall types at location %s does not match." % location
assert boundary_types_location[location][0].count("wall") == len(wall_velocity_functions), "Amount of wall velocity functions and wall types at location %s does not match." % location
for function in wall_velocity_functions:
assert function.keys() == self.location_to_wall_velocity[location].keys(), "Wall velocity function at location %s has keys %s, however, the keys must be %s" % (location, function.keys(), self.location_to_wall_velocity[location].keys())
for velocity in function:
if type(function[velocity]) == types.LambdaType:
argcount = function[velocity].__code__.co_argcount
assert argcount == 1, "Wall velocity lambda at location %s for velocity %s takes %d arguments, however, must take 1 argument (time). Use float for constant wall velocity." % (location, velocity, argcount)
else:
for velocity in wall_velocity_functions:
assert wall_velocity_functions.keys() == self.location_to_wall_velocity[location].keys(), "Wall velocity function at location %s has keys %s, however, the keys must be %s" % (location, wall_velocity_functions.keys(), self.location_to_wall_velocity[location].keys())
if type(wall_velocity_functions[velocity]) == types.LambdaType:
argcount = wall_velocity_functions[velocity].__code__.co_argcount
assert argcount == 1, "Wall velocity lambda at location %s for velocity %s takes %d arguments, however, must take 1 argument (time). Use float for constant wall velocity." % (location, velocity, argcount)
# DIRICHLET TYPE AND FUNCTION CHECK
for location in self.dirichlet_functions:
dirichlet_functions = self.dirichlet_functions[location]
if type(dirichlet_functions) == list:
assert type(boundary_types_location[location]) == list, "Amount of dirichlet functions and dirichlet types at location %s does not match." % location
assert boundary_types_location[location][0].count("dirichlet") == len(dirichlet_functions), "Amount of dirichlet functions and dirichlet types at location %s does not match." % location
for function in dirichlet_functions:
for prime in function:
if type(function[prime]) == types.LambdaType:
argcount = function[prime].__code__.co_argcount
if argcount != self.dim:
assert False, "Dirichlet lambda at location %s for prime state %s takes %d arguments, however the dimension is %d." % (location, prime, argcount, self.dim)
else:
for prime in dirichlet_functions:
if type(dirichlet_functions[prime]) == types.LambdaType:
argcount = dirichlet_functions[prime].__code__.co_argcount
if argcount != self.dim:
assert False, "Dirichlet lambda at location %s for prime state %s takes %d arguments, however the dimension is %d." % (location, prime, argcount, self.dim)
# NEUMANN TYPE AND FUNCTION CHECK
for location in self.neumann_functions:
neumann_functions = self.neumann_functions[location]
if type(neumann_functions) == list:
assert type(boundary_types_location[location]) == list, "Amount of neumann functions and neumann types at location %s does not match." % location
assert boundary_types_location[location][0].count("neumann") == len(neumann_functions), "Amount of neumann functions and neumann types at location %s does not match." % location
for function in neumann_functions:
for prime in function:
if type(function[prime]) == types.LambdaType:
argcount = function[prime].__code__.co_argcount
if argcount != self.dim:
assert False, "Neumann lambda at location %s for prime state %s takes %d arguments, however the dimension is %d." % (location, prime, argcount, self.dim)
else:
for prime in neumann_functions:
if type(neumann_functions[prime]) == types.LambdaType:
argcount = neumann_functions[prime].__code__.co_argcount
if argcount != self.dim:
assert False, "Neumann lambda at location %s for prime state %s takes %d arguments, however the dimension is %d." % (location, prime, argcount, self.dim)
# ARGUMENTS INITIAL CONDITION CHECK
if self.initial_condition_from_case_setup:
no_phases = 2 if self.levelset_type == "FLUID-FLUID" else 1
for i in range(no_phases):
initial_condition = self.initial_condition[["positive", "negative"][i]] if self.levelset_type == "FLUID-FLUID" else self.initial_condition
for prime in initial_condition:
initial_prime_value = initial_condition[prime]
if type(initial_prime_value) == types.LambdaType:
argcount = initial_prime_value.__code__.co_argcount
assert argcount == self.dim, "Initial condition lambda for prime state %s takes %d arguments, however the dimension is %d" % (key, argcount, self.dim)
else:
assert type(initial_prime_value) in [float, np.float32, np.float64, jnp.float32, jnp.float64], "Type of initial condition must be float or lambda (str)"
# MATERIAL PARAMETER CHECK
for fluid in ["positive", "negative"]:
material_properties = self.material_properties[fluid] if self.levelset_type == "FLUID-FLUID" else self.material_properties
assert material_properties["type"] in DICT_MATERIAL.keys(), "Material type %s not implemented. Choose from %s" % (material_properties["type"], DICT_MATERIAL.keys())
assert material_properties["specific_gas_constant"] >= 0.0, "Specific heat ratio must be >= 0.0."
assert material_properties["specific_heat_ratio"] >= 0.0, "Specific heat ratio must be >= 0.0."
if material_properties["type"] == "StiffenedGas":
assert material_properties["background_pressure"] != None, "Material is StiffenedGas, but no background pressure is provided."
if type(material_properties["dynamic_viscosity"]) in [float, np.float32, np.float64]:
assert material_properties["dynamic_viscosity"] >= 0.0, "Dynamic viscosity must be >= 0.0."
elif material_properties["dynamic_viscosity"] == "Sutherland":
assert type(material_properties["sutherland_parameters"]) == list, "Sutherland parameters must be a list with [mu_0, T_0, C]."
assert len(material_properties["sutherland_parameters"]) == 3, "Sutherland parameters must be a list with [mu_0, T_0, C]."
else:
assert type(material_properties["dynamic_viscosity"]) == types.LambdaType, "Dynamic viscosity model must either be float, 'Sutherland', LambdaType."
if type(material_properties["thermal_conductivity"]) in [float, np.float32, np.float64]:
assert material_properties["thermal_conductivity"] >= 0.0, "Thermal conductivity must be >= 0.0."
elif material_properties["thermal_conductivity"] == "Prandtl":
assert material_properties["prandtl_number"] >= 0.0 if type(material_properties["prandtl_number"]) != None else False, "Prandtl number must be >= 0.0."
else:
assert type(material_properties["thermal_conductivity"]) == types.LambdaType, "Thermal conductivity model must either be float, 'Prandtl', LambdaType."
assert material_properties["bulk_viscosity"] >= 0.0, "Bulk viscosity must be >= 0.0."
# TURB INIT CHECK
if self.is_turb_init:
assert self.turb_init_params["turb_case"] in ["RISTORCELLI", "CHANNEL", "TGV"], "Turbulent initial condition %s does not exist." %(self.turb_init_params["turb_case"])
assert self.turb_init_params is not None, "Please provide parameters for the chosen turbulent initial condition."
# MASS FLOW FORCING
if self.numerical_setup["active_forcings"]["is_mass_flow_forcing"]:
assert self.mass_flow_target != None, "Mass flow forcing is true, however, mass flow target is None."
if type(self.mass_flow_target) == types.LambdaType:
argcount = self.mass_flow_target.__code__.co_argcount
assert argcount == 1, "Mass flow target function has more than one input argument" % (argcount)
# TEMPERATURE FORCING
if self.numerical_setup["active_forcings"]["is_temperature_forcing"]:
assert self.temperature_target != None, "Temperature forcing is true, however, temperature target is None."
if type(self.temperature_target) == types.LambdaType:
argcount = self.temperature_target.__code__.co_argcount
assert argcount == self.dim + 1, "Temperature forcing is a lambda function. Dimension is %d, however, the amount of arguments for the lambda is %d." % (self.dim, argcount)
# RESTART
if self.restart_flag:
assert os.path.exists(self.restart_file_path), "Restart flag is true, however restart file path does not exist."
# LEVELSET
if self.levelset_type != None:
# GENERAL
assert self.numerical_setup["levelset"]["interface_interaction"] in [None, "FLUID-SOLID-STATIC", "FLUID-SOLID-DYNAMIC", "FLUID-FLUID"], "interface_interaction must be in [None, FLUID-SOLID-STATIC, FLUID-SOLID-DYNAMIC, FLUID-FLUID]"
assert self.initial_levelset != None, "Levelset is active, however, initial_levelset is None."
if type(self.initial_levelset) == types.LambdaType:
argcount = self.initial_levelset.__code__.co_argcount
assert argcount == self.dim, "Initial levelset lambda takes %d arguments, however the dimension is %d." % (argcount, self.dim)
elif type(self.initial_levelset) == list:
for i, levelset_object in enumerate(self.initial_levelset):
argcount = levelset_object["bounding_domain"].__code__.co_argcount
assert argcount == self.dim, "Initial levelset object %d bounding domain lambda takes %d arguments, however the dimension is %d." % (i, argcount, self.dim)
assert self.numerical_setup["levelset"]["geometry_calculator_stencil"] in DICT_FIRST_DERIVATIVE_CENTER.keys(), "Geometry calculator stencil %s not implemented. Choose from %s." %(self.numerical_setup["geometry_calculator_stencil"], ", ".join(DICT_FIRST_DERIVATIVE_CENTER))
assert self.numerical_setup["levelset"]["volume_fraction_threshold"] >= 0.0, "Volume fraction threshold must be > 0.0."
assert self.numerical_setup["levelset"]["levelset_advection_stencil"] in DICT_DERIVATIVE_LEVELSET_ADVECTION.keys(), "Levelset advection stencil %s not implemented. Choose from %s." % (self.numerical_setup["levelset_advection_stencil"], ", ".join(DICT_DERIVATIVE_LEVELSET_ADVECTION))
assert self.numerical_setup["levelset"]["narrow_band_cutoff"] > 0, "Levelset cutoff narrow band must be > 0."
assert self.numerical_setup["levelset"]["narrow_band_computations"] > 0, "Levelset computations narrow band must be > 0."
# QUANTITY EXTENDER
assert self.numerical_setup["levelset"]["extension"]["time_integrator"] in DICT_TIME_INTEGRATION.keys(), "Quantity extender time integrator %s not implemented. Choose from %s." %(self.numerical_setup["quantity_extender_time_integrator"], ", ".join(DICT_TIME_INTEGRATION.keys()))
assert self.numerical_setup["levelset"]["extension"]["spatial_stencil"] in DICT_DERIVATIVE_QUANTITY_EXTENDER.keys(), "Quantity extender stencil %s not implemented. Choose from %s." %(self.numerical_setup["quantity_extender_stencil"], ", ".join(DICT_DERIVATIVE_QUANTITY_EXTENDER))
assert self.numerical_setup["levelset"]["extension"]["CFL_primes"] > 0.0, "Primes extension CFL must be > 0."
assert self.numerical_setup["levelset"]["extension"]["CFL_interface"] > 0.0, "Interface extension CFL must be > 0."
assert self.numerical_setup["levelset"]["extension"]["steps_primes"] >= 0, "Primes extension steps must be > 0."
assert self.numerical_setup["levelset"]["extension"]["steps_interface"] >= 0, "Interface extension steps must be > 0."
# LEVELSET REINITIALIZER
assert self.numerical_setup["levelset"]["reinitialization"]["time_integrator"] in DICT_TIME_INTEGRATION.keys(), "Levelset reinitializr time integrator %s not implemented. Choose from %s." %(self.numerical_setup["levelset_reinitializer_time_integrator"], ", ".join(DICT_TIME_INTEGRATION.keys()))
assert self.numerical_setup["levelset"]["reinitialization"]["spatial_stencil"] in DICT_DERIVATIVE_REINITIALIZATION.keys(), "Levelset reinitializr stencil %s not implemented. Choose from %s." %(self.numerical_setup["levelset_reinitializer_stencil"], ", ".join(DICT_DERIVATIVE_REINITIALIZATION))
assert self.numerical_setup["levelset"]["reinitialization"]["CFL"] > 0.0, "Reinitialization CFL must be > 0."
assert self.numerical_setup["levelset"]["reinitialization"]["interval"] > 0, "Reinitialization interval must be > 0."
assert self.numerical_setup["levelset"]["reinitialization"]["steps"] >= 0, "Reinitialization steps must be >= 0."
# SOLID VELOCITY
dummy_cell_centers = [jnp.zeros(self.number_of_cells[i]) for i in range(3)]
dummy_mesh_grid = [jnp.meshgrid(*dummy_cell_centers, indexing="ij")[i] for i in self.active_axis_indices]
if type(self.solid_interface_velocity) == types.LambdaType:
argcount = self.solid_interface_velocity.__code__.co_argcount
assert argcount == self.dim + 1, "Solid velocity lambda takes %d arguments, however the dimension is %d." % (argcount, self.dim)
output = self.solid_interface_velocity(*dummy_mesh_grid, 0.0)
assert type(output) == tuple, "Solid velocity lambda output must be tuple with velocities for each active axis."
assert len(output) == self.dim, "Solid velocity lambda outputs %d velocities, however the dimension is %d." % (len(output), self.dim)
elif type(self.solid_interface_velocity) == list:
for i, velocity_object in enumerate(self.solid_interface_velocity):
velocity_function = velocity_object["function"]
argcount = velocity_function.__code__.co_argcount
assert argcount == self.dim + 1, "Solid velocity object %d lambda takes %d arguments, however the dimension is %d." % (i, argcount, self.dim)
output = velocity_function(*dummy_mesh_grid, 0.0)
assert type(output) == tuple, "Solid velocity object %d lambda output must be tuple with velocities for each active axis." % (i)
assert len(output) == self.dim, "Solid velocity object %d lambda outputs %d velocities, however the dimension is %d." % (i, len(output), self.dim)
argcount = velocity_object["bounding_domain"].__code__.co_argcount
assert argcount == self.dim + 1, "Solid velocity object %d bounding domain lambda takes %d arguments, however the dimension is %d." % (i, argcount, self.dim)
# OUTPUT WRITER
quantity_type_list = ["primes", "cons", "real_fluid", "miscellaneous", "levelset"]
for quantity_type in self.numerical_setup["output"]["quantities"].keys():
assert quantity_type in quantity_type_list, "Output quantity type must be in %s." % quantity_type_list
if quantity_type == "real_fluid":
assert self.levelset_type == "FLUID-FLUID", "Requested real fluid output, however, levelset_type is not FLUID-FLUID."
for quantity in self.numerical_setup["output"]["quantities"][quantity_type]:
if quantity == "interface_pressure":
assert self.levelset_type == "FLUID-FLUID", "Requested interface_pressure output, however, levelset_type is not FLUID-FLUID."
if quantity == "interface_velocity":
assert self.levelset_type != "FLUID-SOLID-STATIC", "Requested interface_velocity output, however, levelset_type is FLUID-SOLID-STATIC."
assert quantity in self.available_quantities[quantity_type], "Requested output quantity %s of type %s does not exist. Choose from %s." % (quantity, quantity_type, self.available_quantities[quantity_type])
# LOGGER
assert self.numerical_setup["output"]["logging"] in ["NONE", "INFO", "DEBUG", "INFO_TO_FILE", "DEBUG_TO_FILE"], "Logging must be in [NONE, INFO, DEBUG, INFO_TO_FILE, DEBUG_TO_FILE]"