Source code for jaxfluids.turb.turb_stats_manager

#*------------------------------------------------------------------------------*
#* JAX-FLUIDS -                                                                 *
#*                                                                              *
#* A fully-differentiable CFD solver for compressible two-phase flows.          *
#* Copyright (C) 2022  Deniz A. Bezgin, Aaron B. Buhendwa, Nikolaus A. Adams    *
#*                                                                              *
#* This program is free software: you can redistribute it and/or modify         *
#* it under the terms of the GNU General Public License as published by         *
#* the Free Software Foundation, either version 3 of the License, or            *
#* (at your option) any later version.                                          *
#*                                                                              *
#* This program is distributed in the hope that it will be useful,              *
#* but WITHOUT ANY WARRANTY; without even the implied warranty of               *
#* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the                *
#* GNU General Public License for more details.                                 *
#*                                                                              *
#* You should have received a copy of the GNU General Public License            *
#* along with this program.  If not, see <https://www.gnu.org/licenses/>.       *
#*                                                                              *
#*------------------------------------------------------------------------------*
#*                                                                              *
#* CONTACT                                                                      *
#*                                                                              *
#* deniz.bezgin@tum.de // aaron.buhendwa@tum.de // nikolaus.adams@tum.de        *
#*                                                                              *
#*------------------------------------------------------------------------------*
#*                                                                              *
#* Munich, April 15th, 2022                                                     *
#*                                                                              *
#*------------------------------------------------------------------------------*

from typing import Dict, Tuple

import jax.numpy as jnp

from jaxfluids.domain_information import DomainInformation
from jaxfluids.materials.material_manager import MaterialManager

