from pathlib import Path
import logging
import numpy as np
from ..simple_simulation import BoundaryConditionIndices
from ..simple_simulation import BC_BLANK_VALUE, NB_BC_TYPES
from ..glsimulation import GLSimulation, TilePackingMode
from ..simple_simulation import SimpleSimulation, TimeStepStrategy, boundary_condition_2D, BoundaryConditionsTypes, Direction, CellParameter, CellParameterType
from ..loaders.utils import convert_bc_type, make_impervious_bathymetry
[docs]
def handle_cell_parameters(bc_descriptions:dict, cell_parameters_list: list[CellParameter]):
for cp in cell_parameters_list:
# Attention! x,y are swapped !!!
cell_numpy_crd = (cp.j-1, cp.i-1)
if cell_numpy_crd not in bc_descriptions:
bc_descriptions[cell_numpy_crd] = [BC_BLANK_VALUE] * NB_BC_TYPES
bc_table_index = None
match cp.ntype:
case CellParameterType.BRIDGE_DECK_ELEVATION:
bc_table_index = BoundaryConditionIndices.BC_TABLE_INDEX_FOR_BRIDGE_DECK_ELEVATION
case CellParameterType.BRIDGE_ROOF_ELEVATION:
bc_table_index = BoundaryConditionIndices.BC_TABLE_INDEX_FOR_BRIDGE_ROOF_ELEVATION
case _:
raise Exception(f"Unsupported cell parameter type {cp.ntype}")
bc_descriptions[cell_numpy_crd][
bc_table_index.value
] = cp.val
[docs]
def handle_left_border_boundary_conditions(bc_descriptions:dict, bc_list: list[boundary_condition_2D],
nap: np.ndarray, bathymetry: np.ndarray, qxbin: np.ndarray):
# ------------------------------------------------------------------------
# Boundary Conditions on left borders
# ------------------------------------------------------------------------
bc: boundary_condition_2D
for bc in bc_list:
bc_type = convert_bc_type(bc.ntype)
logging.debug(f"B.C. left border at {bc.i},{bc.j}, type: {bc_type}")
# Get the indices of the WBC
# Attention! x,y are swapped !!!
cell_numpy_crd = (bc.j-1, bc.i-1)
if cell_numpy_crd not in bc_descriptions:
bc_descriptions[cell_numpy_crd] = [BC_BLANK_VALUE] * NB_BC_TYPES
x, y = bc.i-1, bc.j-1 # i,j are Fortran coords => translate to numpy
# - Either the BC is on the left border of a cell which
# is on the interior side of the border of the domain.
# - Either the BC is on the left border of a cell which
# is on the exterior side of the border of the domain.
left_cell_crd = (y, x - 1)
if not( \
(x == 0 and nap[cell_numpy_crd] == 1) \
or (nap[cell_numpy_crd] == 1 and nap[left_cell_crd] == 0) \
or (nap[cell_numpy_crd] == 0 and nap[left_cell_crd] == 1)):
logging.error(f"cell_numpy_crd={cell_numpy_crd}:{nap[cell_numpy_crd]}, left_cell_crd={left_cell_crd}:{nap[left_cell_crd]}")
err = f"Weak boundary condition at i={bc.i},j={bc.j}: this B.C. is not on the border of the computation domain"
logging.error(err)
raise Exception(err)
# Check each kind of WBC for specific situations and update the
# BC description table.
if bc_type == BoundaryConditionsTypes.QX:
logging.debug(f"Weak boundary condition over QX at i={bc.i},j={bc.j}, value={bc.val}")
bc_descriptions[cell_numpy_crd][
BoundaryConditionIndices.BC_TABLE_INDEX_FOR_QX_ON_LEFT.value
] = bc.val
elif bc_type == BoundaryConditionsTypes.QY:
logging.debug(f"Weak boundary condition over QY at i={bc.i},j={bc.j}, value={bc.val}")
bc_descriptions[cell_numpy_crd][
BoundaryConditionIndices.BC_TABLE_INDEX_FOR_QY_ON_LEFT.value
] = bc.val
elif bc_type == BoundaryConditionsTypes.H:
loc_msg = f"Weak boundary condition over H at i={bc.i},j={bc.j}"
logging.debug(f"{loc_msg}, value={bc.val}")
if bc.val < 0:
# negative values are a hack to express impervious boundaries
logging.error(f"{loc_msg}, h={bc.val} is negative !!!")
if qxbin[cell_numpy_crd] > 0:
logging.warning(f"{loc_msg} doesn't provide enough information to the cell (see cell flow direction)")
if nap[cell_numpy_crd] == 0 \
and bathymetry[left_cell_crd] > bc.val:
logging.error(f"{loc_msg}: seems located rightside of the " \
"domain but the bathymetry in the neighbour " \
f"cell is above the surface level of the b.c. {bathymetry[left_cell_crd]} > {bc.val}, I expect it to be below.")
if nap[cell_numpy_crd] == 1 \
and bathymetry[cell_numpy_crd] > bc.val:
logging.error(f"{loc_msg}: seems located leftside of the " \
"domain but the bathymetry in the cell is " \
f"above the surface level of the b.c. {bathymetry[cell_numpy_crd]} > {bc.val}, I expect it to be below.")
bc_descriptions[cell_numpy_crd][
BoundaryConditionIndices.BC_TABLE_INDEX_FOR_H_ON_LEFT.value
] = bc.val
elif bc_type == BoundaryConditionsTypes.HMOD:
logging.debug(f"Weak boundary condition over HMOD at i={bc.i},j={bc.j}, value={bc.val}")
if bc.val < 0:
# negative values are a hack to express impervious boundaries
logging.warning(f"Weak boundary condition over HMOD at i={bc.i},j={bc.j}, value={bc.val} is negative !!!")
if x < qxbin.shape[1] and y < qxbin.shape[0] and qxbin[y,x] > 0:
logging.warning(f"Weak boundary condition over HMOD at i={bc.i},j={bc.j} doesn't provide enough information to the cell (see cell flow direction)")
bc_descriptions[cell_numpy_crd][
BoundaryConditionIndices.BC_TABLE_INDEX_FOR_HMOD_ON_LEFT.value
] = bc.val
elif bc_type == BoundaryConditionsTypes.FROUDE_NORMAL:
logging.debug(f"Weak boundary condition Froude normal direction at i={bc.i},j={bc.j}, value={bc.val}")
assert bc.val > 0, f"The Froude number must be > 0, you gave {bc.val}"
bc_descriptions[cell_numpy_crd][
BoundaryConditionIndices.BC_TABLE_INDEX_FOR_FROUDE_NORMAL_ON_LEFT.value
] = bc.val
else:
logging.error(f"Unsupported weak condition type {bc_type}")
# Attention: if we have a BC on the left border of cell (i,j)
# we record it. But we also mark the cell (i-1,j) with a
# no-values boundary condition. This special "no-values b.c." cell
# will be interpreted by the GPU engine as a cell with a b.c.
# on its *right* border. This is a bit hacky but makes sure
# that the values of a boundary condition are not duplicated
# across cells (which always carries a risk of mis-duplication bug)
# Of course if one wants to change the BC on the (i-1,j) cell
# he can as it will not disable the detection on a right border BC.
# FIXME This is dangerous as usually a BC is defined on a border
# but is meant to produce effect only on one side of the border.
cell_numpy_crd_left = (cell_numpy_crd[0], cell_numpy_crd[1]-1)
if cell_numpy_crd_left not in bc_descriptions:
bc_descriptions[cell_numpy_crd_left] = [BC_BLANK_VALUE] * NB_BC_TYPES
[docs]
def handle_bottom_border_boundary_conditions(bc_descriptions:dict, bc_list: list[boundary_condition_2D],
nap: np.ndarray, bathymetry: np.ndarray, qybin: np.ndarray):
for bc in bc_list:
bc_type = convert_bc_type(bc.ntype)
logging.debug(f"B.C. bottom border at {bc.i},{bc.j}, type: {bc_type}")
# Attention! x,y are swapped !!!
cell_numpy_crd = (bc.j-1, bc.i-1)
if cell_numpy_crd not in bc_descriptions:
bc_descriptions[cell_numpy_crd] = [BC_BLANK_VALUE] * NB_BC_TYPES
x, y = bc.i-1, bc.j-1 # i,j are Fortran coords => translate to numpy
bottom_cell_crd = (y - 1, x)
# - Either the BC is on the bottom border of a cell which
# is on the interior side of the border of the domain.
# - Either the BC is on the bottom border of a cell which
# is on the exterior side of the border of the domain.
if not( (x == 0 and nap[cell_numpy_crd] == 1) \
or (nap[cell_numpy_crd] == 1 and nap[bottom_cell_crd] == 0) \
or (nap[cell_numpy_crd] == 0 and nap[bottom_cell_crd] == 1)):
err = f"Weak boundary condition at i={bc.i},j={bc.j}: this B.C. is not on the border of the computation domain"
logging.error(err)
raise Exception(err)
if bc_type == BoundaryConditionsTypes.QX:
logging.debug(f"Weak boundary condition over QX (bottom border) at i,j:{(bc.i, bc.j)}, value={bc.val}")
bc_descriptions[cell_numpy_crd][
BoundaryConditionIndices.BC_TABLE_INDEX_FOR_QX_ON_BOTTOM.value
] = bc.val
elif bc_type == BoundaryConditionsTypes.QY:
logging.debug(f"Weak boundary condition over QY (bottom border) at i,j:{(bc.i, bc.j)}, value={bc.val}")
bc_descriptions[cell_numpy_crd][
BoundaryConditionIndices.BC_TABLE_INDEX_FOR_QY_ON_BOTTOM.value
] = bc.val
# elif bc_type == BoundaryConditionsTypes.H.value:
# logging.debug(f"Weak boundary condition over H (bottom border) at i,j:{(bc.i, bc.j)}, value={bc.val}")
# if bc.val < 0:
# # negative values are a hack to express impervious boundaries
# logging.warning(f"Weak boundary condition over H (horiz border) at i={bc.i},j={bc.j}, value={bc.val} is negative !!!")
# if qxbin[y,x] > 0:
# logging.warning(f"Weak boundary condition over H (horiz border) at i={bc.i},j={bc.j} doesn't provide enough information to the cell (see cell flow direction)")
# if bathymetry[cell_numpy_crd] > bc.val:
# logging.error(f"Weak boundary condition over H at i={bc.i},j={bc.j}: the bathymetry is above the surface level, I expect it to be below.")
# bc_descriptions[cell_numpy_crd][BC_TABLE_INDEX_FOR_H_ON_BOTTOM] = bc.val
elif bc_type == BoundaryConditionsTypes.H:
# ........................................................................ NEW
loc_msg = f"Weak horizontal boundary condition over H (surface altitude) at i={bc.i},j={bc.j}"
logging.debug(f"{loc_msg}, value={bc.val}")
if bc.val < 0:
# negative values are a hack to express impervious boundaries
logging.error(f"{loc_msg}, h={bc.val} is negative !!!")
if qybin[cell_numpy_crd] > 0:
logging.warning(f"{loc_msg} doesn't provide enough information to the cell (see cell flow direction)")
if nap[cell_numpy_crd] == 0 \
and bathymetry[bottom_cell_crd] > bc.val:
logging.error(f"{loc_msg}: seems located top side of a mesh in the " \
"domain but the bathymetry in the neighbour " \
f"cell is above the surface level of the b.c. {bathymetry[bottom_cell_crd]} > {bc.val}, I expect it to be below.")
if nap[cell_numpy_crd] == 1 \
and bathymetry[cell_numpy_crd] > bc.val:
logging.error(f"{loc_msg}: seems located bottom side of a mesh in the " \
"domain but the bathymetry in the cell is " \
f"above the surface level of the b.c. {bathymetry[bottom_cell_crd]} > {bc.val}, I expect it to be below.")
bc_descriptions[cell_numpy_crd][
BoundaryConditionIndices.BC_TABLE_INDEX_FOR_H_ON_BOTTOM.value
] = bc.val
# ........................................................................ NEW
elif bc_type == BoundaryConditionsTypes.HMOD:
logging.debug(f"Weak boundary condition over HMOD (water height, horiz border) at i={bc.i},j={bc.j}, value={bc.val}")
if bc.val < 0:
# negative values are a hack to express impervious boundaries
logging.warning(f"But at i={bc.i},j={bc.j}, B.C.'s value={bc.val} is negative !!!")
if x < qybin.shape[1] and y < qybin.shape[0] and qybin[y,x] > 0:
logging.warning(f"But at i={bc.i},j={bc.j} B.C. doesn't provide enough information to the cell (see cell flow direction)")
bc_descriptions[cell_numpy_crd][
BoundaryConditionIndices.BC_TABLE_INDEX_FOR_HMOD_ON_BOTTOM.value
] = bc.val
elif bc_type == BoundaryConditionsTypes.FROUDE_NORMAL:
logging.debug(f"Weak boundary condition Froude normal direction at i={bc.i},j={bc.j}, value={bc.val}")
assert bc.val > 0, f"The Froude number must be > 0, you gave {bc.val}"
bc_descriptions[cell_numpy_crd][
BoundaryConditionIndices.BC_TABLE_INDEX_FOR_FROUDE_NORMAL_ON_BOTTOM.value
] = bc.val
else:
raise Exception(f"Unsupported weak condition type {bc_type} at i={bc.i},j={bc.j}")
# Mirror this BC on the top border of the bottom side cell.
cell_numpy_crd_bottom = (cell_numpy_crd[0]-1, cell_numpy_crd[1])
if cell_numpy_crd_bottom not in bc_descriptions:
bc_descriptions[cell_numpy_crd_bottom] = [BC_BLANK_VALUE] * NB_BC_TYPES
[docs]
def digest_bcs_into_array(bc_descriptions, weak_boundary_cond_array):
# Convert the noted cells' boundary conditions into a
# sequential array and pointing towards it.
if bc_descriptions:
bc_cells_array = []
for i,(cell_numpy_crd,bc_values) in enumerate(bc_descriptions.items()):
bc_cells_array.append(bc_values)
# Here is a tricky thing:
# If there are some b.c. on a cell, then we give it an index >= 1.
# If there are no b.c. we give the index 0.
# This way we can know if there are b.c. or not without
# looking at b.c. themselves. It's an optimisation.
# Moreover, don't forget that a right-border B.C. will
# be located outside of the active cells !
weak_boundary_cond_array[cell_numpy_crd] |= (i+1) << 1 # bit 1 is NAP.
logging.debug(f"weak b.c. / cell param: #{weak_boundary_cond_array[cell_numpy_crd]}: {cell_numpy_crd} -> {bc_values}")
bc_cells_array = np.array(bc_cells_array,dtype=np.float32)
#print(bc_cells_array)
#bc_cells_array = bc_cells_array.transpose()
logging.info(f"There are {len(bc_descriptions)} boundary conditions/cell parameters.")
else:
bc_cells_array = None
return bc_cells_array
[docs]
def load_simple_sim_to_gpu(
sim: SimpleSimulation,
tile_size: int = 16,
shader_log_path: Path = None,
tiles_packing_mode: TilePackingMode = TilePackingMode.REGULAR,
optimize_indirection: bool = False,
fail_if_invalid_sim: bool = True
) -> GLSimulation:
""" Loads a "simple" model to GPU.
"""
assert isinstance(sim, SimpleSimulation)
if fail_if_invalid_sim:
sim._fail_if_invalid()
else:
errors = sim.check_errors()
if errors:
for error in errors:
logging.error(error)
if sim._param_timestep_strategy == TimeStepStrategy.OPTIMIZED_TIME_STEP and sim.param_timestep == 0:
ts = None
else:
ts = sim.param_timestep
#from wolfgpu.tile_packer import TilePackingMode
gpu = GLSimulation(width=sim._param_nx, height=sim._param_ny,
time_step_strategy = sim._param_timestep_strategy,
time_step = ts, dx=sim.param_dx, dy=sim.param_dy,
tile_size=tile_size,
tile_packing_mode=tiles_packing_mode,
shader_log_path=shader_log_path,
optimized_indirection=optimize_indirection)
gpu.set_froude_bc_tolerance( sim.param_froude_bc_limit_tolerance)
gpu.set_simulation_duration(sim.param_duration)
gpu.set_report_frequency(sim._param_report_period)
gpu.set_courant(float(sim.param_courant))
if sim.param_runge_kutta is None:
gpu.set_euler_ponderation()
else:
gpu.set_rk_ponderation(sim.param_runge_kutta)
gpu.froude_max = float(sim.param_froude_max)
qzero = np.zeros((sim._param_ny,sim._param_nx,3), dtype=np.float32)
# q = np.dstack([sim._h.T,sim._qx.T,sim._qy.T])
# print("-"*80)
# print(sim.param_timestep_strategy)
# print(sim.param_timestep)
# print(q.shape)
# print("-"*80)
if sim.nap is not None:
bathymetry = make_impervious_bathymetry(sim.nap, sim._bathymetry)
else:
bathymetry = sim.bathymetry
# FIXME I should reuse the transposed versiosn of all the matrices (else I transpose
# them several times and it costs a lot in initialisation)
gpu.set_buffers(
active_meshes_array=sim.nap.T,
bathymetry_array=bathymetry.T,
quantity_arrays=[np.dstack([sim._h.T,sim._qx.T,sim._qy.T],), qzero, qzero],
manning_array=sim._manning.T)
# Prepare infiltration data
if sim.infiltration_zones is not None:
inf_timings = sim._infiltrations_timings_as_gpu_array()
gpu.set_buffers(
infiltration_zones_array=sim.infiltration_zones.T-1,
infiltration_timings=inf_timings)
gpu.set_infiltration_interpolation(sim._param_infiltration_lerp)
bc_descriptions = dict()
handle_left_border_boundary_conditions(bc_descriptions,
[bc for bc in sim._boundary_conditions if bc.direction == Direction.LEFT],
sim.nap.T, sim._bathymetry.T, sim._qx.T)
handle_bottom_border_boundary_conditions(bc_descriptions,
[bc for bc in sim._boundary_conditions if bc.direction == Direction.BOTTOM],
sim.nap.T, sim._bathymetry.T, sim._qy.T)
handle_cell_parameters(bc_descriptions, sim._cell_parameters)
weak_boundary_cond = np.zeros((sim._param_ny, sim._param_nx), dtype=np.uint32)
weak_boundary_cond[(sim._h.T > 0) | (sim.nap.T == 1)] = 1 # FIXME Not correct, use the NAP mask !
bc_cells_array = digest_bcs_into_array(bc_descriptions, weak_boundary_cond)
gpu.set_buffers(weak_bc_array=weak_boundary_cond,
bc_cells_array=bc_cells_array)
gpu.complete_textures()
return gpu