import numpy as np
from tqdm import tqdm
from numba import jit
from scipy.optimize import root_scalar
from typing import Literal
from enum import Enum
import logging
"""
Calcul des equations shallow water en 1D section rectangulaire avec frottement selon Manning
-- Version float32 --
Continuity : dh/dt + d(q)/dx = 0
Momentum : dq/dt + d(q^2/h + 1/2 g h^2)/dx + g h dz/dx = -g h J
Friction Slope : J = n^2 (q/h)^2 / h^(4/3)
Discretisation :
J = n^2 q_t2 q_t1 / h^(10/3)
ghJ = q_t2 (g n^2 q_t1 / h^(7/3))
h_t2 = h_t1 - dt/dx (q_t,r - q_t,l)
q_t2 = 1/(1 + dt g n^2 q_t1 / h^(7/3)) * (q_t1 - dt/dx (q_t,r^2/h_t,r - q_t,l^2/h_t,l + 1/2 g (h_t,r^2 - h_t,l^2) + g h_center_mean (z_t,r - z_t,l))
"""
[docs]
def domain(length:float, dx:float, slope:float) -> np.ndarray:
""" Create the domain
:param length: Length of the domain
:param dx: Space step
:param slope: Slope of the domain
"""
nb = int(length/dx)
dom = np.zeros(nb+2, dtype=np.float32)
x = [-dx/2.] + [(float(i)+.5)*dx for i in range(0,nb)] + [length + dx/2.]
z = -slope * np.array(x, dtype=np.float32)
return dom, x, z
[docs]
def _init_conditions(dom:np.ndarray, h0:float, q0:float) -> np.ndarray:
""" Initial conditions
:param dom: Domain
:param h0: Initial water depth [m]
:param q0: Initial discharge [m^2/s]
"""
h = np.zeros_like(dom)
h[1:-1] = h0
q = np.zeros_like(dom)
q[1:-1] = q0
return h, q
[docs]
def get_friction_slope_2D_Manning(q:float, h:float, n:float) -> float:
""" Friction slope based on Manning formula
:param q: Discharge [m^2/s]
:param h: Water depth [m]
:param n: Manning coefficient [m^(1/3)/s]
"""
return n**2 * q**2 / h**(10/3)
[docs]
def compute_dt(dx:float, h:np.ndarray, q:np.ndarray, CN:float) -> float:
""" Compute the time step according to the Courant number anf the maximum velocity
:param dx: Space step
:param h: Water depth
:param q: Discharge
:param CN: Courant number
"""
h_pos = np.where(h > 0)[0]
dt = CN * dx / (np.max(np.abs(q[h_pos]/h[h_pos]) + np.sqrt(h[h_pos]*np.float32(9.81))))
return dt
[docs]
def all_unk_border(dom:np.ndarray, h0:float, q0:float) -> tuple[np.ndarray]:
""" Initialize all arrays storing unknowns at center and borders
:param dom: Domain
:param h0: Initial water depth
:param q0: Initial discharge
"""
h, q = _init_conditions(dom, h0, q0)
h_pred = np.zeros_like(dom)
h_corr = np.zeros_like(dom)
q_pred = np.zeros_like(dom)
q_corr = np.zeros_like(dom)
q_border = np.zeros((len(dom), 2), dtype=np.float32)
h_border = np.zeros((len(dom), 2), dtype=np.float32)
z_border = np.zeros((len(dom), 2), dtype=np.float32)
u_border = np.zeros(len(dom), dtype=np.float32)
h_center = np.zeros(len(dom)-2, dtype=np.float32)
u_center = np.zeros(len(dom)-2, dtype=np.float32)
return h, q, h_pred, q_pred, h_corr, q_corr, q_border, h_border, z_border, u_border, h_center, u_center
# -----------------
# LOSS Coefficients
# -----------------
[docs]
def k_abrupt_enlargment(asmall:float, alarge:float) -> float:
""" Compute the local head loss coefficient of the abrupt enlargment
:params asmall: float, area of the section 1 -- smaller section
:params alarge: float, area of the section 2 -- larger section
"""
return (1 - asmall/alarge)**2
[docs]
def k_abrupt_contraction(alarge:float, asmall:float) -> float:
""" Compute the local head loss coefficient of the abrupt contraction
:params alarge: float, area of the section 1 -- larger section
:params asmall: float, area of the section 2 -- smaller section
"""
return .5*(1 - asmall/alarge)
[docs]
def head_loss_enlargment(q:float, asmall:float, alarge:float) -> float:
""" Compute the head loss of the enlargment.
Reference velocity is the velocity in the smaller section.
:params q: float, discharge
:params asmall: float, area of the section 1 -- smaller section
:params alarge: float, area of the section 2 -- larger section
"""
return k_abrupt_enlargment(asmall, alarge) * (q/asmall)**2 / 2 / 9.81
[docs]
def head_loss_contraction(q:float, alarge:float, asmall:float) -> float:
""" Compute the head loss of the contraction.
Reference velocity is the velocity in the smaller section.
:params q: float, discharge
:params alarge: float, area of the section 1 -- larger section
:params asmall: float, area of the section 2 -- smaller section
"""
return k_abrupt_contraction(alarge, asmall) * (q/asmall)**2 / 2 / 9.81
[docs]
def head_loss_contract_enlarge(q:float, a_up:float, asmall:float, a_down:float) -> float:
""" Compute the head loss of the contraction/enlargment.
Reference velocity is the velocity in the smaller section.
:params q: float, discharge
:params a_up: float, area of the section 1 -- larger section
:params asmall: float, area of the section 2 -- smaller section
:params a_down: float, area of the section 3 -- larger section
"""
return (k_abrupt_enlargment(asmall, a_down) + k_abrupt_contraction(a_up, asmall)) * (q/asmall)**2 / 2 / 9.81
# ----------------------------
# START JIT compiled functions
# ----------------------------
@jit(nopython=True, cache=True)
[docs]
def get_friction_slope_2D_Manning_semi_implicit(u:np.ndarray, h:np.ndarray, n:float) -> np.ndarray:
""" Friction slope based on Manning formula -- Only semi-implicit formulea for the friction slope
:param u: Velocity [m/s]
:param h: Water depth [m]
:param n: Manning coefficient [m^(1/3)/s]
"""
return n**2 * np.abs(u) / h**(np.float32(7)/np.float32(3))
@jit(nopython=True, cache=True)
[docs]
def Euler_RK(h_t1:np.ndarray, h_t2:np.ndarray,
q_t1:np.ndarray, q_t2:np.ndarray,
h:np.ndarray, q:np.ndarray,
h_border:np.ndarray, q_border:np.ndarray,
z:np.ndarray, z_border:np.ndarray,
dt:float, dx:float,
CL_h:float, CL_q:float,
n:float, u_border:np.ndarray,
h_center:np.ndarray, u_center:np.ndarray) -> None:
""" Solve the mass and momentum equations using a explicit Euler/Runge-Kutta scheme (only 1 step)
:param h_t1: Water depth at time t
:param h_t2: Water depth at time t+dt (or t_star or t_doublestar if RK)
:param q_t1: Discharge at time t
:param q_t2: Discharge at time t+dt (or t_star or t_doublestar if RK)
:param h: Water depth at the mesh center
:param q: Discharge at the mesh center
:param h_border: Water depth at the mesh border
:param q_border: Discharge at the mesh border
:param z: Bed elevation
:param z_border: Bed elevation at the mesh border
:param dt: Time step
:param dx: Space step
:param CL_h: Downstream boudary condition for water depth
:param CL_q: Upstream boundary condition for discharge
:param n: Manning coefficient
:param u_border: Velocity at the mesh border
:param h_center: Water depth at the mesh center
:param u_center: Velocity at the mesh center
"""
g = np.float32(9.81)
slice_mesh = slice(1,-1)
slice_right_border = slice(2,None)
slice_left_border = slice(1,-1)
up = 0
do = 1
# valeur à gauche du bord
q_border[slice_right_border, up] = q[1:-1]
q_border[1,up] = CL_q
h_border[slice_right_border, up] = h[1:-1]
h_border[1,up] = h[1]
z_border[slice_right_border, up] = z[1:-1]
z_border[1,up] = z[1]
# valeur à droite du bord
q_border[slice_left_border, do] = q[1:-1]
q_border[-1,do] = q[-2]
h_border[slice_left_border, do] = h[1:-1]
h_border[-1,do] = CL_h
z_border[slice_left_border, do] = z[1:-1]
z_border[-1,do] = z[-2]
u_border[1:] = q_border[1:,up]/h_border[1:,up]
h_center = (h_border[slice_right_border,do] + h_border[slice_left_border,do])/np.float32(2.)
u_center = q[slice_mesh]/h[slice_mesh]
#Continuity
h_t2[slice_mesh] = h_t1[slice_mesh] - dt/dx * (q_border[slice_right_border,up] - q_border[slice_left_border,up])
# Momentum
J = get_friction_slope_2D_Manning_semi_implicit(u_center, h[slice_mesh], n)
qm_right = u_border[slice_right_border]*q_border[slice_right_border,up]
qm_left = u_border[slice_left_border]*q_border[slice_left_border,up]
press_right = np.float32(0.5) * g * h_border[slice_right_border,do]**2
press_left = np.float32(0.5) * g * h_border[slice_left_border,do]**2
bed_right = g * h_center * z_border[slice_right_border,do]
bed_left = g * h_center * z_border[slice_left_border,do]
q_t2[slice_mesh] = np.float32(1.)/(np.float32(1.) + dt * g *h[slice_mesh] * J) * (q_t1[slice_mesh] - dt/dx * (qm_right - qm_left + press_right - press_left + bed_right - bed_left))
limit_h_q(h_t2, q_t2, hmin=np.float32(1e-3), Froudemax=np.float32(3.))
@jit(nopython=True, cache=True)
[docs]
def Euler_RK_wb(h_t1:np.ndarray, h_t2:np.ndarray,
q_t1:np.ndarray, q_t2:np.ndarray,
h:np.ndarray, q:np.ndarray,
h_border:np.ndarray, q_border:np.ndarray,
z:np.ndarray, z_border:np.ndarray,
dt:float, dx:float,
CL_h:float, CL_q:float,
n:float, u_border:np.ndarray,
h_center:np.ndarray, u_center:np.ndarray) -> None:
""" Well-balanced version of Euler_RK (float32)
Replaces the separate pressure + bed terms with the factored form:
0.5 * g * (h_r + h_l) * (eta_r - eta_l), eta = h + z
This avoids catastrophic cancellation between h^2 terms and bed-slope
terms in float32 arithmetic, and yields the exact well-balanced property
(zero momentum flux for a lake at rest).
"""
g = np.float32(9.81)
slice_mesh = slice(1,-1)
slice_right_border = slice(2,None)
slice_left_border = slice(1,-1)
up = 0
do = 1
# valeur à gauche du bord
q_border[slice_right_border, up] = q[1:-1]
q_border[1,up] = CL_q
h_border[slice_right_border, up] = h[1:-1]
h_border[1,up] = h[1]
z_border[slice_right_border, up] = z[1:-1]
z_border[1,up] = z[1]
# valeur à droite du bord
q_border[slice_left_border, do] = q[1:-1]
q_border[-1,do] = q[-2]
h_border[slice_left_border, do] = h[1:-1]
h_border[-1,do] = CL_h
z_border[slice_left_border, do] = z[1:-1]
z_border[-1,do] = z[-2]
u_border[1:] = q_border[1:,up]/h_border[1:,up]
h_center = (h_border[slice_right_border,do] + h_border[slice_left_border,do])/np.float32(2.)
u_center = q[slice_mesh]/h[slice_mesh]
#Continuity
h_t2[slice_mesh] = h_t1[slice_mesh] - dt/dx * (q_border[slice_right_border,up] - q_border[slice_left_border,up])
# Momentum
J = get_friction_slope_2D_Manning_semi_implicit(u_center, h[slice_mesh], n)
qm_right = u_border[slice_right_border]*q_border[slice_right_border,up]
qm_left = u_border[slice_left_border]*q_border[slice_left_border,up]
# Well-balanced: 0.5*g*(h_r+h_l)*(eta_r - eta_l), eta = h + z
h_right = h_border[slice_right_border, do]
h_left = h_border[slice_left_border, do]
eta_right = h_right + z_border[slice_right_border, do]
eta_left = h_left + z_border[slice_left_border, do]
press_bed = np.float32(0.5) * g * (h_right + h_left) * (eta_right - eta_left)
q_t2[slice_mesh] = np.float32(1.)/(np.float32(1.) + dt * g *h[slice_mesh] * J) * (q_t1[slice_mesh] - dt/dx * (qm_right - qm_left + press_bed))
limit_h_q(h_t2, q_t2, hmin=np.float32(1e-3), Froudemax=np.float32(3.))
@jit(nopython=True, cache=True)
[docs]
def Euler_RK_hedge(h_t1:np.ndarray, h_t2:np.ndarray,
q_t1:np.ndarray, q_t2:np.ndarray,
h:np.ndarray, q:np.ndarray,
h_border:np.ndarray, q_border:np.ndarray,
z:np.ndarray, z_border:np.ndarray,
dt:float, dx:float,
CL_h:float, CL_q:float,
n:float, u_border:np.ndarray,
h_center:np.ndarray, u_center:np.ndarray,
theta:np.ndarray, theta_border:np.ndarray) -> None:
""" Solve the mass and momentum equations using a explicit Euler/Runge-Kutta scheme (only 1 step)
:param h_t1: Water depth at time t
:param h_t2: Water depth at time t+dt (or t_star or t_doublestar if RK)
:param q_t1: Discharge at time t
:param q_t2: Discharge at time t+dt (or t_star or t_doublestar if RK)
:param h: Water depth at the mesh center
:param q: Discharge at the mesh center
:param h_border: Water depth at the mesh border
:param q_border: Discharge at the mesh border
:param z: Bed elevation
:param z_border: Bed elevation at the mesh border
:param dt: Time step
:param dx: Space step
:param CL_h: Downstream boudary condition for water depth
:param CL_q: Upstream boundary condition for discharge
:param n: Manning coefficient
:param u_border: Velocity at the mesh border
:param h_center: Water depth at the mesh center
:param u_center: Velocity at the mesh center
"""
g = np.float32(9.81)
slice_mesh = slice(1,-1)
slice_right_border = slice(2,None)
slice_left_border = slice(1,-1)
up = 0
do = 1
# valeur à gauche du bord
# -----------------------
q_border[slice_right_border, up] = q[1:-1]
q_border[1,up] = CL_q
h_border[slice_right_border, up] = h[1:-1]
h_border[1,up] = h[1]
z_border[slice_right_border, up] = z[1:-1]
z_border[1,up] = z[1]
theta_border[slice_right_border, up] = theta[1:-1]
theta_border[1,up] = theta[1]
# valeur à droite du bord
# ------------------------
q_border[slice_left_border, do] = q[1:-1]
q_border[-1,do] = q[-2]
h_border[slice_left_border, do] = h[1:-1]
h_border[-1,do] = CL_h
z_border[slice_left_border, do] = z[1:-1]
z_border[-1,do] = z[-2]
theta_border[slice_left_border, do] = theta[1:-1]
theta_border[-1,do] = theta[-2]
u_border[1:] = q_border[1:,up]/(h_border[1:,up] * theta_border[1:,up])
h_center = (theta_border[slice_right_border,do] * h_border[slice_right_border,do] + theta_border[slice_left_border,do] * h_border[slice_left_border,do])/np.float32(2.)
u_center = q[slice_mesh]/(h[slice_mesh]*theta[slice_mesh])
#Continuity
h_t2[slice_mesh] = h_t1[slice_mesh] - dt/dx * (q_border[slice_right_border,up] - q_border[slice_left_border,up])
# Momentum
J = get_friction_slope_2D_Manning_semi_implicit(u_center, h[slice_mesh], n)
qm_right = u_border[slice_right_border]*q_border[slice_right_border,up]
qm_left = u_border[slice_left_border ]*q_border[slice_left_border,up]
press_right = np.float32(0.5) * g * (theta_border[slice_right_border,do] * h_border[slice_right_border,do])**2
press_left = np.float32(0.5) * g * (theta_border[slice_left_border ,do] * h_border[slice_left_border ,do])**2
bed_right = g * h_center * (z_border[slice_right_border,do] + (np.float32(1.)-theta_border[slice_right_border,do]) * h_border[slice_right_border,do])
bed_left = g * h_center * (z_border[slice_left_border,do] + (np.float32(1.)-theta_border[slice_left_border,do]) * h_border[slice_left_border,do])
q_t2[slice_mesh] = np.float32(1.)/(np.float32(1.) + dt * g *h[slice_mesh] * J) * (q_t1[slice_mesh] - dt/dx * (qm_right - qm_left + press_right - press_left + bed_right - bed_left))
limit_h_q(h_t2, q_t2, hmin=np.float32(1e-3), Froudemax=np.float32(3.))
@jit(nopython=True, cache=True)
[docs]
def splitting(q_left:np.float32, q_right:np.float32,
h_left:np.float32, h_right:np.float32,
z_left:np.float32, z_right:np.float32,
z_bridge_left:np.float32, z_bridge_right:np.float32) -> np.ndarray:
""" Splitting of the unknowns at border between two nodes
-- Based on the WOLF HECE original scheme
:param q_left: Discharge at the left-side of the border
:param q_right: Discharge at the right-side of the border
:param h_left: Water depth at the left-side of the border
:param h_right: Water depth at the right-side of the border
:param z_left: Bed elevation at the left-side of the border
:param z_right: Bed elevation at the right-side of the border
:param z_bridge_left: Bridge elevation at the left-side of the border
:param z_bridge_right: Bridge elevation at the right-side of the border
:return: Array of the unknowns according to the WOLF HECE scheme
"""
prod_q = q_left * q_right
sum_q = q_left + q_right
if prod_q > 0.:
if q_left > 0.:
return np.asarray([q_left, min(h_left, z_bridge_left-z_left), h_right, z_right, z_bridge_right], dtype=np.float32)
else:
return np.asarray([q_right, min(h_right, z_bridge_right-z_right), h_left, z_left, z_bridge_left], dtype=np.float32)
elif prod_q < 0.:
if sum_q > 0.:
return np.asarray([q_left, min(h_left, z_bridge_left-z_left), h_right, z_right, z_bridge_right], dtype=np.float32)
elif sum_q < 0.:
return np.asarray([q_right, min(h_right, z_bridge_right-z_right), h_left, z_left, z_bridge_left], dtype=np.float32)
else:
return np.asarray([np.float32(0.), np.float32(1.), (h_left + h_right) / np.float32(2.), (z_left + z_right) / np.float32(2.), (z_bridge_left + z_bridge_right) / np.float32(2.)], dtype=np.float32)
else:
if q_left<0.:
return np.asarray([np.float32(0.), np.float32(1.), h_left, z_left, z_bridge_left], dtype=np.float32)
elif q_right<0.:
return np.asarray([np.float32(0.), np.float32(1.), h_right, z_right, z_bridge_right], dtype=np.float32)
else:
return np.asarray([sum_q / np.float32(2.), # q
(min(h_left, z_bridge_left-z_left) + min(h_right, z_bridge_right-z_right)) / np.float32(2.), # h_vel
(h_left + h_right) / np.float32(2.), # h
(z_left + z_right) / np.float32(2.), # z
(z_bridge_left + z_bridge_right) / np.float32(2.)], dtype=np.float32) # z_bridge
@jit(nopython=True, cache=True)
[docs]
def Euler_RK_bridge(h_t1:np.ndarray, h_t2:np.ndarray,
q_t1:np.ndarray, q_t2:np.ndarray,
h:np.ndarray, q:np.ndarray,
h_border:np.ndarray, q_border:np.ndarray,
z:np.ndarray, z_border:np.ndarray,
dt:float, dx:float,
CL_h:float, CL_q:float,
n:float, u_border:np.ndarray,
h_center:np.ndarray, u_center:np.ndarray,
z_bridge:np.ndarray, z_bridge_border:np.ndarray,
infil_exfil=None) -> None:
"""
Solve the mass and momentum equations using a explicit Euler/Runge-Kutta scheme (only 1 step)
applying source terms for infiltration/exfiltration and pressure at the roof.
:param h_t1: Water depth at time t
:param h_t2: Water depth at time t+dt (or t_star or t_doublestar if RK)
:param q_t1: Discharge at time t
:param q_t2: Discharge at time t+dt (or t_star or t_doublestar if RK)
:param h: Water depth at the mesh center
:param q: Discharge at the mesh center
:param h_border: Water depth at the mesh border
:param q_border: Discharge at the mesh border
:param z: Bed elevation
:param z_border: Bed elevation at the mesh border
:param dt: Time step
:param dx: Space step
:param CL_h: Downstream boudary condition for water depth
:param CL_q: Upstream boundary condition for discharge
:param n: Manning coefficient
:param u_border: Velocity at the mesh border
:param h_center: Water depth at the mesh center
:param u_center: Velocity at the mesh center
:param z_bridge: Bridge elevation at the mesh center
:param z_bridge_border: Bridge elevation at the mesh border
:param infil_exfil: Infiltration/exfiltration parameters
"""
g = np.float32(9.81)
slice_mesh = slice(1,-1)
# L R L = Left border, R = Right border
# l|r l|r l = left value, r = right value
# | i | i = node indice
# | |
# i i+1 i, i+1 = associated borders
slice_right_border = slice(2,None) # right border - left value - slice associated to the center values
slice_left_border = slice(1,-1) # left border - right value - slice associated to the center values
up = 0
do = 1
# altitude du pont
z_bridge_copy = z_bridge.copy()
fs_cells = np.where(h <= z_bridge - z)[0] # mailles à surface libre
press_cells = np.where(h > z_bridge - z)[0] # mailles sous-pression
z_bridge_copy[fs_cells] = z[fs_cells] + h[fs_cells] # on recopie l'altitude de SL
# valeur à gauche du bord
# -----------------------
# altitude du fond
z_border[slice_right_border, up] = z[1:-1]
z_border[1,up] = z[1] # tout en amont, recopie de la valeur intérieure
h_border[slice_right_border, up] = h[1:-1]
h_border[1,up] = h[1] # tout en amont, recopie de la valeur intérieure
q_border[slice_right_border, up] = q[1:-1]
q_border[1,up] = CL_q # condition limite de debit
z_bridge_border[slice_right_border, up] = z_bridge_copy[1:-1]
z_bridge_border[1,up] = z_bridge_copy[1]
# valeur à droite du bord
# ------------------------
z_border[slice_left_border, do] = z[1:-1]
z_border[-1,do] = z[-2] # tout en aval, recopie de la valeur intérieure
h_border[slice_left_border, do] = h[1:-1]
h_border[-1,do] = CL_h # condition limite de hauteur
q_border[slice_left_border, do] = q[1:-1]
q_border[-1,do] = q[-2] # tout en aval, recopie de la valeur intérieure
z_bridge_border[slice_left_border, do] = z_bridge_copy[1:-1]
z_bridge_border[-1,do] = CL_h + z_border[-1,do] # z_bridge_copy[-2]
for i in range(1, len(h)-1):
qc_right, h4u_right, hc_right, zc_right, zbc_right = splitting(q_border[i+1,up], q_border[i+1, do], h_border[i+1,up], h_border[i+1,do], z_border[i+1,up], z_border[i+1,do], z_bridge_border[i+1,up], z_bridge_border[i+1,do])
qc_left, h4u_left, hc_left, zc_left, zbc_left = splitting(q_border[i,up], q_border[i, do], h_border[i,up], h_border[i,do], z_border[i,up], z_border[i,do], z_bridge_border[i,up], z_bridge_border[i,do])
# Continuity
# ++++++++++
h_t2[i] = h_t1[i] - dt/dx * (qc_right - qc_left)
# Momentum
# ++++++++
# Limited section at the right border and at the left border -- decentred downstream
d_right = zbc_right - zc_right
d_left = zbc_left - zc_left
# Pressure on the roof -- 0. if free surface
press_roof_right = hc_right - d_right
press_roof_left = hc_left - d_left
# Pressure integral at the right border and at the left border
press_right = np.float32(0.5) * g * (hc_right**np.float32(2.) - press_roof_right**np.float32(2.))
press_left = np.float32(0.5) * g * (hc_left**np.float32(2.) - press_roof_left**np.float32(2.))
# Friction slope based on center values
u_center = q[i] / (z_bridge_copy[i] - z[i])
# Number of surfaces
nb_frott = np.float32(2.) if h[i] > z_bridge[i] - z[i] else np.float32(1.)
# Integration water depth
h_frott = min(h[i], z_bridge[i] - z[i])
# Slope
J = get_friction_slope_2D_Manning_semi_implicit(u_center, h_frott/nb_frott, n)
# Velocity at the right border and at the left border -- decentred upstream
u_right = qc_right / h4u_right
u_left = qc_left / h4u_left
# Momentum at the right border and at the left border
qm_right = u_right * qc_right
qm_left = u_left * qc_left
# Mean pressure impacting bed reaction
h_mean = (hc_right + hc_left)/np.float32(2.)
bed_right = g * h_mean * zc_right
bed_left = g * h_mean * zc_left
# Mean pressure impacting roof reaction
h_roof_right = max(hc_right + zc_right - zbc_right, np.float32(0.))
h_roof_left = max(hc_left + zc_left - zbc_left , np.float32(0.))
h_roof_mean = (h_roof_right + h_roof_left) / np.float32(2.)
roof_right = g * h_roof_mean * zbc_right
roof_left = g * h_roof_mean * zbc_left
rhs = (qm_right - qm_left + press_right - press_left + bed_right - bed_left - roof_right + roof_left)
if rhs !=0.:
pass
q_t2[i] = np.float32(1.)/(np.float32(1.) + dt * g * h_frott * J) * (q_t1[i] - dt/dx * rhs)
if infil_exfil is not None:
idx_up, idx_do, q_infil_exfil, u_infil_exfil, pond, k = infil_exfil
i = idx_up
qc_right, h4u_right, hc_right, zc_right, zbc_right = splitting(q_border[i+1,up], q_border[i+1, do], h_border[i+1,up], h_border[i+1,do], z_border[i+1,up], z_border[i+1,do], z_bridge_border[i+1,up], z_bridge_border[i+1,do])
qc_left, h4u_left, hc_left, zc_left, zbc_left = splitting(q_border[i,up], q_border[i, do], h_border[i,up], h_border[i,do], z_border[i,up], z_border[i,do], z_bridge_border[i,up], z_bridge_border[i,do])
# Limited section at the right border and at the left border -- decentred downstream
d_right = zbc_right - zc_right
d_left = zbc_left - zc_left
# Pressure on the roof -- 0. if free surface
press_roof_right = hc_right - d_right
press_roof_left = hc_left - d_left
# Pressure integral at the right border and at the left border
press_right = np.float32(0.5) * g * (hc_right**np.float32(2.) - press_roof_right**np.float32(2.))
press_left = np.float32(0.5) * g * (hc_left**np.float32(2.) - press_roof_left**np.float32(2.))
# Friction slope based on center values
u_center = q[i] / (z_bridge_copy[i] - z[i])
# Number of surfaces
nb_frott = np.float32(2.) if h[i] > z_bridge[i] - z[i] else np.float32(1.)
# Integration water depth
h_frott = min(h[i], z_bridge[i] - z[i])
# Slope
J = get_friction_slope_2D_Manning_semi_implicit(u_center, h_frott/nb_frott, n)
# Velocity at the right border and at the left border -- decentred upstream
u_right = qc_right / h4u_right
u_left = qc_left / h4u_left
# Momentum at the right border and at the left border
qm_right = u_right * qc_right
qm_left = u_left * qc_left
# Mean pressure impacting bed reaction
h_mean = (hc_right + hc_left)/np.float32(2.)
bed_right = g * h_mean * zc_right
bed_left = g * h_mean * zc_left
# Mean pressure impacting roof reaction
h_roof_right = max(hc_right + zc_right - zbc_right, np.float32(0.))
h_roof_left = max(hc_left + zc_left - zbc_left , np.float32(0.))
h_roof_mean = (h_roof_right + h_roof_left) / np.float32(2.)
roof_right = g * h_roof_mean * zbc_right
roof_left = g * h_roof_mean * zbc_left
h_t2[i] = h_t1[i] - dt/dx * (qc_right - qc_left + q_infil_exfil)
q_t2[i] = np.float32(1.)/(np.float32(1.) + dt * g * h_frott * J) * (q_t1[i] - dt/dx * (qm_right - qm_left + \
press_right - press_left + \
bed_right - bed_left - \
roof_right + roof_left + \
u_infil_exfil * q_infil_exfil))
i = idx_do
qc_right, h4u_right, hc_right, zc_right, zbc_right = splitting(q_border[i+1,up], q_border[i+1, do], h_border[i+1,up], h_border[i+1,do], z_border[i+1,up], z_border[i+1,do], z_bridge_border[i+1,up], z_bridge_border[i+1,do])
qc_left, h4u_left, hc_left, zc_left, zbc_left = splitting(q_border[i,up], q_border[i, do], h_border[i,up], h_border[i,do], z_border[i,up], z_border[i,do], z_bridge_border[i,up], z_bridge_border[i,do])
# Limited section at the right border and at the left border -- decentred downstream
d_right = zbc_right - zc_right
d_left = zbc_left - zc_left
# Pressure on the roof -- 0. if free surface
press_roof_right = hc_right - d_right
press_roof_left = hc_left - d_left
# Pressure integral at the right border and at the left border
press_right = np.float32(0.5) * g * (hc_right**np.float32(2.) - press_roof_right**np.float32(2.))
press_left = np.float32(0.5) * g * (hc_left**np.float32(2.) - press_roof_left**np.float32(2.))
# Friction slope based on center values
u_center = q[i] / (z_bridge_copy[i] - z[i])
# Number of surfaces
nb_frott = np.float32(2.) if h[i] > z_bridge[i] - z[i] else np.float32(1.)
# Integration water depth
h_frott = min(h[i], z_bridge[i] - z[i])
# Slope
J = get_friction_slope_2D_Manning_semi_implicit(u_center, h_frott/nb_frott, n)
# Velocity at the right border and at the left border -- decentred upstream
u_right = qc_right / h4u_right
u_left = qc_left / h4u_left
# Momentum at the right border and at the left border
qm_right = u_right * qc_right
qm_left = u_left * qc_left
# Mean pressure impacting bed reaction
h_mean = (hc_right + hc_left)/np.float32(2.)
bed_right = g * h_mean * zc_right
bed_left = g * h_mean * zc_left
# Mean pressure impacting roof reaction
h_roof_right = max(hc_right + zc_right - zbc_right, np.float32(0.))
h_roof_left = max(hc_left + zc_left - zbc_left , np.float32(0.))
h_roof_mean = (h_roof_right + h_roof_left) / np.float32(2.)
roof_right = g * h_roof_mean * zbc_right
roof_left = g * h_roof_mean * zbc_left
h_t2[i] = h_t1[i] - dt/dx * (qc_right - qc_left - q_infil_exfil)
q_t2[i] = np.float32(1.)/(np.float32(1.) + dt * g * h_frott * J) * (q_t1[i] - dt/dx * (qm_right - qm_left + \
press_right - press_left + \
bed_right - bed_left - \
roof_right + roof_left -\
u_infil_exfil * q_infil_exfil))
limit_h_q(h_t2, q_t2, hmin=np.float32(1e-3), Froudemax=np.float32(3.))
@ jit(nopython=True, cache=True)
[docs]
def limit_h_q(h:np.ndarray, q:np.ndarray, hmin:float = 0., Froudemax:float = 3.) -> None:
""" Limit the water depth and the discharge
:param h: Water depth [m]
:param q: Discharge [m^2/s]
:param hmin: Minimum water depth [m]
:param Froudemax: Maximum Froude number [-]
"""
g = np.float32(9.81)
# retrieve positive and negative values
hpos = np.where(h > hmin)
hneg = np.where(h <= hmin)
# limit water depth
h[hneg] = hmin
q[hneg] = np.float32(0.)
# limit discharge based on Froude number
Fr = np.zeros_like(h)
Fr[hpos] = np.abs(q[hpos]) / h[hpos] / np.sqrt(g * h[hpos])
q[Fr > Froudemax] = Froudemax * np.sqrt(g * h[Fr > Froudemax]) * h[Fr > Froudemax] * np.sign(q[Fr > Froudemax])
# --------------------------
# END JIT compiled functions
# --------------------------
# ----------------------
# START Problems section
# ----------------------
[docs]
MAX_TIME = np.float32(3600)
[docs]
def problem(dom:np.ndarray, z:np.ndarray, h0:float, q0:float, dx:float, CN:float, n:float, h_init:np.ndarray=None, q_init:np.ndarray=None):
""" Solve the mass and momentum equations using a explicit Runge-Kutta scheme (2 steps - 2nd order)
**NO BRIDGE**
"""
#Ensure dom, z is Float32
dom = dom.astype(np.float32)
z = z.astype(np.float32)
h0 = np.float32(h0)
q0 = np.float32(q0)
dx = np.float32(dx)
CN = np.float32(CN)
n = np.float32(n)
h, q, h_pred, q_pred, h_corr, q_corr, q_border, h_border, z_border, u_border, h_center, u_center = all_unk_border(dom, h0, q0)
if h_init is not None:
h[:] = h_init[:]
if q_init is not None:
q[:] = q_init[:]
totaltime = MAX_TIME
eps = np.float32(1.)
with tqdm(total=float(totaltime)) as pbar:
t = 0.
while eps > 2e-7 and t < totaltime:
dt = compute_dt(dx, h, q, CN)
# Predictor step
Euler_RK(h, h_pred, q, q_pred, h, q, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center)
# Corrector step
Euler_RK(h, h_corr, q, q_corr, h_pred, q_pred, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center)
# Update -- Mean for second order in time
h = (h_pred + h_corr)/np.float32(2.)
q = (q_pred + q_corr)/np.float32(2.)
t+=dt
# eps = np.max(np.abs(q - q_pred))
eps = np.max(np.abs(q[1:-1] - q0))
pbar.update(float(dt))
print("Total time : ", t)
print("Residual : ", eps)
return h, q
[docs]
def problem_wb(dom:np.ndarray, z:np.ndarray, h0:float, q0:float, dx:float, CN:float, n:float, h_init:np.ndarray=None, q_init:np.ndarray=None):
""" Solve the mass and momentum equations using a explicit Runge-Kutta scheme (2 steps - 2nd order)
**NO BRIDGE**
"""
#Ensure dom, z is Float32
dom = dom.astype(np.float32)
z = z.astype(np.float32)
h0 = np.float32(h0)
q0 = np.float32(q0)
dx = np.float32(dx)
CN = np.float32(CN)
n = np.float32(n)
h, q, h_pred, q_pred, h_corr, q_corr, q_border, h_border, z_border, u_border, h_center, u_center = all_unk_border(dom, h0, q0)
if h_init is not None:
h[:] = h_init[:]
if q_init is not None:
q[:] = q_init[:]
totaltime = MAX_TIME
eps = np.float32(1.)
with tqdm(total=float(totaltime)) as pbar:
t = 0.
while eps > 2e-7 and t < totaltime:
dt = compute_dt(dx, h, q, CN)
# Predictor step
Euler_RK_wb(h, h_pred, q, q_pred, h, q, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center)
# Corrector step
Euler_RK_wb(h, h_corr, q, q_corr, h_pred, q_pred, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center)
# Update -- Mean for second order in time
h = (h_pred + h_corr)/np.float32(2.)
q = (q_pred + q_corr)/np.float32(2.)
t+=dt
# eps = np.max(np.abs(q - q_pred))
eps = np.max(np.abs(q[1:-1] - q0))
pbar.update(float(dt))
print("Total time : ", t)
print("Residual : ", eps)
return h, q
[docs]
def _problem_convergence(dom:np.ndarray, z:np.ndarray, h0:float, q0:float, dx:float, CN:float, n:float):
""" Solve the mass and momentum equations using a explicit Runge-Kutta scheme (2 steps - 2nd order)
**NO BRIDGE**
"""
#Ensure dom, z is Float32
dom = dom.astype(np.float32)
z = z.astype(np.float32)
h0 = np.float32(h0)
q0 = np.float32(q0)
dx = np.float32(dx)
CN = np.float32(CN)
n = np.float32(n)
h, q, h_pred, q_pred, h_corr, q_corr, q_border, h_border, z_border, u_border, h_center, u_center = all_unk_border(dom, h0, q0)
totaltime = MAX_TIME
eps = np.float32(1.)
all_eps = np.zeros((1_000_000,2), dtype=np.float32)
all_time = np.zeros(1_000_000, dtype=np.float32)
with tqdm(total=float(totaltime)) as pbar:
t = 0.
k = 0
while eps > 1e-7 and t < totaltime:
dt = compute_dt(dx, h, q, CN)
# Predictor step
Euler_RK(h, h_pred, q, q_pred, h, q, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center)
# Corrector step
Euler_RK(h, h_corr, q, q_corr, h_pred, q_pred, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center)
# Update -- Mean for second order in time
h = (h_pred + h_corr)/np.float32(2.)
q = (q_pred + q_corr)/np.float32(2.)
t+=dt
eps_h = np.sum(np.abs(h - h_pred))
eps_q = np.sum(np.abs(q - q_pred))
eps = max(eps_h, eps_q)
all_eps[k] = [eps_h, eps_q]
all_time[k] = t
k+=1
pbar.update(float(dt))
print("Total time : ", t)
print("Residual : ", eps_q)
return h, q, all_eps[:k], all_time[:k]
[docs]
def _problem_convergence_wb(dom:np.ndarray, z:np.ndarray, h0:float, q0:float, dx:float, CN:float, n:float):
""" Well-balanced variant of _problem_convergence (float32)
Uses Euler_RK_wb which avoids catastrophic cancellation in the
pressure + bed-slope gradient by computing:
0.5 * g * (h_r + h_l) * (eta_r - eta_l), eta = h + z
"""
dom = dom.astype(np.float32)
z = z.astype(np.float32)
h0 = np.float32(h0)
q0 = np.float32(q0)
dx = np.float32(dx)
CN = np.float32(CN)
n = np.float32(n)
h, q, h_pred, q_pred, h_corr, q_corr, q_border, h_border, z_border, u_border, h_center, u_center = all_unk_border(dom, h0, q0)
totaltime = MAX_TIME
eps = np.float32(1.)
all_eps = np.zeros((1_000_000, 2), dtype=np.float32)
all_time = np.zeros(1_000_000, dtype=np.float32)
with tqdm(total=float(totaltime)) as pbar:
t = 0.
k = 0
while eps > np.float32(1e-7) and t < totaltime:
dt = compute_dt(dx, h, q, CN)
# Predictor step
Euler_RK_wb(h, h_pred, q, q_pred, h, q, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center)
# Corrector step
Euler_RK_wb(h, h_corr, q, q_corr, h_pred, q_pred, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center)
# Update -- Mean for second order in time
h = (h_pred + h_corr)/np.float32(2.)
q = (q_pred + q_corr)/np.float32(2.)
t += dt
eps_h = np.max(np.abs(h - h_pred))
eps_q = np.max(np.abs(q - q_pred))
eps = max(eps_h, eps_q)
all_eps[k] = [eps_h, eps_q]
all_time[k] = t
k += 1
pbar.update(float(dt))
print("Total time : ", t)
print("Residual : ", eps)
return h, q, all_eps[:k], all_time[:k]
[docs]
def problem_hedge(dom:np.ndarray, z:np.ndarray, h0:float, q0:float, dx:float, CN:float, n:float):
""" Solve the mass and momentum equations using a explicit Runge-Kutta scheme (2 steps - 2nd order)
**NO BRIDGE but HEDGE in the middle**
"""
h, q, h_pred, q_pred, h_corr, q_corr, q_border, h_border, z_border, u_border, h_center, u_center = all_unk_border(dom, h0, q0)
theta = np.ones_like(h)
theta_border = np.ones_like(h_border)
slice_hedge = slice(len(h) // 2-1, len(h) // 2+2)
theta_val = np.float32(0.5)
theta[slice_hedge] = theta_val
totaltime = MAX_TIME
eps = 1.
with tqdm(total=float(totaltime)) as pbar:
t = 0.
while eps > 1e-7 and t < totaltime:
dt = compute_dt(dx, h, q, CN)
# Predictor step
Euler_RK_hedge(h, h_pred, q, q_pred, h, q, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center, theta, theta_border)
# Corrector step
Euler_RK_hedge(h, h_corr, q, q_corr, h_pred, q_pred, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center, theta, theta_border)
# Update -- Mean for second order in time
h = (h_pred + h_corr)/np.float32(2.)
q = (q_pred + q_corr)/np.float32(2.)
t+=dt
eps = np.sum(np.abs(q - q_pred))
pbar.update(float(dt))
print("Total time : ", t)
print("Residual : ", eps)
return h, q, theta
[docs]
def problem_bridge(dom:np.ndarray, z:np.ndarray, z_bridge:np.ndarray,
h0:float, q0:float,
dx:float, CN:float, n:float,
h_ini:np.ndarray = None, q_ini:np.ndarray = None) -> tuple[np.ndarray]:
""" Solve the mass and momentum equations using a explicit Rung-Kutta scheme (2 steps - 2nd order)
**WITH BRIDGE and NO OVERFLOW**
"""
h, q, h_pred, q_pred, h_corr, q_corr, q_border, h_border, z_border, u_border, h_center, u_center = all_unk_border(dom, h0, q0)
z_bridge_border = np.zeros_like(z_border)
if h_ini is not None:
h[:] = h_ini[:]
if q_ini is not None:
q[:] = q_ini[:]
totaltime = MAX_TIME
eps = 1.
with tqdm(total=float(totaltime)) as pbar:
t = 0.
while eps > 1e-7 and t < totaltime:
dt = compute_dt(dx, h, q, CN)
# Predictor step
Euler_RK_bridge(h, h_pred, q, q_pred, h, q, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center, z_bridge, z_bridge_border)
# Corrector step
Euler_RK_bridge(h, h_corr, q, q_corr, h_pred, q_pred, h_border, q_border, z, z_border, dt, dx, h0, q0, n, u_border, h_center, u_center, z_bridge, z_bridge_border)
# Update -- Mean for second order in time
h = (h_pred + h_corr)/np.float32(2.)
q = (q_pred + q_corr)/np.float32(2.)
t+=dt
eps = np.sum(np.abs(q - q_pred))
pbar.update(float(dt))
print("Total time : ", t)
print("Residual : ", eps)
return h, q
[docs]
def problem_bridge_multiple_steadystates(dom:np.ndarray, z:np.ndarray, z_bridge:np.ndarray,
h0:float, qmin:float, qmax:float,
dx:float, CN:float, n:float,
) -> list[tuple[float, np.ndarray, np.ndarray]]:
""" Solve multiple steady states for a given discharge range """
all_q = np.arange(qmin, qmax+.1, (qmax-qmin)/10, dtype=np.float32)
ret = []
h = None
for curq in all_q:
h, q = problem_bridge(dom, z, z_bridge, h0, curq, dx, CN, n, h_ini=h, q_ini=None)
ret.append((0., h.copy(), q.copy()))
return ret
# --------------------
# END Problems section
# --------------------