[docs] class TurbStatsManager: """ Provides functionality to calculate statistics of turbulent flows. The TurbStatsManager provides turbulent statistics of the initial flow field as well as cumulative statistics over the course of a simulation. """ # TODO Include functionality for postprocessing. def __init__(self, domain_information: DomainInformation, material_manager: MaterialManager = None) -> None: self.domain_information = domain_information self.material_manager = material_manager self.eps = 1e-8 self.Nx, self.Ny, self.Nz = domain_information.number_of_cells assert (self.Nx == self.Ny and self.Ny == self.Nz), "The present implementation only works for cubic domains." self.initialize()
[docs] def initialize(self) -> None: self.k_field, self.k_vec = self._get_real_wavenumber_grid(self.Nx) self.k2_field = self.k_field[0]*self.k_field[0] + self.k_field[1]*self.k_field[1] + self.k_field[2]*self.k_field[2] self.one_k2_field = 1.0 / (self.k2_field + self.eps) self.k_mag_vec = jnp.sqrt( self.k_vec**2 ) self.k_mag_field = jnp.sqrt( self.k2_field ) self.shell = (self.k_mag_field + 0.5).astype(int) self.fact = 2 * (self.k_field[0] > 0) * (self.k_field[0] < self.Nx//2) + 1 * (self.k_field[0] == 0) + 1 * (self.k_field[0] == self.Nx//2)
def _get_real_wavenumber_grid(self, N: int) -> Tuple[jnp.ndarray, jnp.ndarray]: """Initializes wavenumber grid and wavenumber vector. :param N: Resolution. :type N: int :return: Wavenumber grid and wavenumber vector. :rtype: Tuple[jnp.ndarray, jnp.ndarray] """ Nf = N//2 + 1 k = jnp.fft.fftfreq(N, 1./N) # for y and z direction kx = k[:Nf] kx = kx.at[-1].mul(-1) k_field = jnp.array(jnp.meshgrid(kx, k, k, indexing="ij"), dtype=int) return k_field, k
[docs] def energy_spectrum_spectral(self, velocity_hat: jnp.ndarray) -> jnp.ndarray: """Calculates the three-dimensional spectral energy spectrum of the input velocity. :param velocity_hat: Velocity vector in spectral space. :type velocity_hat: jnp.ndarray :return: Spectral energy spectrum. :rtype: jnp.ndarray """ ek = jnp.zeros(self.Nx) n_samples = jnp.zeros(self.Nx) uu = self.fact * 0.5 * (jnp.abs(velocity_hat[0]*velocity_hat[0]) + jnp.abs(velocity_hat[1]*velocity_hat[1]) + jnp.abs(velocity_hat[2]*velocity_hat[2])) ek = ek.at[self.shell.flatten()].add(uu.flatten()) n_samples = n_samples.at[self.shell.flatten()].add(self.fact.flatten()) ek = ek * 4 * jnp.pi * self.k_vec**2 / (n_samples + self.eps) / (self.Nx**3) / (self.Nx**3) return ek
[docs] def energy_spectrum_physical(self, velocity: jnp.ndarray) -> jnp.ndarray: """Calculates the three-dimensional spectral energy spectrum of the input velocity. Wrapper around self.energy_spectrum_spectral :param velocity: Velocity vector in physical space. :type velocity: jnp.ndarray :return: Spectral energy spectrum. :rtype: jnp.ndarray """ velocity_hat = jnp.stack([jnp.fft.rfftn(velocity[ii], axes=(2,1,0)) for ii in range(3)]) return self.energy_spectrum_spectral(velocity_hat)
[docs] def get_turbulent_statistics(self, primes: jnp.ndarray) -> Dict: """Computes the turbulent statistics for the given primitive buffer. :param primes: Buffer of primitive variables. :type primes: jnp.ndarray :return: Dictionary with turbulent statistics. :rtype: Dict """ # TODO Include other setups besides HIT return self.hit_statistics(primes)
[docs] def hit_statistics(self, primes: jnp.ndarray) -> Dict: """Calculates statistics for homogeneous isotropic turbulence. :param primes: Buffer of primitive variables. :type primes: jnp.ndarray :return: Dictionary with information on the HIT statistics. :rtype: Dict """ rho = primes[0] velocity = primes[1:4] pressure = primes[4] velocity_hat = jnp.stack([jnp.fft.rfftn(velocity[ii], axes=(2,1,0)) for ii in range(3)]) Nx, Ny, Nz = self.Nx, self.Ny, self.Nz temperature = self.material_manager.get_temperature(p=pressure, rho=rho) rho_mean = jnp.mean(rho) rho_rms = jnp.std(rho) p_mean = jnp.mean(pressure) p_rms = jnp.std(pressure) T_mean = jnp.mean(temperature) T_rms = jnp.std(temperature) speed_of_sound = self.material_manager.get_speed_of_sound(p=pressure, rho=rho) speed_of_sound_mean_1 = self.material_manager.get_speed_of_sound(p=p_mean, rho=rho_mean) speed_of_sound_mean_2 = jnp.mean(speed_of_sound) mu = self.material_manager.get_dynamic_viscosity(temperature) mu_mean = jnp.mean(mu) nu = mu / rho nu_mean = jnp.mean(nu) u_rms = u_prime = jnp.sqrt( jnp.mean( velocity[0]**2 + velocity[1]**2 + velocity[2]**2 ) /3) q_rms = jnp.sqrt( jnp.mean( velocity[0]**2 + velocity[1]**2 + velocity[2]**2 ) ) Ma_t = jnp.sqrt( jnp.mean( velocity[0]**2 + velocity[1]**2 + velocity[2]**2 ) ) / speed_of_sound_mean_1 Ma_rms = jnp.sqrt( jnp.mean( (velocity[0]**2 + velocity[1]**2 + velocity[2]**2) / speed_of_sound ) ) TKE = 0.5 * jnp.mean(rho * (velocity[0]**2 + velocity[1]**2 + velocity[2]**2)) duidj = self.calculate_sheartensor_spectral(velocity_hat) S_ij = 0.5 * ( duidj + jnp.transpose(duidj, axes=(1,0,2,3,4)) ) SijSij_bar = jnp.mean(jnp.sum(S_ij**2, axis=(0,1))) eps = 2 * mu_mean / rho_mean * SijSij_bar vorticity = self.calculate_vorticity_spectral(velocity_hat) vorticity_rms = jnp.sqrt(jnp.mean(vorticity[0]**2 + vorticity[1]**2 + vorticity[2]**2)/3) enstrophy_mean = jnp.mean(vorticity[0]**2 + vorticity[1]**2 + vorticity[2]**2) dilatation = self.calculate_dilatation_spectral(velocity_hat) dilatation_rms = jnp.std(dilatation) divergence_rms = jnp.sqrt(jnp.mean( duidj[0,0]**2 + duidj[1,1]**2 + duidj[2,2]**2 )) ek = self.energy_spectrum_spectral(velocity_hat) # LENGTH SCALES # Integral length scale int_Ek = jnp.trapz(ek) int_Ek_over_k = jnp.trapz(ek / (self.k_vec + 1e-10)) L_I = 0.75 * jnp.pi * int_Ek_over_k / int_Ek # Longitudinal and lateral Taylor length scale lambda_ = jnp.sqrt( u_rms**2 / jnp.mean(duidj[0,0]**2) ) lambda_g = jnp.sqrt(2 * u_rms**2 / jnp.mean(duidj[0,0]**2) ) lambda_f = jnp.sqrt(2 * u_rms**2 / jnp.mean(duidj[0,1]**2) ) # Kolmogorov eta = (nu_mean**3 / eps)**0.25 kmax = jnp.sqrt(2) * Nx / 3 eta_kmax = eta * kmax # TIME SCALES tau_LI = L_I / u_rms # Large-eddy-turnover time tau_lambda = lambda_f / u_rms # Eddy turnover time # REYNOLDS NUMBER Re_turb = rho_mean * q_rms**4 / eps / mu_mean Re_lambda = lambda_f * u_rms * rho_mean / mu_mean Re_lambda_2 = lambda_ * u_rms * rho_mean / mu_mean Re_int = u_rms * L_I * rho_mean / mu_mean turb_stats_dict = { "FLOW STATE": { "P MEAN": p_mean, "p_rms": p_rms, "RHO MEAN": rho_mean, "rho_rms": rho_rms, "T MEAN": T_mean, "T_rms": T_rms, "C MEAN": speed_of_sound_mean_1, "MU MEAN": mu_mean, "Ma T": Ma_t, "Ma_rms": Ma_rms, "U RMS": u_rms, "TKE": TKE, "ENSTROPHY MEAN": enstrophy_mean, "DILATATION RMS": dilatation_rms, "DIVERGENCE RMS": divergence_rms, }, "REYNOLDS NUMBERS": { "RE LAMBDA": Re_lambda, "RE LAMBDA2": Re_lambda_2, "RE INTEGRAL": Re_int, "RE TURBULENT": Re_turb, }, "LENGTH SCALES": { "INTEGRAL": L_I, "TAYLOR MICRO": lambda_, "LONGITUDINAL TAYLOR": lambda_g, "LATERAL TAYLOR": lambda_f, "KOLMOGOROV": eta, "KOLMOGOROV RESOLUTION": eta_kmax, }, "TIME SCALES": { "LARGE EDDY TURNOVER": tau_LI, "EDDY TURNOVER": tau_lambda, }, } return turb_stats_dict
[docs] def calculate_vorticity_spectral(self, velocity_hat: jnp.ndarray) -> jnp.ndarray: """Calculates the vortiticy of the input velocity field. Calculation done in spectral space. omega = (du3/dx2 - du2/dx3, du1/dx3 - du3/dx1, du2/dx1 - du1/dx2) :param velocity_hat: Buffer of velocities in spectral space. :type velocity_hat: jnp.ndarray :return: Vorticity vector in physical space. :rtype: jnp.ndarray """ omega_0 = jnp.fft.irfftn(1j * (self.k_field[1] * velocity_hat[2] - self.k_field[2] * velocity_hat[1]), axes=(2,1,0)) omega_1 = jnp.fft.irfftn(1j * (self.k_field[2] * velocity_hat[0] - self.k_field[0] * velocity_hat[2]), axes=(2,1,0)) omega_2 = jnp.fft.irfftn(1j * (self.k_field[0] * velocity_hat[1] - self.k_field[1] * velocity_hat[0]), axes=(2,1,0)) omega = jnp.stack([omega_0, omega_1, omega_2]) return omega
[docs] def calculate_vorticity(self, velocity: jnp.ndarray) -> jnp.ndarray: """Calculates the vortiticy of the input velocity field. Calculation done in spectral space. omega = [ du3/dx2 - du2/dx3 du1/dx3 - du3/dx1 du2/dx1 - du1/dx2] :param velocity: Buffer of velocities in physical space. :type velocity: jnp.ndarray :return: Vorticity vector in physical space. :rtype: jnp.ndarray """ velocity_hat = jnp.stack([jnp.fft.rfftn(velocity[ii], axes=(2,1,0)) for ii in range(3)]) return self.calculate_vorticity_spectral(velocity_hat)
[docs] def calculate_sheartensor_spectral(self, velocity_hat: jnp.ndarray) -> jnp.ndarray: """Calculates the shear tensor in spectral space. dui/dxj = IFFT ( 1j * k_j * u_i_hat ) :param velocity_hat: Buffer of velocities in spectral space. :type velocity_hat: jnp.ndarray :return: Buffer of the shear tensor. :rtype: jnp.ndarray """ duidj = [[], [], []] for ii in range(3): for jj in range(3): duidj[ii].append(jnp.fft.irfftn(1j * self.k_field[jj] * velocity_hat[ii], axes=(2,1,0))) return jnp.array(duidj)
[docs] def calculate_sheartensor(self, velocity: jnp.ndarray) -> jnp.ndarray: """Calculates the shear tensor in spectral space. Wrapper around self.calculate_sheartensor_spectral(). duidj = [ du1/dx1 du1/dx2 du1/dx3 du2/dx1 du2/dx2 du2/dx3 du3/dx1 du3/dx2 du3/dx3 ] :param velocity: Buffer of velocities in physical space. :type velocity: jnp.ndarray :return: Buffer of the shear tensor. :rtype: jnp.ndarray """ velocity_hat = jnp.stack([jnp.fft.rfftn(velocity[ii], axes=(2,1,0)) for ii in range(3)]) return self.calculate_sheartensor_spectral(velocity_hat)
[docs] def calculate_dilatation_spectral(self, velocity_hat: jnp.ndarray) -> jnp.ndarray: """Calculates the dilatation of the given velocity field in spectral space. :param velocity_hat: Buffer of velocities in spectral space. :type velocity_hat: jnp.ndarray :return: Buffer of the dilatational field. :rtype: jnp.ndarray """ dilatation_spectral = 1j * (self.k_field[0] * velocity_hat[0] + self.k_field[1] * velocity_hat[1] + self.k_field[2] * velocity_hat[2]) dilatation_real = jnp.fft.irfftn(dilatation_spectral, axes=(2,1,0)) return dilatation_real
[docs] def calculate_dilatation(self, velocity: jnp.ndarray) -> jnp.ndarray: """_summary_ Calculates dilatation in spectral space velocity: (3, Nx, Ny, Nz) array dilatation: (Nx, Ny, Nz) array dilatation = du1/dx1 + du2/dx2 + du3/dx3 :param velocity: Buffer of velocities in physical space. :type velocity: jnp.ndarray :return: Buffer of dilatational field. :rtype: jnp.ndarray """ velocity_hat = jnp.stack([jnp.fft.rfftn(velocity[ii], axes=(2,1,0)) for ii in range(3)]) return self.calculate_dilatation_spectral(velocity_hat)
[docs] def calculate_strain(duidj: jnp.ndarray) -> jnp.ndarray: """Calculates the strain given the velocity gradient tensor. :param duidj: Buffer of velocity gradient. :type duidj: jnp.ndarray :return: Buffer of strain tensor. :rtype: jnp.ndarray """ S_ij = 0.5 * ( duidj + jnp.transpose(duidj, axes=(1,0,2,3,4)) ) return S_ij