Source code for wolfhece.mesh2d.simple_2d_f32

import numpy as np
from tqdm import tqdm
from numba import jit
from scipy.optimize import root_scalar
from typing import Literal
from enum import Enum
import logging

"""
Calcul des equations shallow water en 1D section rectangulaire avec frottement selon Manning
-- Version float32 --

Continuity : dh/dt + d(q)/dx = 0
Momentum   : dq/dt + d(q^2/h + 1/2 g h^2)/dx + g h dz/dx = -g h J
Friction Slope : J = n^2 (q/h)^2 / h^(4/3)

Discretisation :

J = n^2 q_t2 q_t1 / h^(10/3)
ghJ = q_t2 (g n^2 q_t1 / h^(7/3))

h_t2 = h_t1 - dt/dx (q_t,r - q_t,l)
q_t2 = 1/(1 + dt g n^2 q_t1 / h^(7/3)) * (q_t1 - dt/dx (q_t,r^2/h_t,r - q_t,l^2/h_t,l + 1/2 g (h_t,r^2 - h_t,l^2) + g h_center_mean (z_t,r - z_t,l))

"""

[docs] def domain(length:float, dx:float, slope:float) -> np.ndarray: """ Create the domain :param length: Length of the domain :param dx: Space step :param slope: Slope of the domain """ nb = int(length/dx) dom = np.zeros(nb+2, dtype=np.float32) x = [-dx/2.] + [(float(i)+.5)*dx for i in range(0,nb)] + [length + dx/2.] z = -slope * np.array(x, dtype=np.float32) return dom, x, z
[docs] def _init_conditions(dom:np.ndarray, h0:float, q0:float) -> np.ndarray: """ Initial conditions :param dom: Domain :param h0: Initial water depth [m] :param q0: Initial discharge [m^2/s] """ h = np.zeros_like(dom) h[1:-1] = h0 q = np.zeros_like(dom) q[1:-1] = q0 return h, q
[docs] def get_friction_slope_2D_Manning(q:float, h:float, n:float) -> float: """ Friction slope based on Manning formula :param q: Discharge [m^2/s] :param h: Water depth [m] :param n: Manning coefficient [m^(1/3)/s] """ return n**2 * q**2 / h**(10/3)
[docs] def compute_dt(dx:float, h:np.ndarray, q:np.ndarray, CN:float) -> float: """ Compute the time step according to the Courant number anf the maximum velocity :param dx: Space step :param h: Water depth :param q: Discharge :param CN: Courant number """ h_pos = np.where(h > 0)[0] dt = CN * dx / (np.max(np.abs(q[h_pos]/h[h_pos]) + np.sqrt(h[h_pos]*np.float32(9.81)))) return dt
[docs] def all_unk_border(dom:np.ndarray, h0:float, q0:float) -> tuple[np.ndarray]: """ Initialize all arrays storing unknowns at center and borders :param dom: Domain :param h0: Initial water depth :param q0: Initial discharge """ h, q = _init_conditions(dom, h0, q0) h_pred = np.zeros_like(dom) h_corr = np.zeros_like(dom) q_pred = np.zeros_like(dom) q_corr = np.zeros_like(dom) q_border = np.zeros((len(dom), 2), dtype=np.float32) h_border = np.zeros((len(dom), 2), dtype=np.float32) z_border = np.zeros((len(dom), 2), dtype=np.float32) u_border = np.zeros(len(dom), dtype=np.float32) h_center = np.zeros(len(dom)-2, dtype=np.float32) u_center = np.zeros(len(dom)-2, dtype=np.float32) return h, q, h_pred, q_pred, h_corr, q_corr, q_border, h_border, z_border, u_border, h_center, u_center
[docs] def uniform_waterdepth(slope:float, q:float, n:float): """ Compute the uniform water depth for a given slope, discharge and Manning coefficient :param slope: Slope :param q: Discharge [m^2/s] :param n: Manning coefficient """ if n==0. or slope==0.: logging.error("Manning coefficient or slope cannot be null") logging.warning("Return 99999.") return 99999. return root_scalar(lambda h: slope - get_friction_slope_2D_Manning(q, h, n), bracket=[0.1, 100.]).root
# ----------------- # LOSS Coefficients # -----------------
[docs] def k_abrupt_enlargment(asmall:float, alarge:float) -> float: """ Compute the local head loss coefficient of the abrupt enlargment :params asmall: float, area of the section 1 -- smaller section :params alarge: float, area of the section 2 -- larger section """ return (1 - asmall/alarge)**2
[docs] def k_abrupt_contraction(alarge:float, asmall:float) -> float: """ Compute the local head loss coefficient of the abrupt contraction :params alarge: float, area of the section 1 -- larger section :params asmall: float, area of the section 2 -- smaller section """ return .5*(1 - asmall/alarge)
[docs] def head_loss_enlargment(q:float, asmall:float, alarge:float) -> float: """ Compute the head loss of the enlargment. Reference velocity is the velocity in the smaller section. :params q: float, discharge :params asmall: float, area of the section 1 -- smaller section :params alarge: float, area of the section 2 -- larger section """ return k_abrupt_enlargment(asmall, alarge) * (q/asmall)**2 / 2 / 9.81
[docs] def head_loss_contraction(q:float, alarge:float, asmall:float) -> float: """ Compute the head loss of the contraction. Reference velocity is the velocity in the smaller section. :params q: float, discharge :params alarge: float, area of the section 1 -- larger section :params asmall: float, area of the section 2 -- smaller section """ return k_abrupt_contraction(alarge, asmall) * (q/asmall)**2 / 2 / 9.81
[docs] def head_loss_contract_enlarge(q:float, a_up:float, asmall:float, a_down:float) -> float: """ Compute the head loss of the contraction/enlargment. Reference velocity is the velocity in the smaller section. :params q: float, discharge :params a_up: float, area of the section 1 -- larger section :params asmall: float, area of the section 2 -- smaller section :params a_down: float, area of the section 3 -- larger section """ return (k_abrupt_enlargment(asmall, a_down) + k_abrupt_contraction(a_up, asmall)) * (q/asmall)**2 / 2 / 9.81
# ---------------------------- # START JIT compiled functions # ---------------------------- @jit(nopython=True, cache=True)
[docs] def get_friction_slope_2D_Manning_semi_implicit(u:np.ndarray, h:np.ndarray, n:float) -> np.ndarray: """ Friction slope based on Manning formula -- Only semi-implicit formulea for the friction slope :param u: Velocity [m/s] :param h: Water depth [m] :param n: Manning coefficient [m^(1/3)/s] """ return n**2 * np.abs(u) / h**(np.float32(7)/np.float32(3))
@jit(nopython=True, cache=True)
[docs] def Euler_RK(h_t1:np.ndarray, h_t2:np.ndarray, q_t1:np.ndarray, q_t2:np.ndarray, h:np.ndarray, q:np.ndarray, h_border:np.ndarray, q_border:np.ndarray, z:np.ndarray, z_border:np.ndarray, dt:float, dx:float, CL_h:float, CL_q:float, n:float, u_border:np.ndarray, h_center:np.ndarray, u_center:np.ndarray) -> None: """ Solve the mass and momentum equations using a explicit Euler/Runge-Kutta scheme (only 1 step) :param h_t1: Water depth at time t :param h_t2: Water depth at time t+dt (or t_star or t_doublestar if RK) :param q_t1: Discharge at time t :param q_t2: Discharge at time t+dt (or t_star or t_doublestar if RK) :param h: Water depth at the mesh center :param q: Discharge at the mesh center :param h_border: Water depth at the mesh border :param q_border: Discharge at the mesh border :param z: Bed elevation :param z_border: Bed elevation at the mesh border :param dt: Time step :param dx: Space step :param CL_h: Downstream boudary condition for water depth :param CL_q: Upstream boundary condition for discharge :param n: Manning coefficient :param u_border: Velocity at the mesh border :param h_center: Water depth at the mesh center :param u_center: Velocity at the mesh center """ g = np.float32(9.81) slice_mesh = slice(1,-1) slice_right_border = slice(2,None) slice_left_border = slice(1,-1) up = 0 do = 1 # valeur à gauche du bord q_border[slice_right_border, up] = q[1:-1] q_border[1,up] = CL_q h_border[slice_right_border, up] = h[1:-1] h_border[1,up] = h[1] z_border[slice_right_border, up] = z[1:-1] z_border[1,up] = z[1] # valeur à droite du bord q_border[slice_left_border, do] = q[1:-1] q_border[-1,do] = q[-2] h_border[slice_left_border, do] = h[1:-1] h_border[-1,do] = CL_h z_border[slice_left_border, do] = z[1:-1] z_border[-1,do] = z[-2] u_border[1:] = q_border[1:,up]/h_border[1:,up] h_center = (h_border[slice_right_border,do] + h_border[slice_left_border,do])/np.float32(2.) u_center = q[slice_mesh]/h[slice_mesh] #Continuity h_t2[slice_mesh] = h_t1[slice_mesh] - dt/dx * (q_border[slice_right_border,up] - q_border[slice_left_border,up]) # Momentum J = get_friction_slope_2D_Manning_semi_implicit(u_center, h[slice_mesh], n) qm_right = u_border[slice_right_border]*q_border[slice_right_border,up] qm_left = u_border[slice_left_border]*q_border[slice_left_border,up] press_right = np.float32(0.5) * g * h_border[slice_right_border,do]**2 press_left = np.float32(0.5) * g * h_border[slice_left_border,do]**2 bed_right = g * h_center * z_border[slice_right_border,do] bed_left = g * h_center * z_border[slice_left_border,do] q_t2[slice_mesh] = np.float32(1.)/(np.float32(1.) + dt * g *h[slice_mesh] * J) * (q_t1[slice_mesh] - dt/dx * (qm_right - qm_left + press_right - press_left + bed_right - bed_left)) limit_h_q(h_t2, q_t2, hmin=np.float32(1e-3), Froudemax=np.float32(3.))
@jit(nopython=True, cache=True)
[docs] def Euler_RK_wb(h_t1:np.ndarray, h_t2:np.ndarray, q_t1:np.ndarray, q_t2:np.ndarray, h:np.ndarray, q:np.ndarray, h_border:np.ndarray, q_border:np.ndarray, z:np.ndarray, z_border:np.ndarray, dt:float, dx:float, CL_h:float, CL_q:float, n:float, u_border:np.ndarray, h_center:np.ndarray, u_center:np.ndarray) -> None: """ Well-balanced version of Euler_RK (float32) Replaces the separate pressure + bed terms with the factored form: 0.5 * g * (h_r + h_l) * (eta_r - eta_l), eta = h + z This avoids catastrophic cancellation between h^2 terms and bed-slope terms in float32 arithmetic, and yields the exact well-balanced property (zero momentum flux for a lake at rest). """ g = np.float32(9.81) slice_mesh = slice(1,-1) slice_right_border = slice(2,None) slice_left_border = slice(1,-1) up = 0 do = 1 # valeur à gauche du bord q_border[slice_right_border, up] = q[1:-1] q_border[1,up] = CL_q h_border[slice_right_border, up] = h[1:-1] h_border[1,up] = h[1] z_border[slice_right_border, up] = z[1:-1] z_border[1,up] = z[1] # valeur à droite du bord q_border[slice_left_border, do] = q[1:-1] q_border[-1,do] = q[-2] h_border[slice_left_border, do] = h[1:-1] h_border[-1,do] = CL_h z_border[slice_left_border, do] = z[1:-1] z_border[-1,do] = z[-2] u_border[1:] = q_border[1:,up]/h_border[1:,up] h_center = (h_border[slice_right_border,do] + h_border[slice_left_border,do])/np.float32(2.) u_center = q[slice_mesh]/h[slice_mesh] #Continuity h_t2[slice_mesh] = h_t1[slice_mesh] - dt/dx * (q_border[slice_right_border,up] - q_border[slice_left_border,up]) # Momentum J = get_friction_slope_2D_Manning_semi_implicit(u_center, h[slice_mesh], n) qm_right = u_border[slice_right_border]*q_border[slice_right_border,up] qm_left = u_border[slice_left_border]*q_border[slice_left_border,up] # Well-balanced: 0.5*g*(h_r+h_l)*(eta_r - eta_l), eta = h + z h_right = h_border[slice_right_border, do] h_left = h_border[slice_left_border, do] eta_right = h_right + z_border[slice_right_border, do] eta_left = h_left + z_border[slice_left_border, do] press_bed = np.float32(0.5) * g * (h_right + h_left) * (eta_right - eta_left) q_t2[slice_mesh] = np.float32(1.)/(np.float32(1.) + dt * g *h[slice_mesh] * J) * (q_t1[slice_mesh] - dt/dx * (qm_right - qm_left + press_bed)) limit_h_q(h_t2, q_t2, hmin=np.float32(1e-3), Froudemax=np.float32(3.))
@jit(nopython=True, cache=True)
[docs] def Euler_RK_hedge(h_t1:np.ndarray, h_t2:np.ndarray, q_t1:np.ndarray, q_t2:np.ndarray, h:np.ndarray, q:np.ndarray, h_border:np.ndarray, q_border:np.ndarray, z:np.ndarray, z_border:np.ndarray, dt:float, dx:float, CL_h:float, CL_q:float, n:float, u_border:np.ndarray, h_center:np.ndarray, u_center:np.ndarray, theta:np.ndarray, theta_border:np.ndarray) -> None: """ Solve the mass and momentum equations using a explicit Euler/Runge-Kutta scheme (only 1 step) :param h_t1: Water depth at time t :param h_t2: Water depth at time t+dt (or t_star or t_doublestar if RK) :param q_t1: Discharge at time t :param q_t2: Discharge at time t+dt (or t_star or t_doublestar if RK) :param h: Water depth at the mesh center :param q: Discharge at the mesh center :param h_border: Water depth at the mesh border :param q_border: Discharge at the mesh border :param z: Bed elevation :param z_border: Bed elevation at the mesh border :param dt: Time step :param dx: Space step :param CL_h: Downstream boudary condition for water depth :param CL_q: Upstream boundary condition for discharge :param n: Manning coefficient :param u_border: Velocity at the mesh border :param h_center: Water depth at the mesh center :param u_center: Velocity at the mesh center """ g = np.float32(9.81) slice_mesh = slice(1,-1) slice_right_border = slice(2,None) slice_left_border = slice(1,-1) up = 0 do = 1 # valeur à gauche du bord # ----------------------- q_border[slice_right_border, up] = q[1:-1] q_border[1,up] = CL_q h_border[slice_right_border, up] = h[1:-1] h_border[1,up] = h[1] z_border[slice_right_border, up] = z[1:-1] z_border[1,up] = z[1] theta_border[slice_right_border, up] = theta[1:-1] theta_border[1,up] = theta[1] # valeur à droite du bord # ------------------------ q_border[slice_left_border, do] = q[1:-1] q_border[-1,do] = q[-2] h_border[slice_left_border, do] = h[1:-1] h_border[-1,do] = CL_h z_border[slice_left_border, do] = z[1:-1] z_border[-1,do] = z[-2] theta_border[slice_left_border, do] = theta[1:-1] theta_border[-1,do] = theta[-2] u_border[1:] = q_border[1:,up]/(h_border[1:,up] * theta_border[1:,up]) h_center = (theta_border[slice_right_border,do] * h_border[slice_right_border,do] + theta_border[slice_left_border,do] * h_border[slice_left_border,do])/np.float32(2.) u_center = q[slice_mesh]/(h[slice_mesh]*theta[slice_mesh]) #Continuity h_t2[slice_mesh] = h_t1[slice_mesh] - dt/dx * (q_border[slice_right_border,up] - q_border[slice_left_border,up]) # Momentum J = get_friction_slope_2D_Manning_semi_implicit(u_center, h[slice_mesh], n) qm_right = u_border[slice_right_border]*q_border[slice_right_border,up] qm_left = u_border[slice_left_border ]*q_border[slice_left_border,up] press_right = np.float32(0.5) * g * (theta_border[slice_right_border,do] * h_border[slice_right_border,do])**2 press_left = np.float32(0.5) * g * (theta_border[slice_left_border ,do] * h_border[slice_left_border ,do])**2 bed_right = g * h_center * (z_border[slice_right_border,do] + (np.float32(1.)-theta_border[slice_right_border,do]) * h_border[slice_right_border,do]) bed_left = g * h_center * (z_border[slice_left_border,do] + (np.float32(1.)-theta_border[slice_left_border,do]) * h_border[slice_left_border,do]) q_t2[slice_mesh] = np.float32(1.)/(np.float32(1.) + dt * g *h[slice_mesh] * J) * (q_t1[slice_mesh] - dt/dx * (qm_right - qm_left + press_right - press_left + bed_right - bed_left)) limit_h_q(h_t2, q_t2, hmin=np.float32(1e-3), Froudemax=np.float32(3.))
@jit(nopython=True, cache=True)
[docs] def splitting(q_left:np.float32, q_right:np.float32, h_left:np.float32, h_right:np.float32, z_left:np.float32, z_right:np.float32, z_bridge_left:np.float32, z_bridge_right:np.float32) -> np.ndarray: """ Splitting of the unknowns at border between two nodes -- Based on the WOLF HECE original scheme :param q_left: Discharge at the left-side of the border :param q_right: Discharge at the right-side of the border :param h_left: Water depth at the left-side of the border :param h_right: Water depth at the right-side of the border :param z_left: Bed elevation at the left-side of the border :param z_right: Bed elevation at the right-side of the border :param z_bridge_left: Bridge elevation at the left-side of the border :param z_bridge_right: Bridge elevation at the right-side of the border :return: Array of the unknowns according to the WOLF HECE scheme """ prod_q = q_left * q_right sum_q = q_left + q_right if prod_q > 0.: if q_left > 0.: return np.asarray([q_left, min(h_left, z_bridge_left-z_left), h_right, z_right, z_bridge_right], dtype=np.float32) else: return np.asarray([q_right, min(h_right, z_bridge_right-z_right), h_left, z_left, z_bridge_left], dtype=np.float32) elif prod_q < 0.: if sum_q > 0.: return np.asarray([q_left, min(h_left, z_bridge_left-z_left), h_right, z_right, z_bridge_right], dtype=np.float32) elif sum_q < 0.: return np.asarray([q_right, min(h_right, z_bridge_right-z_right), h_left, z_left, z_bridge_left], dtype=np.float32) else: return np.asarray([np.float32(0.), np.float32(1.), (h_left + h_right) / np.float32(2.), (z_left + z_right) / np.float32(2.), (z_bridge_left + z_bridge_right) / np.float32(2.)], dtype=np.float32) else: if q_left<0.: return np.asarray([np.float32(0.), np.float32(1.), h_left, z_left, z_bridge_left], dtype=np.float32) elif q_right<0.: return np.asarray([np.float32(0.), np.float32(1.), h_right, z_right, z_bridge_right], dtype=np.float32) else: return np.asarray([sum_q / np.float32(2.), # q (min(h_left, z_bridge_left-z_left) + min(h_right, z_bridge_right-z_right)) / np.float32(2.), # h_vel (h_left + h_right) / np.float32(2.), # h (z_left + z_right) / np.float32(2.), # z (z_bridge_left + z_bridge_right) / np.float32(2.)], dtype=np.float32) # z_bridge
@jit(nopython=True, cache=True)
[docs] def Euler_RK_bridge(h_t1:np.ndarray, h_t2:np.ndarray, q_t1:np.ndarray, q_t2:np.ndarray, h:np.ndarray, q:np.ndarray, h_border:np.ndarray, q_border:np.ndarray, z:np.ndarray, z_border:np.ndarray, dt:float, dx:float, CL_h:float, CL_q:float, n:float, u_border:np.ndarray, h_center:np.ndarray, u_center:np.ndarray, z_bridge:np.ndarray, z_bridge_border:np.ndarray, infil_exfil=None) -> None: """ Solve the mass and momentum equations using a explicit Euler/Runge-Kutta scheme (only 1 step) applying source terms for infiltration/exfiltration and pressure at the roof. :param h_t1: Water depth at time t :param h_t2: Water depth at time t+dt (or t_star or t_doublestar if RK) :param q_t1: Discharge at time t :param q_t2: Discharge at time t+dt (or t_star or t_doublestar if RK) :param h: Water depth at the mesh center :param q: Discharge at the mesh center :param h_border: Water depth at the mesh border :param q_border: Discharge at the mesh border :param z: Bed elevation :param z_border: Bed elevation at the mesh border :param dt: Time step :param dx: Space step :param CL_h: Downstream boudary condition for water depth :param CL_q: Upstream boundary condition for discharge :param n: Manning coefficient :param u_border: Velocity at the mesh border :param h_center: Water depth at the mesh center :param u_center: Velocity at the mesh center :param z_bridge: Bridge elevation at the mesh center :param z_bridge_border: Bridge elevation at the mesh border :param infil_exfil: Infiltration/exfiltration parameters """ g = np.float32(9.81) slice_mesh = slice(1,-1) # L R L = Left border, R = Right border # l|r l|r l = left value, r = right value # | i | i = node indice # | | # i i+1 i, i+1 = associated borders slice_right_border = slice(2,None) # right border - left value - slice associated to the center values slice_left_border = slice(1,-1) # left border - right value - slice associated to the center values up = 0 do = 1 # altitude du pont z_bridge_copy = z_bridge.copy() fs_cells = np.where(h <= z_bridge - z)[0] # mailles à surface libre press_cells = np.where(h > z_bridge - z)[0] # mailles sous-pression z_bridge_copy[fs_cells] = z[fs_cells] + h[fs_cells] # on recopie l'altitude de SL # valeur à gauche du bord # ----------------------- # altitude du fond z_border[slice_right_border, up] = z[1:-1] z_border[1,up] = z[1] # tout en amont, recopie de la valeur intérieure h_border[slice_right_border, up] = h[1:-1] h_border[1,up] = h[1] # tout en amont, recopie de la valeur intérieure q_border[slice_right_border, up] = q[1:-1] q_border[1,up] = CL_q # condition limite de debit z_bridge_border[slice_right_border, up] = z_bridge_copy[1:-1] z_bridge_border[1,up] = z_bridge_copy[1] # valeur à droite du bord # ------------------------ z_border[slice_left_border, do] = z[1:-1] z_border[-1,do] = z[-2] # tout en aval, recopie de la valeur intérieure h_border[slice_left_border, do] = h[1:-1] h_border[-1,do] = CL_h # condition limite de hauteur q_border[slice_left_border, do] = q[1:-1] q_border[-1,do] = q[-2] # tout en aval, recopie de la valeur intérieure z_bridge_border[slice_left_border, do] = z_bridge_copy[1:-1] z_bridge_border[-1,do] = CL_h + z_border[-1,do] # z_bridge_copy[-2] for i in range(1, len(h)-1): qc_right, h4u_right, hc_right, zc_right, zbc_right = splitting(q_border[i+1,up], q_border[i+1, do], h_border[i+1,up], h_border[i+1,do], z_border[i+1,up], z_border[i+1,do], z_bridge_border[i+1,up], z_bridge_border[i+1,do]) qc_left, h4u_left, hc_left, zc_left, zbc_left = splitting(q_border[i,up], q_border[i, do], h_border[i,up], h_border[i,do], z_border[i,up], z_border[i,do], z_bridge_border[i,up], z_bridge_border[i,do]) # Continuity # ++++++++++ h_t2[i] = h_t1[i] - dt/dx * (qc_right - qc_left) # Momentum # ++++++++ # Limited section at the right border and at the left border -- decentred downstream d_right = zbc_right - zc_right d_left = zbc_left - zc_left # Pressure on the roof -- 0. if free surface press_roof_right = hc_right - d_right press_roof_left = hc_left - d_left # Pressure integral at the right border and at the left border press_right = np.float32(0.5) * g * (hc_right**np.float32(2.) - press_roof_right**np.float32(2.)) press_left = np.float32(0.5) * g * (hc_left**np.float32(2.) - press_roof_left**np.float32(2.)) # Friction slope based on center values u_center = q[i] / (z_bridge_copy[i] - z[i]) # Number of surfaces nb_frott = np.float32(2.) if h[i] > z_bridge[i] - z[i] else np.float32(1.) # Integration water depth h_frott = min(h[i], z_bridge[i] - z[i]) # Slope J = get_friction_slope_2D_Manning_semi_implicit(u_center, h_frott/nb_frott, n) # Velocity at the right border and at the left border -- decentred upstream u_right = qc_right / h4u_right u_left = qc_left / h4u_left # Momentum at the right border and at the left border qm_right = u_right * qc_right qm_left = u_left * qc_left # Mean pressure impacting bed reaction h_mean = (hc_right + hc_left)/np.float32(2.) bed_right = g * h_mean * zc_right bed_left = g * h_mean * zc_left # Mean pressure impacting roof reaction h_roof_right = max(hc_right + zc_right - zbc_right, np.float32(0.)) h_roof_left = max(hc_left + zc_left - zbc_left , np.float32(0.)) h_roof_mean = (h_roof_right + h_roof_left) / np.float32(2.) roof_right = g * h_roof_mean * zbc_right roof_left = g * h_roof_mean * zbc_left rhs = (qm_right - qm_left + press_right - press_left + bed_right - bed_left - roof_right + roof_left) if rhs !=0.: pass q_t2[i] = np.float32(1.)/(np.float32(1.) + dt * g * h_frott * J) * (q_t1[i] - dt/dx * rhs) if infil_exfil is not None: idx_up, idx_do, q_infil_exfil, u_infil_exfil, pond, k = infil_exfil i = idx_up qc_right, h4u_right, hc_right, zc_right, zbc_right = splitting(q_border[i+1,up], q_border[i+1, do], h_border[i+1,up], h_border[i+1,do], z_border[i+1,up], z_border[i+1,do], z_bridge_border[i+1,up], z_bridge_border[i+1,do]) qc_left, h4u_left, hc_left, zc_left, zbc_left = splitting(q_border[i,up], q_border[i, do], h_border[i,up], h_border[i,do], z_border[i,up], z_border[i,do], z_bridge_border[i,up], z_bridge_border[i,do]) # Limited section at the right border and at the left border -- decentred downstream d_right = zbc_right - zc_right d_left = zbc_left - zc_left # Pressure on the roof -- 0. if free surface press_roof_right = hc_right - d_right press_roof_left = hc_left - d_left # Pressure integral at the right border and at the left border press_right = np.float32(0.5) * g * (hc_right**np.float32(2.) - press_roof_right**np.float32(2.)) press_left = np.float32(0.5) * g * (hc_left**np.float32(2.) - press_roof_left**np.float32(2.)) # Friction slope based on center values u_center = q[i] / (z_bridge_copy[i] - z[i]) # Number of surfaces nb_frott = np.float32(2.) if h[i] > z_bridge[i] - z[i] else np.float32(1.) # Integration water depth h_frott = min(h[i], z_bridge[i] - z[i]) # Slope J = get_friction_slope_2D_Manning_semi_implicit(u_center, h_frott/nb_frott, n) # Velocity at the right border and at the left border -- decentred upstream u_right = qc_right / h4u_right u_left = qc_left / h4u_left # Momentum at the right border and at the left border qm_right = u_right * qc_right qm_left = u_left * qc_left # Mean pressure impacting bed reaction h_mean = (hc_right + hc_left)/np.float32(2.) bed_right = g * h_mean * zc_right bed_left = g * h_mean * zc_left # Mean pressure impacting roof reaction h_roof_right = max(hc_right + zc_right - zbc_right, np.float32(0.)) h_roof_left = max(hc_left + zc_left - zbc_left , np.float32(0.)) h_roof_mean = (h_roof_right + h_roof_left) / np.float32(2.) roof_right = g * h_roof_mean * zbc_right roof_left = g * h_roof_mean * zbc_left h_t2[i] = h_t1[i] - dt/dx * (qc_right - qc_left + q_infil_exfil) q_t2[i] = np.float32(1.)/(np.float32(1.) + dt * g * h_frott * J) * (q_t1[i] - dt/dx * (qm_right - qm_left + \ press_right - press_left + \ bed_right - bed_left - \ roof_right + roof_left + \ u_infil_exfil * q_infil_exfil)) i = idx_do qc_right, h4u_right, hc_right, zc_right, zbc_right = splitting(q_border[i+1,up], q_border[i+1, do], h_border[i+1,up], h_border[i+1,do], z_border[i+1,up], z_border[i+1,do], z_bridge_border[i+1,up], z_bridge_border[i+1,do]) qc_left, h4u_left, hc_left, zc_left, zbc_left = splitting(q_border[i,up], q_border[i, do], h_border[i,up], h_border[i,do], z_border[i,up], z_border[i,do], z_bridge_border[i,up], z_bridge_border[i,do]) # Limited section at the right border and at the left border -- decentred downstream d_right = zbc_right - zc_right d_left = zbc_left - zc_left # Pressure on the roof -- 0. if free surface press_roof_right = hc_right - d_right press_roof_left = hc_left - d_left # Pressure integral at the right border and at the left border press_right = np.float32(0.5) * g * (hc_right**np.float32(2.) - press_roof_right**np.float32(2.)) press_left = np.float32(0.5) * g * (hc_left**np.float32(2.) - press_roof_left**np.float32(2.)) # Friction slope based on center values u_center = q[i] / (z_bridge_copy[i] - z[i]) # Number of surfaces nb_frott = np.float32(2.) if h[i] > z_bridge[i] - z[i] else np.float32(1.) # Integration water depth h_frott = min(h[i], z_bridge[i] - z[i]) # Slope J = get_friction_slope_2D_Manning_semi_implicit(u_center, h_frott/nb_frott, n) # Velocity at the right border and at the left border -- decentred upstream u_right = qc_right / h4u_right u_left = qc_left / h4u_left # Momentum at the right border and at the left border qm_right = u_right * qc_right qm_left = u_left * qc_left # Mean pressure impacting bed reaction h_mean = (hc_right + hc_left)/np.float32(2.) bed_right = g * h_mean * zc_right bed_left = g * h_mean * zc_left # Mean pressure impacting roof reaction h_roof_right = max(hc_right + zc_right - zbc_right, np.float32(0.)) h_roof_left = max(hc_left + zc_left - zbc_left , np.float32(0.)) h_roof_mean = (h_roof_right + h_roof_left) / np.float32(2.) roof_right = g * h_roof_mean * zbc_right roof_left = g * h_roof_mean * zbc_left h_t2[i] = h_t1[i] - dt/dx * (qc_right - qc_left - q_infil_exfil) q_t2[i] = np.float32(1.)/(np.float32(1.) + dt * g * h_frott * J) * (q_t1[i] - dt/dx * (qm_right - qm_left + \ press_right - press_left + \ bed_right - bed_left - \ roof_right + roof_left -\ u_infil_exfil * q_infil_exfil)) limit_h_q(h_t2, q_t2, hmin=np.float32(1e-3), Froudemax=np.float32(3.))
@ jit(nopython=True, cache=True)
[docs] def limit_h_q(h:np.ndarray, q:np.ndarray, hmin:float = 0., Froudemax:float = 3.) -> None: """ Limit the water depth and the discharge :param h: Water depth [m] :param q: Discharge [m^2/s] :param hmin: Minimum water depth [m] :param Froudemax: Maximum Froude number [-] """ g = np.float32(9.81) # retrieve positive and negative values hpos = np.where(h > hmin) hneg = np.where(h <= hmin) # limit water depth h[hneg] = hmin q[hneg] = np.float32(0.) # limit discharge based on Froude number Fr = np.zeros_like(h) Fr[hpos] = np.abs(q[hpos]) / h[hpos] / np.sqrt(g * h[hpos]) q[Fr > Froudemax] = Froudemax * np.sqrt(g * h[Fr > Froudemax]) * h[Fr > Froudemax] * np.sign(q[Fr > Froudemax])
# -------------------------- # END JIT compiled functions # -------------------------- # ---------------------- # START Problems section # ----------------------
[docs] MAX_TIME = np.float32(3600)
[docs] def problem(dom:np.ndarray, z:np.ndarray, h0:float, q0:float, dx:float, CN:float, n:float, h_init:np.ndarray=None, q_init:np.ndarray=None): """ Solve the mass and momentum equations using a explicit Runge-Kutta scheme (2 steps - 2nd order) **NO BRIDGE** """ #Ensure dom, z is Float32 dom = dom.astype(np.float32) z = z.astype(np.float32) h0 = np.float32(h0) q0 = np.float32(q0) dx = np.float32(dx) CN = np.float32(CN) n = np.float32(n) h, q, h_pred, q_pred, h_corr, q_corr, q_border, h_border, z_border, u_border, h_center, u_center = all_unk_border(dom, h0, q0) if h_init is not None: h[:] = h_init[:] if q_init is not None: q[:] = q_init[:] totaltime = MAX_TIME eps = np.float32(1.) with tqdm(total=float(totaltime)) as pbar: t = 0. while eps > 2e-7 and t < totaltime: dt = compute_dt(dx, h, q, CN) # Predictor step Euler_RK(h, h_pred, q, q_pred, h, q, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center) # Corrector step Euler_RK(h, h_corr, q, q_corr, h_pred, q_pred, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center) # Update -- Mean for second order in time h = (h_pred + h_corr)/np.float32(2.) q = (q_pred + q_corr)/np.float32(2.) t+=dt # eps = np.max(np.abs(q - q_pred)) eps = np.max(np.abs(q[1:-1] - q0)) pbar.update(float(dt)) print("Total time : ", t) print("Residual : ", eps) return h, q
[docs] def problem_wb(dom:np.ndarray, z:np.ndarray, h0:float, q0:float, dx:float, CN:float, n:float, h_init:np.ndarray=None, q_init:np.ndarray=None): """ Solve the mass and momentum equations using a explicit Runge-Kutta scheme (2 steps - 2nd order) **NO BRIDGE** """ #Ensure dom, z is Float32 dom = dom.astype(np.float32) z = z.astype(np.float32) h0 = np.float32(h0) q0 = np.float32(q0) dx = np.float32(dx) CN = np.float32(CN) n = np.float32(n) h, q, h_pred, q_pred, h_corr, q_corr, q_border, h_border, z_border, u_border, h_center, u_center = all_unk_border(dom, h0, q0) if h_init is not None: h[:] = h_init[:] if q_init is not None: q[:] = q_init[:] totaltime = MAX_TIME eps = np.float32(1.) with tqdm(total=float(totaltime)) as pbar: t = 0. while eps > 2e-7 and t < totaltime: dt = compute_dt(dx, h, q, CN) # Predictor step Euler_RK_wb(h, h_pred, q, q_pred, h, q, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center) # Corrector step Euler_RK_wb(h, h_corr, q, q_corr, h_pred, q_pred, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center) # Update -- Mean for second order in time h = (h_pred + h_corr)/np.float32(2.) q = (q_pred + q_corr)/np.float32(2.) t+=dt # eps = np.max(np.abs(q - q_pred)) eps = np.max(np.abs(q[1:-1] - q0)) pbar.update(float(dt)) print("Total time : ", t) print("Residual : ", eps) return h, q
[docs] def _problem_convergence(dom:np.ndarray, z:np.ndarray, h0:float, q0:float, dx:float, CN:float, n:float): """ Solve the mass and momentum equations using a explicit Runge-Kutta scheme (2 steps - 2nd order) **NO BRIDGE** """ #Ensure dom, z is Float32 dom = dom.astype(np.float32) z = z.astype(np.float32) h0 = np.float32(h0) q0 = np.float32(q0) dx = np.float32(dx) CN = np.float32(CN) n = np.float32(n) h, q, h_pred, q_pred, h_corr, q_corr, q_border, h_border, z_border, u_border, h_center, u_center = all_unk_border(dom, h0, q0) totaltime = MAX_TIME eps = np.float32(1.) all_eps = np.zeros((1_000_000,2), dtype=np.float32) all_time = np.zeros(1_000_000, dtype=np.float32) with tqdm(total=float(totaltime)) as pbar: t = 0. k = 0 while eps > 1e-7 and t < totaltime: dt = compute_dt(dx, h, q, CN) # Predictor step Euler_RK(h, h_pred, q, q_pred, h, q, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center) # Corrector step Euler_RK(h, h_corr, q, q_corr, h_pred, q_pred, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center) # Update -- Mean for second order in time h = (h_pred + h_corr)/np.float32(2.) q = (q_pred + q_corr)/np.float32(2.) t+=dt eps_h = np.sum(np.abs(h - h_pred)) eps_q = np.sum(np.abs(q - q_pred)) eps = max(eps_h, eps_q) all_eps[k] = [eps_h, eps_q] all_time[k] = t k+=1 pbar.update(float(dt)) print("Total time : ", t) print("Residual : ", eps_q) return h, q, all_eps[:k], all_time[:k]
[docs] def _problem_convergence_wb(dom:np.ndarray, z:np.ndarray, h0:float, q0:float, dx:float, CN:float, n:float): """ Well-balanced variant of _problem_convergence (float32) Uses Euler_RK_wb which avoids catastrophic cancellation in the pressure + bed-slope gradient by computing: 0.5 * g * (h_r + h_l) * (eta_r - eta_l), eta = h + z """ dom = dom.astype(np.float32) z = z.astype(np.float32) h0 = np.float32(h0) q0 = np.float32(q0) dx = np.float32(dx) CN = np.float32(CN) n = np.float32(n) h, q, h_pred, q_pred, h_corr, q_corr, q_border, h_border, z_border, u_border, h_center, u_center = all_unk_border(dom, h0, q0) totaltime = MAX_TIME eps = np.float32(1.) all_eps = np.zeros((1_000_000, 2), dtype=np.float32) all_time = np.zeros(1_000_000, dtype=np.float32) with tqdm(total=float(totaltime)) as pbar: t = 0. k = 0 while eps > np.float32(1e-7) and t < totaltime: dt = compute_dt(dx, h, q, CN) # Predictor step Euler_RK_wb(h, h_pred, q, q_pred, h, q, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center) # Corrector step Euler_RK_wb(h, h_corr, q, q_corr, h_pred, q_pred, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center) # Update -- Mean for second order in time h = (h_pred + h_corr)/np.float32(2.) q = (q_pred + q_corr)/np.float32(2.) t += dt eps_h = np.max(np.abs(h - h_pred)) eps_q = np.max(np.abs(q - q_pred)) eps = max(eps_h, eps_q) all_eps[k] = [eps_h, eps_q] all_time[k] = t k += 1 pbar.update(float(dt)) print("Total time : ", t) print("Residual : ", eps) return h, q, all_eps[:k], all_time[:k]
[docs] def problem_hedge(dom:np.ndarray, z:np.ndarray, h0:float, q0:float, dx:float, CN:float, n:float): """ Solve the mass and momentum equations using a explicit Runge-Kutta scheme (2 steps - 2nd order) **NO BRIDGE but HEDGE in the middle** """ h, q, h_pred, q_pred, h_corr, q_corr, q_border, h_border, z_border, u_border, h_center, u_center = all_unk_border(dom, h0, q0) theta = np.ones_like(h) theta_border = np.ones_like(h_border) slice_hedge = slice(len(h) // 2-1, len(h) // 2+2) theta_val = np.float32(0.5) theta[slice_hedge] = theta_val totaltime = MAX_TIME eps = 1. with tqdm(total=float(totaltime)) as pbar: t = 0. while eps > 1e-7 and t < totaltime: dt = compute_dt(dx, h, q, CN) # Predictor step Euler_RK_hedge(h, h_pred, q, q_pred, h, q, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center, theta, theta_border) # Corrector step Euler_RK_hedge(h, h_corr, q, q_corr, h_pred, q_pred, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center, theta, theta_border) # Update -- Mean for second order in time h = (h_pred + h_corr)/np.float32(2.) q = (q_pred + q_corr)/np.float32(2.) t+=dt eps = np.sum(np.abs(q - q_pred)) pbar.update(float(dt)) print("Total time : ", t) print("Residual : ", eps) return h, q, theta
[docs] def problem_bridge(dom:np.ndarray, z:np.ndarray, z_bridge:np.ndarray, h0:float, q0:float, dx:float, CN:float, n:float, h_ini:np.ndarray = None, q_ini:np.ndarray = None) -> tuple[np.ndarray]: """ Solve the mass and momentum equations using a explicit Rung-Kutta scheme (2 steps - 2nd order) **WITH BRIDGE and NO OVERFLOW** """ h, q, h_pred, q_pred, h_corr, q_corr, q_border, h_border, z_border, u_border, h_center, u_center = all_unk_border(dom, h0, q0) z_bridge_border = np.zeros_like(z_border) if h_ini is not None: h[:] = h_ini[:] if q_ini is not None: q[:] = q_ini[:] totaltime = MAX_TIME eps = 1. with tqdm(total=float(totaltime)) as pbar: t = 0. while eps > 1e-7 and t < totaltime: dt = compute_dt(dx, h, q, CN) # Predictor step Euler_RK_bridge(h, h_pred, q, q_pred, h, q, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center, z_bridge, z_bridge_border) # Corrector step Euler_RK_bridge(h, h_corr, q, q_corr, h_pred, q_pred, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center, z_bridge, z_bridge_border) # Update -- Mean for second order in time h = (h_pred + h_corr)/np.float32(2.) q = (q_pred + q_corr)/np.float32(2.) t+=dt eps = np.sum(np.abs(q - q_pred)) pbar.update(float(dt)) print("Total time : ", t) print("Residual : ", eps) return h, q
[docs] def problem_bridge_multiple_steadystates(dom:np.ndarray, z:np.ndarray, z_bridge:np.ndarray, h0:float, qmin:float, qmax:float, dx:float, CN:float, n:float, ) -> list[tuple[float, np.ndarray, np.ndarray]]: """ Solve multiple steady states for a given discharge range """ all_q = np.arange(qmin, qmax+.1, (qmax-qmin)/10, dtype=np.float32) ret = [] h = None for curq in all_q: h, q = problem_bridge(dom, z, z_bridge, h0, curq, dx, CN, n, h_ini=h, q_ini=None) ret.append((0., h.copy(), q.copy())) return ret
# -------------------- # END Problems section # --------------------