Source code for wolfgpu.loaders.simple_sim_loader

from pathlib import Path
import logging
import numpy as np
from enum import Enum
from ..glsimulation import GLSimulation, TilePackingMode
from ..simple_simulation import SimpleSimulation, TimeStepStrategy, boundary_condition_2D, BoundaryConditionsTypes, Direction
from ..loaders.utils import convert_bc_type, make_impervious_bathymetry

[docs] BC_BLANK_VALUE=np.float32(-9_999_999) # Exactly represented byt float32
[docs] class BoundaryConditionIndices(Enum): """ The column indices of each supported boundary conditions in the boundary conditions table. IMPORTANT: These values are tied to data representation in the shader. Don't change them! """
[docs] BC_TABLE_INDEX_FOR_H_ON_LEFT = 1
[docs] BC_TABLE_INDEX_FOR_HMOD_ON_LEFT = 2
[docs] BC_TABLE_INDEX_FOR_QX_ON_LEFT = 0
[docs] BC_TABLE_INDEX_FOR_QY_ON_LEFT = 3 # Qy on a Y horizontal border FIXME we're missing it on the vertical border !
[docs] BC_TABLE_INDEX_FOR_FROUDE_NORMAL_ON_LEFT = 8
[docs] BC_TABLE_INDEX_FOR_H_ON_BOTTOM = 4
[docs] BC_TABLE_INDEX_FOR_HMOD_ON_BOTTOM = 5
[docs] BC_TABLE_INDEX_FOR_QX_ON_BOTTOM = 6 # Qx applied to a bottom border (so Ax parallel to the border)
[docs] BC_TABLE_INDEX_FOR_QY_ON_BOTTOM = 7 # Qy applied to a bottom border (so Ax parallel to the border)
[docs] BC_TABLE_INDEX_FOR_FROUDE_NORMAL_ON_BOTTOM = 9
[docs] NB_BC_TYPES = len(BoundaryConditionIndices)
[docs] def handle_left_border_boundary_conditions(bc_descriptions:dict, bc_list: list[boundary_condition_2D], nap: np.ndarray, bathymetry: np.ndarray, qxbin: np.ndarray): # ------------------------------------------------------------------------ # Boundary Conditions on left borders # ------------------------------------------------------------------------ bc: boundary_condition_2D for bc in bc_list: bc_type = convert_bc_type(bc.ntype) logging.debug(f"B.C. left border at {bc.i},{bc.j}, type: {bc_type}") # Get the indices of the WBC # Attention! x,y are swapped !!! cell_numpy_crd = (bc.j-1, bc.i-1) if cell_numpy_crd not in bc_descriptions: bc_descriptions[cell_numpy_crd] = [BC_BLANK_VALUE] * NB_BC_TYPES x, y = bc.i-1, bc.j-1 # i,j are Fortran coords => translate to numpy # - Either the BC is on the left border of a cell which # is on the interior side of the border of the domain. # - Either the BC is on the left border of a cell which # is on the exterior side of the border of the domain. left_cell_crd = (y, x - 1) if not( (x == 0 and nap[cell_numpy_crd] == 1) \ or (nap[cell_numpy_crd] == 1 and nap[left_cell_crd] == 0) \ or (nap[cell_numpy_crd] == 0 and nap[left_cell_crd] == 1)): logging.error(f"cell_numpy_crd={cell_numpy_crd}:{nap[cell_numpy_crd]}, left_cell_crd={left_cell_crd}:{nap[left_cell_crd]}") err = f"Weak boundary condition at i={bc.i},j={bc.j}: this B.C. is not on the border of the computation domain" logging.error(err) raise Exception(err) # Check each kind of WBC for specific situations and update the # BC description table. if bc_type == BoundaryConditionsTypes.QX: logging.debug(f"Weak boundary condition over QX at i={bc.i},j={bc.j}, value={bc.val}") bc_descriptions[cell_numpy_crd][ BoundaryConditionIndices.BC_TABLE_INDEX_FOR_QX_ON_LEFT.value ] = bc.val elif bc_type == BoundaryConditionsTypes.QY: logging.debug(f"Weak boundary condition over QY at i={bc.i},j={bc.j}, value={bc.val}") bc_descriptions[cell_numpy_crd][ BoundaryConditionIndices.BC_TABLE_INDEX_FOR_QY_ON_LEFT.value ] = bc.val elif bc_type == BoundaryConditionsTypes.H: loc_msg = f"Weak boundary condition over H at i={bc.i},j={bc.j}" logging.debug(f"{loc_msg}, value={bc.val}") if bc.val < 0: # negative values are a hack to express impervious boundaries logging.error(f"{loc_msg}, h={bc.val} is negative !!!") if qxbin[cell_numpy_crd] > 0: logging.warning(f"{loc_msg} doesn't provide enough information to the cell (see cell flow direction)") if nap[cell_numpy_crd] == 0 \ and bathymetry[left_cell_crd] > bc.val: logging.error(f"{loc_msg}: seems located rightside of the " \ "domain but the bathymetry in the neighbour " \ f"cell is above the surface level of the b.c. {bathymetry[left_cell_crd]} > {bc.val}, I expect it to be below.") if nap[cell_numpy_crd] == 1 \ and bathymetry[cell_numpy_crd] > bc.val: logging.error(f"{loc_msg}: seems located leftside of the " \ "domain but the bathymetry in the cell is " \ f"above the surface level of the b.c. {bathymetry[left_cell_crd]} > {bc.val}, I expect it to be below.") bc_descriptions[cell_numpy_crd][ BoundaryConditionIndices.BC_TABLE_INDEX_FOR_H_ON_LEFT.value ] = bc.val elif bc_type == BoundaryConditionsTypes.HMOD: logging.debug(f"Weak boundary condition over HMOD at i={bc.i},j={bc.j}, value={bc.val}") if bc.val < 0: # negative values are a hack to express impervious boundaries logging.warning(f"Weak boundary condition over HMOD at i={bc.i},j={bc.j}, value={bc.val} is negative !!!") if x < qxbin.shape[1] and y < qxbin.shape[0] and qxbin[y,x] > 0: logging.warning(f"Weak boundary condition over HMOD at i={bc.i},j={bc.j} doesn't provide enough information to the cell (see cell flow direction)") bc_descriptions[cell_numpy_crd][ BoundaryConditionIndices.BC_TABLE_INDEX_FOR_HMOD_ON_LEFT.value ] = bc.val elif bc_type == BoundaryConditionsTypes.FROUDE_NORMAL: logging.debug(f"Weak boundary condition Froude normal direction at i={bc.i},j={bc.j}, value={bc.val}") assert bc.val > 0, f"The Froude number must be > 0, you gave {bc.val}" bc_descriptions[cell_numpy_crd][ BoundaryConditionIndices.BC_TABLE_INDEX_FOR_FROUDE_NORMAL_ON_LEFT.value ] = bc.val else: logging.error(f"Unsupported weak condition type {bc_type}") # Attention: if we have a BC on the left border of cell (i,j) # we record it. But we also mark the cell (i-1,j) with a # no-values boundary condition. This special "no-values b.c." cell # will be interpreted by the GPU engine as a cell with a b.c. # on its *right* border. This is a bit hacky but makes sure # that the values of a boundary condition are not duplicated # across cells (which always carries a risk of mis-duplication bug) # Of course if one wants to change the BC on the (i-1,j) cell # he can as it will not disable the detection on a right border BC. # FIXME This is dangerous as usually a BC is defined on a border # but is meant to produce effect only on one side of the border. cell_numpy_crd_left = (cell_numpy_crd[0], cell_numpy_crd[1]-1) if cell_numpy_crd_left not in bc_descriptions: bc_descriptions[cell_numpy_crd_left] = [BC_BLANK_VALUE] * NB_BC_TYPES
[docs] def handle_bottom_border_boundary_conditions(bc_descriptions:dict, bc_list: list[boundary_condition_2D], nap: np.ndarray, bathymetry: np.ndarray, qybin: np.ndarray): for bc in bc_list: bc_type = convert_bc_type(bc.ntype) logging.debug(f"B.C. bottom border at {bc.i},{bc.j}, type: {bc_type}") # Attention! x,y are swapped !!! cell_numpy_crd = (bc.j-1, bc.i-1) if cell_numpy_crd not in bc_descriptions: bc_descriptions[cell_numpy_crd] = [BC_BLANK_VALUE] * NB_BC_TYPES x, y = bc.i-1, bc.j-1 # i,j are Fortran coords => translate to numpy bottom_cell_crd = (y - 1, x) # - Either the BC is on the bottom border of a cell which # is on the interior side of the border of the domain. # - Either the BC is on the bottom border of a cell which # is on the exterior side of the border of the domain. if not( (x == 0 and nap[cell_numpy_crd] == 1) \ or (nap[cell_numpy_crd] == 1 and nap[bottom_cell_crd] == 0) \ or (nap[cell_numpy_crd] == 0 and nap[bottom_cell_crd] == 1)): err = f"Weak boundary condition at i={bc.i},j={bc.j}: this B.C. is not on the border of the computation domain" logging.error(err) raise Exception(err) if bc_type == BoundaryConditionsTypes.QX: logging.debug(f"Weak boundary condition over QX (bottom border) at i,j:{(bc.i, bc.j)}, value={bc.val}") bc_descriptions[cell_numpy_crd][ BoundaryConditionIndices.BC_TABLE_INDEX_FOR_QX_ON_BOTTOM.value ] = bc.val elif bc_type == BoundaryConditionsTypes.QY: logging.debug(f"Weak boundary condition over QY (bottom border) at i,j:{(bc.i, bc.j)}, value={bc.val}") bc_descriptions[cell_numpy_crd][ BoundaryConditionIndices.BC_TABLE_INDEX_FOR_QY_ON_BOTTOM.value ] = bc.val # elif bc_type == BoundaryConditionsTypes.H.value: # logging.debug(f"Weak boundary condition over H (bottom border) at i,j:{(bc.i, bc.j)}, value={bc.val}") # if bc.val < 0: # # negative values are a hack to express impervious boundaries # logging.warning(f"Weak boundary condition over H (horiz border) at i={bc.i},j={bc.j}, value={bc.val} is negative !!!") # if qxbin[y,x] > 0: # logging.warning(f"Weak boundary condition over H (horiz border) at i={bc.i},j={bc.j} doesn't provide enough information to the cell (see cell flow direction)") # if bathymetry[cell_numpy_crd] > bc.val: # logging.error(f"Weak boundary condition over H at i={bc.i},j={bc.j}: the bathymetry is above the surface level, I expect it to be below.") # bc_descriptions[cell_numpy_crd][BC_TABLE_INDEX_FOR_H_ON_BOTTOM] = bc.val elif bc_type == BoundaryConditionsTypes.H: # ........................................................................ NEW loc_msg = f"Weak horizontal boundary condition over H (surface altitude) at i={bc.i},j={bc.j}" logging.debug(f"{loc_msg}, value={bc.val}") if bc.val < 0: # negative values are a hack to express impervious boundaries logging.error(f"{loc_msg}, h={bc.val} is negative !!!") if qybin[cell_numpy_crd] > 0: logging.warning(f"{loc_msg} doesn't provide enough information to the cell (see cell flow direction)") if nap[cell_numpy_crd] == 0 \ and bathymetry[bottom_cell_crd] > bc.val: logging.error(f"{loc_msg}: seems located top side of a mesh in the " \ "domain but the bathymetry in the neighbour " \ f"cell is above the surface level of the b.c. {bathymetry[bottom_cell_crd]} > {bc.val}, I expect it to be below.") if nap[cell_numpy_crd] == 1 \ and bathymetry[cell_numpy_crd] > bc.val: logging.error(f"{loc_msg}: seems located bottom side of a mesh in the " \ "domain but the bathymetry in the cell is " \ f"above the surface level of the b.c. {bathymetry[bottom_cell_crd]} > {bc.val}, I expect it to be below.") bc_descriptions[cell_numpy_crd][ BoundaryConditionIndices.BC_TABLE_INDEX_FOR_H_ON_BOTTOM.value ] = bc.val # ........................................................................ NEW elif bc_type == BoundaryConditionsTypes.HMOD: logging.debug(f"Weak boundary condition over HMOD (water height, horiz border) at i={bc.i},j={bc.j}, value={bc.val}") if bc.val < 0: # negative values are a hack to express impervious boundaries logging.warning(f"But at i={bc.i},j={bc.j}, B.C.'s value={bc.val} is negative !!!") if x < qybin.shape[1] and y < qybin.shape[0] and qybin[y,x] > 0: logging.warning(f"But at i={bc.i},j={bc.j} B.C. doesn't provide enough information to the cell (see cell flow direction)") bc_descriptions[cell_numpy_crd][ BoundaryConditionIndices.BC_TABLE_INDEX_FOR_HMOD_ON_BOTTOM.value ] = bc.val elif bc_type == BoundaryConditionsTypes.FROUDE_NORMAL: logging.debug(f"Weak boundary condition Froude normal direction at i={bc.i},j={bc.j}, value={bc.val}") assert bc.val > 0, f"The Froude number must be > 0, you gave {bc.val}" bc_descriptions[cell_numpy_crd][ BoundaryConditionIndices.BC_TABLE_INDEX_FOR_FROUDE_NORMAL_ON_BOTTOM.value ] = bc.val else: raise Exception(f"Unsupported weak condition type {bc_type} at i={bc.i},j={bc.j}") # Mirror this BC on the top border of the bottom side cell. cell_numpy_crd_bottom = (cell_numpy_crd[0]-1, cell_numpy_crd[1]) if cell_numpy_crd_bottom not in bc_descriptions: bc_descriptions[cell_numpy_crd_bottom] = [BC_BLANK_VALUE] * NB_BC_TYPES
[docs] def digest_bcs_into_array(bc_descriptions, weak_boundary_cond): # Convert the noted cells' boundary conditions into a # sequential array and pointing towards it. if bc_descriptions: bc_cells_array = [] for i,(cell_numpy_crd,bc_values) in enumerate(bc_descriptions.items()): bc_cells_array.append(bc_values) # Here is a tricky thing: # If there are some b.c. on a cell, then we give it an index >= 1. # If there are no b.c. we give the index 0. # This way we can know if there are b.c. or not without # looking at b.c. themselves. It's an optimisation. # Moreover, don't forget that a right-border B.C. will # be located outside of the active cells ! weak_boundary_cond[cell_numpy_crd] |= (i+1) << 1 # bit 1 is NAP. logging.debug(f"weak b.c.: #{weak_boundary_cond[cell_numpy_crd]}: {cell_numpy_crd} -> {bc_values}") bc_cells_array = np.array(bc_cells_array,dtype=np.float32) #print(bc_cells_array) #bc_cells_array = bc_cells_array.transpose() logging.info(f"There are {len(bc_descriptions)} boundary conditions.") else: bc_cells_array = None return bc_cells_array
[docs] def load_simple_sim_to_gpu( sim: SimpleSimulation, tile_size: int = 16, shader_log_path: Path = None, tiles_packing_mode: TilePackingMode = TilePackingMode.REGULAR, optimize_indirection: bool = False, fail_if_invalid_sim: bool = True ) -> GLSimulation: """ Loads a "simple" model to GPU. """ assert isinstance(sim, SimpleSimulation) if fail_if_invalid_sim: sim._fail_if_invalid() else: errors = sim.check_errors() if errors: for error in errors: logging.error(error) if sim._param_timestep_strategy == TimeStepStrategy.OPTIMIZED_TIME_STEP and sim.param_timestep == 0: ts = None else: ts = sim.param_timestep #from wolfgpu.tile_packer import TilePackingMode gpu = GLSimulation(width=sim._param_nx, height=sim._param_ny, time_step_strategy = sim._param_timestep_strategy, time_step = ts, dx=sim.param_dx, dy=sim.param_dy, tile_size=tile_size, tile_packing_mode=tiles_packing_mode, shader_log_path=shader_log_path, optimized_indirection=optimize_indirection) gpu.set_simulation_duration(sim.param_duration) gpu.set_report_frequency(sim._param_report_period) gpu.set_courant(float(sim.param_courant)) if sim.param_runge_kutta is None: gpu.set_euler_ponderation() else: gpu.set_rk_ponderation(sim.param_runge_kutta) gpu.froude_max = float(sim.param_froude_max) qzero = np.zeros((sim._param_ny,sim._param_nx,3), dtype=np.float32) # q = np.dstack([sim._h.T,sim._qx.T,sim._qy.T]) # print("-"*80) # print(sim.param_timestep_strategy) # print(sim.param_timestep) # print(q.shape) # print("-"*80) if sim.nap is not None: bathymetry = make_impervious_bathymetry(sim.nap, sim._bathymetry) else: bathymetry = sim.bathymetry # FIXME I should reuse the transposed versiosn of all the matrices (else I transpose # them several times and it costs a lot in initialisation) gpu.set_buffers( active_meshes_array=sim.nap.T, bathymetry_array=bathymetry.T, quantity_arrays=[np.dstack([sim._h.T,sim._qx.T,sim._qy.T],), qzero, qzero], manning_array=sim._manning.T) # Prepare infiltration data if sim.infiltration_zones is not None: inf_timings = sim._infiltrations_timings_as_gpu_array() gpu.set_buffers(infiltration_zones_array=sim.infiltration_zones.T-1, infiltration_timings=inf_timings) gpu.set_infiltration_interpolation(sim._param_infiltration_lerp) bc_descriptions = dict() handle_left_border_boundary_conditions(bc_descriptions, [bc for bc in sim._boundary_conditions if bc.direction == Direction.LEFT], sim.nap.T, sim._bathymetry.T, sim._qx.T) handle_bottom_border_boundary_conditions(bc_descriptions, [bc for bc in sim._boundary_conditions if bc.direction == Direction.BOTTOM], sim.nap.T, sim._bathymetry.T, sim._qy.T) weak_boundary_cond = np.zeros((sim._param_ny, sim._param_nx), dtype=np.uint32) weak_boundary_cond[(sim._h.T > 0) | (sim.nap.T == 1)] = 1 # FIXME Not correct, use the NAP mask ! bc_cells_array = digest_bcs_into_array(bc_descriptions, weak_boundary_cond) gpu.set_buffers(weak_bc_array=weak_boundary_cond, bc_cells_array=bc_cells_array) gpu.complete_textures() return gpu