"""
Author: HECE - University of Liege, Stéphane Champailler, Pierre Archambeau
Date: 2024
Copyright (c) 2024 University of Liege. All rights reserved.
This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""
from abc import ABC, abstractmethod
from pathlib import Path
from enum import Enum
import gettext
import numpy as np
from typing import Tuple, List, Dict, Union, final, Literal, Annotated
from dataclasses import dataclass
from dataclasses_json import dataclass_json
from pint import UnitRegistry, Unit
[docs]
unit_registry = UnitRegistry()
from wolfhece.wolf_array import header_wolf
from wolfhece.wolf_array import WolfArray
from wolfgpu.simplesimu.infiltrations import InfiltrationChronology
from wolfgpu.simplesimu.durations import SimulationDuration, SimulationDurationType
from wolfgpu.glsimulation import GLSimulation, GLSimulationGlobalState, BoundaryConditionIndices, BC_ON_LEFT, BC_ON_BOTTOM, BoundaryConditionsTypes, Direction, BC_BLANK_VALUE
from wolfgpu.results_store import ResultsStore
[docs]
class SimulationProxy:
""" A proxy to query and update a running simulation.
This class exists to hide the methods of the simulator from the caller
because they are numerous and many of them are of no interest to
the caller.
"""
# Implementation note: this proxy should just hold the data to be updated.
# It should not apply update on the GPU itself. Why ? Because if there are
# multiple injectors, then one may want to group all their updated data and
# apply them on the GPU in one go.
# Right now we d/l the whole bathymetry from the GPU which costs a *lot*.
# So we share that download across proxies.
def __init__(self, glsim: GLSimulation,
global_state: GLSimulationGlobalState,
active_zone: header_wolf,
simulation_current_quantity: int,
original_infiltration_chronology: InfiltrationChronology):
"""
:param glsim: The simulation we are in.
:param global_state: A copy of the global state of the GPU simulation.
:param active_zone: The geographic area the injector will use
:param simulation_current_quantity: The index of the current quantity
being simulated (i.e. the one that will be updated at next step).
:param original_infiltration_chronology: The infiltration chronology
"""
assert isinstance(original_infiltration_chronology, InfiltrationChronology) or original_infiltration_chronology is None
[docs]
self._glsim: GLSimulation = glsim
# FIXME global_state should come from glsim, no ??? Not yet because it
# comes from the simulation runner. So instead of passing a global state
# copy I would pass a SimulationRunner which is not any better. The more
# I progress, the more I copy stuff from the runner here. So, I'll need
# to put that stuff back in the runner and make the proxy a real proxy
# (i.e. something that hides the runner/simulator).
[docs]
self._global_state = global_state
[docs]
self._active_zone: header_wolf = active_zone
[docs]
self._cached_bathymetry = None
""" Cached bathymetry over the whole simulation domain. (j,i) indexing. """
[docs]
self._cached_h_qx_qy = None
""" Cached h, qx, qy over the whole simulation domain. (j,i) indexing. """
[docs]
self._cached_infiltration_zones = None
""" Cached infiltration zones over the whole simulation domain. (j,i) indexing. """
[docs]
self._cached_manning = None
""" Cached Manning's n over the whole simulation domain. (j,i) indexing. """
[docs]
self._cached_cell_param_pr_bcs_values = None
[docs]
self._cached_cell_param_or_bcs_indices = None
[docs]
self._bathymetry_was_set = False
[docs]
self._h_qx_qy_was_set = False
[docs]
self._infiltration_zones_was_set = False
[docs]
self._infiltration_chronology_was_set = False
[docs]
self._manning_was_set = False
[docs]
self._param_bc_was_set = False
[docs]
self._param_new_bc_was_set = False
[docs]
self._simulation_current_quantity = simulation_current_quantity
[docs]
self._infiltration_chronology = original_infiltration_chronology
[docs]
def _ensure_array_size(self, a):
""" Ensure the array size is the one expected for the active zone.
:param a: The array to check.
"""
expected_shape = tuple([ self._active_zones_as_slices[i].stop - self._active_zones_as_slices[i].start for i in (0,1) ])
assert a.shape == expected_shape, f"The shape of the array you give {a.shape} is not what I expected {expected_shape}"
[docs]
def _set_active_zone(self, zone_of_interest: header_wolf):
""" Set a new active zone for this injector.
:param zone_of_interest: The new active zone.
"""
self._active_zone = zone_of_interest
@property
[docs]
def active_zone(self) -> header_wolf:
return self._active_zone
@property
[docs]
def _active_zones_as_slices(self):
""" Slices corresponding to the active zone of the injector, in
simulation coordinates.
0-based indexing.
We need to convert from world coordinates to simulation coordinates
by subtracting the origin offsets.
"""
slices = self._active_zone.to_slices()
return (slice(slices[0].start - int(self._glsim.ox / self._glsim.dx),
slices[0].stop - int(self._glsim.ox / self._glsim.dx)),
slice(slices[1].start - int(self._glsim.oy / self._glsim.dy),
slices[1].stop - int(self._glsim.oy / self._glsim.dy))
)
@property
[docs]
def _active_zones_as_bcs_slices(self):
slices = self._active_zones_as_slices
slices_bcs = slice(max(slices[0].start, 1), min(slices[0].stop+1, self._glsim.nbx)), \
slice(max(slices[1].start, 1), min(slices[1].stop+1, self._glsim.nby))
return slices_bcs
[docs]
def _updated_infiltration_chronology(self):
return self._infiltration_chronology
@property
[docs]
def current_sim_step(self) -> int:
""" The current step in the simulation (starting at zero).
"""
return self._global_state.simulation_step
@property
[docs]
def current_sim_time(self) -> float:
""" The current time in the simulation.
Expressed in seconds (starting at zero).
"""
return self._global_state.simulation_time
[docs]
def get_bathymetry(self) -> np.ndarray:
""" Get the bathymetry unknown over the zone of interest.
:return: The bathymetry values over the zone of interest.
We cache the bathymetry to avoid multiple d/l from the GPU.
"""
if self._cached_bathymetry is None:
self._cached_bathymetry = self._glsim.read_bathymetry()
return self._cached_bathymetry[self._active_zones_as_slices[1], self._active_zones_as_slices[0]].T
[docs]
def set_bathymetry(self, b: np.ndarray | WolfArray):
""" Set the bathymetry unknown over the zone of interest.
:param b: The bathymetry values to set. The array size must be equal to the size of the
zone of interest.
"""
if isinstance(b, WolfArray):
b = b.array.data
self._ensure_array_size(b)
self._cached_bathymetry[self._active_zones_as_slices[1], self._active_zones_as_slices[0]] = b.T
self._bathymetry_was_set = True
[docs]
def get_mannings_n(self) -> np.ndarray:
""" Get the Manning's n unknown over the zone of interest.
:return: The Manning's n values over the zone of interest.
We cache the Manning's n to avoid multiple d/l from the GPU.
"""
if self._cached_manning is None:
self._cached_manning = self._glsim.read_manning()
return self._cached_manning[self._active_zones_as_slices[1], self._active_zones_as_slices[0]].T
[docs]
def set_mannings_n(self, b: np.ndarray | WolfArray):
""" Set the Manning's n unknown over the zone of interest.
:param b: The Manning's n values to set. The array size must be equal to the size of the
zone of interest.
"""
if isinstance(b, WolfArray):
b = b.array.data
self._ensure_array_size(b)
self._cached_manning[self._active_zones_as_slices[1], self._active_zones_as_slices[0]] = b.T
self._manning_was_set = True
[docs]
def get_infiltration_zones(self) -> np.ndarray:
""" Get the infiltration zones over the zone of interest.
Infiltration zone are 1-based indices as it is a WOLF convention.
:return: The infiltration zones over the zone of interest.
We cache the infiltration zones to avoid multiple d/l from the GPU.
"""
if self._cached_infiltration_zones is None:
self._cached_infiltration_zones = self._glsim.read_infiltration_map() + 1
return self._cached_infiltration_zones[self._active_zones_as_slices[1], self._active_zones_as_slices[0]].T
[docs]
def set_infiltration_zones(self, b: np.ndarray | WolfArray):
""" Set the infiltration zones over the zone of interest.
:param b: The infiltration zones to set. The array size must be equal to the size of the
zone of interest.
"""
if isinstance(b, WolfArray):
b = b.array.data
self._ensure_array_size(b)
self._cached_infiltration_zones[self._active_zones_as_slices[1], self._active_zones_as_slices[0]] = b.T
self._infiltration_zones_was_set = True
[docs]
def get_active_infiltration_quantities(self):
""" Get the current infiltration chronology's row: that is, the
currently infiltrated quantities.
:return: The current infitlration chronology's row. If none is
active (because current time is before the chronology beginning)
then `None` is returned.
"""
row_at_t = self._infiltration_chronology.get_row_at_time(self.current_sim_time)
if row_at_t:
t, values = row_at_t
return values
else:
return None
[docs]
def set_active_infiltration_quantities(self, q) -> None:
""" Set the current infiltration chronology's row: that is, set the
infiltrated quantities.
"""
t = self._infiltration_chronology.get_active_entry_start_time(self.current_sim_time)
assert t is not None, "You're trying to set the active infiltration row, but there's none active"
self._infiltration_chronology.set(t, q)
self._infiltration_chronology_was_set = True
[docs]
def insert_infiltration_quantities(self, t: float, q: List[float]) -> None:
""" Insert or replace an infiltration chronology's row.
:param t: Beginning time for the infiltration of the row (seconds).
:param q: Infiltrated quantities, as many as the number of infiltrated zones.
"""
self._infiltration_chronology.set(t, q)
self._infiltration_chronology_was_set = True
[docs]
def _cell_bcs_params_values_indices(self):
""" Get the cell parameters values over the whole simulation domain.
:return: A numpy array with the cell parameters values.
"""
if self._cached_cell_param_pr_bcs_values is None:
self._cached_cell_param_pr_bcs_values = self._glsim.read_bcs_params_values()
if self._cached_cell_param_or_bcs_indices is None:
self._cached_cell_param_or_bcs_indices = self._glsim.get_weak_bcs_params_index_position()
return self._cached_cell_param_pr_bcs_values, self._cached_cell_param_or_bcs_indices
[docs]
def _convert_cached_bcs_params_indices(self):
""" Get the cached BCs params indices over the whole simulation domain. """
# Re-pack the indices into bits (for the GPU shaders to decode them)
new_indices = ((self._cached_cell_param_or_bcs_indices + 1) << 1).astype(np.uint32).T
assert new_indices.shape == self._glsim._active_meshes_array.shape, "Shape mismatch in _convert_cached_bcs_params_indices"
return np.ascontiguousarray(np.where(new_indices >= 0, new_indices, self._glsim._active_meshes_array))
[docs]
def _add_new_bc_param(self, i:int, j:int):
if self._cached_cell_param_pr_bcs_values is None or self._cached_cell_param_or_bcs_indices is None:
self._cell_bcs_params_values_indices()
idx = self._cached_cell_param_or_bcs_indices[i,j]
if idx < 0:
# Need to add a new BC
new_idx = self._cached_cell_param_pr_bcs_values.shape[0]
new_row = np.full((1, self._cached_cell_param_pr_bcs_values.shape[1]), BC_BLANK_VALUE, dtype=np.float32)
self._cached_cell_param_pr_bcs_values = np.vstack((self._cached_cell_param_pr_bcs_values, new_row))
self._cached_cell_param_or_bcs_indices[i,j] = new_idx
self._param_new_bc_was_set = True
return new_idx
else:
return idx
[docs]
def get_BC_value(self, i:int, j: int, param_type: BoundaryConditionIndices, zero_based: bool = False) -> float:
""" Get the cell parameter at location (i,j) in simulation coordinates.
:param i: The i index (0-based or 1-based) in simulation coordinates.
:param j: The j index (0-based or 1-based) in simulation coordinates.
:param param_type: The type of parameter to get. See
`BoundaryConditionIndices` enum for possible values.
:param zero_based: If `True`, then (i,j) are 0-based indices. If
`False`, then (i,j) are 1-based indices.
:return: The cell parameter value at location (i,j).
"""
assert isinstance(param_type, BoundaryConditionIndices), "param_type must be a BoundaryConditionIndices enum value"
slices = self._active_zones_as_slices
if not zero_based:
i -= 1
j -= 1
assert slices[0].start <= i < slices[0].stop, f"You are outside the active zone: i={i} must be in [{slices[0].start},{slices[0].stop}["
assert slices[1].start <= j < slices[1].stop, f"You are outside the active zone: j={j} must be in [{slices[1].start},{slices[1].stop}["
values, indices = self._cell_bcs_params_values_indices()
"""// IMPORTANT! `bc_index` must have been initialized. This is done in the
// compute_derivative function which should be one of the topmost one (so
// bc_index should be available in many places)
uint bc_index = weak_bcs[global_cell_index]; """
ndx = indices[i,j]
if ndx < 0:
return 99999.0
return values[ndx, param_type.value]
[docs]
def get_bridge_roof_parameter(self, i:int, j: int, zero_based: bool = False) -> float:
""" Get the cell parameter at location (i,j) in simulation coordinates.
:param i: The i index (0-based or 1-based) in simulation coordinates.
:param j: The j index (0-based or 1-based) in simulation coordinates.
:param zero_based: If `True`, then (i,j) are 0-based indices. If
`False`, then (i,j) are 1-based indices.
:return: The cell parameter value at location (i,j).
"""
slices = self._active_zones_as_slices
if not zero_based:
i -= 1
j -= 1
assert slices[0].start <= i < slices[0].stop, f"You are outside the active zone: i={i} must be in [{slices[0].start},{slices[0].stop}["
assert slices[1].start <= j < slices[1].stop, f"You are outside the active zone: j={j} must be in [{slices[1].start},{slices[1].stop}["
values, indices = self._cell_bcs_params_values_indices()
param_type = BoundaryConditionIndices.BC_TABLE_INDEX_FOR_BRIDGE_ROOF_ELEVATION.value
"""// IMPORTANT! `bc_index` must have been initialized. This is done in the
// compute_derivative function which should be one of the topmost one (so
// bc_index should be available in many places)
uint bc_index = weak_bcs[global_cell_index]; """
ndx = indices[i,j]
if ndx < 0:
return 99999.0
return values[ndx, param_type]
[docs]
def get_bridge_roof_in_active_zone_as_ijval(self, zero_based: bool = False) -> Tuple[np.ndarray, np.ndarray]:
""" Get all the bridge roof elevations over the active zone."""
slices = self._active_zones_as_slices
values, indices = self._cell_bcs_params_values_indices()
param_type = BoundaryConditionIndices.BC_TABLE_INDEX_FOR_BRIDGE_ROOF_ELEVATION.value
sub_indices = indices[slices]
ij = np.where(sub_indices >= 0)
col = values[:, param_type]
values = col[sub_indices[ij[0][:], ij[1][:]]]
# Keep only ij and values if value is not 99999.0
valid_mask = (values != BC_BLANK_VALUE) & (values != 99999.0)
ij = (ij[0][valid_mask], ij[1][valid_mask])
values = values[valid_mask]
if zero_based:
return ij, values
else:
return (ij[0] + 1, ij[1] + 1), values
[docs]
def get_bridge_roof_in_active_zone_as_array(self) -> np.ndarray:
""" Get all the bridge roof elevations over the active zone as a full array."""
slices = self._active_zones_as_slices
ij, values = self.get_bridge_roof_in_active_zone_as_ijval(zero_based=True)
param_values = np.zeros((slices[0].stop - slices[0].start,
slices[1].stop - slices[1].start),
dtype=np.float32)
param_values[ij[0][:] - slices[0].start, ij[1][:] - slices[1].start] = values
return param_values
[docs]
def _get_BCs_in_active_zone_as_ijval(self, param_type: BoundaryConditionIndices, zero_based: bool = False) -> np.ndarray:
""" Get all the cell parameters of type `param_type` over the active zone.
:param param_type: The type of parameter to get. See
`BoundaryConditionIndices` enum for possible values.
:return: A numpy array with the cell parameters of type `param_type`
over the active zone.
"""
slices = self._active_zones_as_slices
values, indices = self._cell_bcs_params_values_indices()
param_type_value = param_type.value
sub_indices = indices[slices] # 1-based bc indices over active zone
ij = np.where(sub_indices >= 0)
col = values[:, param_type_value]
values = col[sub_indices[ij[0][:], ij[1][:]]]
# Keep only ij and values if value is not 99999.0
valid_mask = (values != BC_BLANK_VALUE) & (values != 99999.0)
ij = (ij[0][valid_mask], ij[1][valid_mask])
values = values[valid_mask]
if zero_based:
return ij, values
else:
return (ij[0] + 1, ij[1] + 1), values
[docs]
def get_BCs_in_active_zone_as_ijval(self, param_type:BoundaryConditionsTypes, direction: Direction, zero_based: bool = False) -> Tuple[np.ndarray, np.ndarray]:
""" Get all the cell parameters of type `param_type` over the active zone.
:param param_type: The type of parameter to get. See
`BoundaryConditionsTypes` enum for possible values.
:param direction: The direction of the boundary condition.
:return: A numpy array with the cell parameters of type `param_type`
"""
return self._get_BCs_in_active_zone_as_ijval(BoundaryConditionIndices.index_for_bc_type(param_type, direction), zero_based= zero_based)
[docs]
def get_BCs_in_active_zone_as_dict(self, zero_based: bool = False) -> dict:
ret = {}
for bctype in BoundaryConditionsTypes:
for direction in Direction:
ij, values = self.get_BCs_in_active_zone_as_ijval(bctype, direction, zero_based=zero_based)
ret[(bctype, direction)] = (ij, values)
return ret
[docs]
def get_potential_BCs_in_active_zone(self, zero_based: bool = False) -> dict:
slices = self._active_zones_as_bcs_slices
def bc_on_left(i,j, nap):
return nap[i-1,j] ^ nap[i,j]
def bc_on_bottom(i,j, nap):
return nap[i,j-1] ^ nap[i,j]
nap = self._glsim._active_meshes_array.T
ret = {}
for direction, func in zip(Direction, [bc_on_left, bc_on_bottom]):
ret[direction] = [(i,j) for i in range(slices[0].start, slices[0].stop) for j in range(slices[1].start, slices[1].stop) if func(i,j,nap)]
if zero_based:
return ret
else:
ret_1based = {}
for direction in ret:
ret_1based[direction] = [(i+1,j+1) for (i,j) in ret[direction]]
return ret_1based
[docs]
def get_BCs_values_in_active_zone_as_array(self, param_type: BoundaryConditionIndices) -> np.ndarray:
""" Get all the cell parameters of type `param_type` over the active zone as a full array.
:param param_type: The type of parameter to get. See
`BoundaryConditionIndices` enum for possible values.
:return: A numpy array with the cell parameters of type `param_type`
over the active zone.
"""
slices = self._active_zones_as_slices
ij, values = self._get_BCs_in_active_zone_as_ijval(param_type, zero_based=True)
param_values = np.zeros((slices[0].stop - slices[0].start,
slices[1].stop - slices[1].start),
dtype=np.float32)
col = values[:, param_type.value]
param_values[ij[0][:] - slices[0].start, ij[1][:] - slices[1].start] = col
return param_values
[docs]
def set_bridge_roof_parameter(self, i:int, j: int, value: float, zero_based: bool = False) -> None:
""" Set the cell parameter at location (i,j) in simulation coordinates.
:param i: The i index (0-based or 1-based) in simulation coordinates.
:param j: The j index (0-based or 1-based) in simulation coordinates.
:param value: The value to set.
:param zero_based: If `True`, then (i,j) are 0-based
indices. If `False`, then (i,j) are 1-based indices.
"""
self._set_BC_or_parameter_value(i, j,
BoundaryConditionIndices.BC_TABLE_INDEX_FOR_BRIDGE_ROOF_ELEVATION,
value,
zero_based,
check_neighbor=False)
[docs]
def set_bridge_roof_parameter_from_array(self, values: np.ndarray | WolfArray) -> None:
""" Set the bridge roof elevation parameter over the active zone from an array.
:param values: The array of values to set. Its shape must match the
active zone shape.
"""
if isinstance(values, WolfArray):
values = values.array.data
assert values.shape == (self._active_zones_as_slices[0].stop - self._active_zones_as_slices[0].start,
self._active_zones_as_slices[1].stop - self._active_zones_as_slices[1].start), \
f"The shape of the array you give {values.shape} is not what I expected {(self._active_zones_as_slices[0].stop - self._active_zones_as_slices[0].start, self._active_zones_as_slices[1].stop - self._active_zones_as_slices[1].start)}"
ij = np.argwhere(values != 99999.0)
for index in ij:
self.set_bridge_roof_parameter(index[0] + self._active_zones_as_slices[0].start,
index[1] + self._active_zones_as_slices[1].start,
values[index[0], index[1]],
zero_based=True)
if self._cached_cell_param_or_bcs_indices is None:
self._cell_bcs_params_values_indices()
# Nullify all parameters that were defined previously but are not anymore
# We do not reduce the number of BCs, we just set the parameter to 99999.0
ij = np.argwhere((self._cached_cell_param_or_bcs_indices >= 0 ) & (values == 99999.0))
for index in ij:
self._set_BC_or_parameter_value(index[0],
index[1],
BoundaryConditionIndices.BC_TABLE_INDEX_FOR_BRIDGE_ROOF_ELEVATION,
99999.0,
zero_based=True,
check_neighbor=False)
[docs]
def _set_BC_or_parameter_value(self, i:int, j: int,
param_type: BoundaryConditionIndices,
value: float,
zero_based: bool = False,
check_neighbor: bool = True) -> None:
""" Set the cell parameter at location (i,j) in simulation coordinates.
:param i: The i index (0-based or 1-based) in simulation coordinates.
:param j: The j index (0-based or 1-based) in simulation coordinates.
:param param_type: The type of parameter to set. See
`BoundaryConditionIndices` enum for possible values.
:param value: The value to set.
:param zero_based: If `True`, then (i,j) are 0-based
indices. If `False`, then (i,j) are 1-based indices.
"""
assert isinstance(param_type, BoundaryConditionIndices), "param_type must be a BoundaryConditionIndices enum value"
slices = self._active_zones_as_slices
if not zero_based:
i -= 1
j -= 1
assert slices[0].start <= i < slices[0].stop, f"You are outside the active zone: i={i} must be in [{slices[0].start},{slices[0].stop}["
assert slices[1].start <= j < slices[1].stop, f"You are outside the active zone: j={j} must be in [{slices[1].start},{slices[1].stop}["
nap = self._glsim._active_meshes_array.T
if check_neighbor:
if param_type in BC_ON_LEFT:
if not(nap[i-1,j] ^ nap[i,j]):
raise RuntimeError(f"You are trying to set a BC parameter on the left boundary of a mesh at cell ({i},{j}) which is not allowed.")
elif param_type in BC_ON_BOTTOM:
if not(nap[i,j-1] ^ nap[i,j]):
raise RuntimeError(f"You are trying to set a BC parameter on the bottom boundary of a mesh at cell ({i},{j}) which is not allowed.")
values, indices = self._cell_bcs_params_values_indices()
"""// IMPORTANT! `bc_index` must have been initialized. This is done in the
// compute_derivative function which should be one of the topmost one (so
// bc_index should be available in many places)
uint bc_index = weak_bcs[global_cell_index]; """
ndx = indices[i,j]
if ndx < 0:
ndx = self._add_new_bc_param(i + slices[0].start, j + slices[1].start)
self._cached_cell_param_pr_bcs_values[ndx, param_type.value] = value
else:
values[ndx, param_type.value] = value
self._param_bc_was_set = True
[docs]
def set_BC_value(self, i:int, j: int,
param_type: BoundaryConditionsTypes,
direction: Direction,
value: float,
zero_based: bool = False) -> None:
""" Set the cell parameter at location (i,j) in simulation coordinates.
:param i: The i index (0-based or 1-based) in simulation coordinates.
:param j: The j index (0-based or 1-based) in simulation coordinates.
:param param_type: The type of parameter to set. See
`BoundaryConditionIndices` enum for possible values.
:param value: The value to set.
:param zero_based: If `True`, then (i,j) are 0-based
indices. If `False`, then (i,j) are 1-based indices.
"""
assert isinstance(param_type, BoundaryConditionsTypes), "param_type must be a BoundaryConditionsTypes enum value"
assert isinstance(direction, Direction), "direction must be a Direction enum value"
self._set_BC_or_parameter_value(i, j,
BoundaryConditionIndices.index_for_bc_type(param_type, direction),
value,
zero_based,
check_neighbor=True)
[docs]
def _h_qx_qy_cache(self):
if self._cached_h_qx_qy is None:
self._cached_h_qx_qy = self._glsim._read_full_quantity_result(self._simulation_current_quantity)
return self._cached_h_qx_qy
[docs]
def get_h(self):
""" Get the h unknown over the zone of interest.
:return: The h values over the zone of interest.
"""
return self._h_qx_qy_cache()[self._active_zones_as_slices[1], self._active_zones_as_slices[0], 0].T
[docs]
def set_h(self, h : np.ndarray | WolfArray):
""" Set the h unknown over the zone of interest.
:param h: The h values to set. The array size must be equal to the size of the
zone of interest.
"""
if isinstance(h, WolfArray):
h = h.array.data
self._ensure_array_size(h)
self._h_qx_qy_was_set = True
self._h_qx_qy_cache()[self._active_zones_as_slices[1], self._active_zones_as_slices[0],0] = h.T
[docs]
def get_qx(self):
""" Get the Qx unknown over the zone of interest.
:return: The Qx values over the zone of interest.
"""
return self._h_qx_qy_cache()[self._active_zones_as_slices[1], self._active_zones_as_slices[0],1].T
[docs]
def set_qx(self, qx : np.ndarray | WolfArray):
""" Set the Qx unknown over the zone of interest.
:param qx: The Qx values to set. The array size must be equal to the size of the
zone of interest.
"""
if isinstance(qx, WolfArray):
qx = qx.array.data
self._ensure_array_size(qx)
self._h_qx_qy_was_set = True
self._h_qx_qy_cache()[self._active_zones_as_slices[1], self._active_zones_as_slices[0],1] = qx.T
[docs]
def get_qy(self):
""" Get the Qy unknown over the zone of interest.
:return: The Qy values over the zone of interest.
"""
return self._h_qx_qy_cache()[self._active_zones_as_slices[1], self._active_zones_as_slices[0],2].T
[docs]
def set_qy(self, qy : np.ndarray | WolfArray):
""" Set the Qy unknown over the zone of interest.
:param qy: The Qy values to set. The array size must be equal to the size of the
zone of interest.
"""
if isinstance(qy, WolfArray):
qy = qy.array.data
self._ensure_array_size(qy)
self._h_qx_qy_was_set = True
self._h_qx_qy_cache()[self._active_zones_as_slices[1], self._active_zones_as_slices[0],2] = qy.T
[docs]
def get_froude(self):
""" Get the Froude number over the zone of interest.
:return: The Froude number values over the zone of interest.
"""
h_qx_qy = self._h_qx_qy_cache()[self._active_zones_as_slices[1], self._active_zones_as_slices[0],:]
h = h_qx_qy[:,:,0].T
qx = h_qx_qy[:,:,1].T
qy = h_qx_qy[:,:,2].T
gravity = 9.81
froude = np.where(h>0.0, np.sqrt(qx**2 + qy**2) / h / np.sqrt(gravity * h), 0.0)
return froude
[docs]
def get_head(self):
""" Get the water head over the zone of interest.
:return: The water head values over the zone of interest.
"""
b = self.get_bathymetry()
h_qx_qy = self._h_qx_qy_cache()[self._active_zones_as_slices[1], self._active_zones_as_slices[0],:]
h = h_qx_qy[:,:,0].T
qx = h_qx_qy[:,:,1].T
qy = h_qx_qy[:,:,2].T
head = np.where(h>0.0, b + h + (qx**2 + qy**2) / (2 * 9.81 * h), 0.0)
return head
[docs]
def get_wolfarray(self, which_quantity: Literal['h', 'qx', 'qy',
'froude', 'head',
'infiltration', 'manning'],
mask_nodata:bool = True,
mask_null_h:bool = False) -> WolfArray:
""" Get a wolf array over the zone of interest.
:param which_quantity: The quantity to get.
:return: The wolf array values over the zone of interest.
"""
assert which_quantity in ['h', 'qx', 'qy', 'bathy', 'froude', 'head', 'infiltration', 'manning'], f"Unsupported quantity: {which_quantity}"
null_value = 0.
if which_quantity == 'bathy':
full_array = self.get_bathymetry()
null_value = 99999.
elif which_quantity == 'h':
full_array = self.get_h()
elif which_quantity == 'qx':
full_array = self.get_qx()
elif which_quantity == 'qy':
full_array = self.get_qy()
elif which_quantity == 'froude':
full_array = self.get_froude()
elif which_quantity == 'head':
full_array = self.get_head()
elif which_quantity == 'infiltration':
full_array = self.get_infiltration_zones()
elif which_quantity == 'manning':
full_array = self.get_mannings_n()
ret = WolfArray(srcheader=self._active_zone, np_source=full_array,
nullvalue=null_value, masknull=mask_nodata)
if mask_null_h:
if which_quantity is not 'h':
h_array = self.get_h()
ret.array.mask[:,:] = (ret.array.mask) & (h_array == 0.)
return ret
[docs]
def set_wolfarray(self, which_quantity: Literal['bathy', 'h',
'qx', 'qy',
'infiltration', 'manning'], wa: WolfArray) -> None:
""" Set a wolf array over the zone of interest.
:param which_quantity: The quantity to set.
:param wa: The wolf array to set over the zone of interest.
"""
assert which_quantity in ['bathy', 'h', 'qx', 'qy', 'infiltration', 'manning'], f"Unsupported quantity: {which_quantity}"
assert isinstance(wa, WolfArray), "You must provide a WolfArray instance"
if which_quantity == 'bathy':
self.set_bathymetry(wa.array.data)
elif which_quantity == 'h':
self.set_h(wa.array.data)
elif which_quantity == 'qx':
self.set_qx(wa.array.data)
elif which_quantity == 'qy':
self.set_qy(wa.array.data)
elif which_quantity == 'infiltration':
self.set_infiltration_zones(wa.array.data)
elif which_quantity == 'manning':
self.set_mannings_n(wa.array.data)
[docs]
def get_result_path(self) -> str:
""" Get the path to the result folder for this simulation.
:return: The path to the result folder.
"""
return self._glsim.res
# def get_infiltration_chronology(self) -> InfiltrationChronology:
# if self._infiltration_chronology:
# return self._infiltration_chronology
# else:
# raise RuntimeError("You can't query the infiltrations if none was set in the simulation")
# def set_infiltration_chronology(self, chrono: InfiltrationChronology):
# """ Set a new infiltration chronology in the simulator. WARNING! The
# infitlration chronology stricture (number of zones, number of
# chronology entries) must be left untouched. So you can't add zones
# nor entries in the chronology.
# """
# if self._infiltration_chronology is not None:
# assert chrono.nb_zones == self._glsim.nb_infiltration_zones, "You have changed the number of infiltration zones. It's not supported."
# assert chrono.nb_entries == self._glsim.nb_infiltration_lines, "You have changed the number of entries in the infiltration chronology. It's not supported, you can only modify existing entries."
# self._infiltration_chronology_was_set = True
# self._infiltration_chronology = chrono
# else:
# raise RuntimeError("You can't set the infiltrations if none was set in the simulation")
[docs]
class SimulationInjector(ABC):
"""
Injectors allow small python scripts to be run at several point in time
during the simulation. The goal of these script should be to update the
simulation in real time, potentially affecting the simulation outcome.
Injectors are called between simulation steps (that is after the end of a
step and before the beginning of the next step).
"""
@classmethod
@abstractmethod
[docs]
def compatibility(self):
"""
Give the versions of Python and wolfgpu this injector should work with.
The versions numbers can be expressed as version constraints. In that
case they must match
[SimpleSpec](https://python-semanticversion.readthedocs.io/en/latest/reference.html?highlight=simplespec#semantic_version.SimpleSpec)
of the [semantic_version](https://pypi.org/project/semantic-version/)
package.
:rtype: This method returns a dictionary with two entries "python" and
"wolfgpu". For example:
```
return {
"python": ">=3.11",
"wolfgpu": ">=1.4"
}
```
"""
pass
@abstractmethod
[docs]
def inject(self, sim_proxy:SimulationProxy) -> None:
""" This method will starting from the `time_to_first_injection` or
after the time it itself returned.
:return: Return the time the simulator must wait before calling this
method again. Return `None` if you don't want to be called anymore.
:rtype: SimulationDuration
"""
pass
@abstractmethod
[docs]
def do_report(self, proxy: SimulationProxy, rs: ResultsStore, record_path:Path) -> None:
""" This method will propose to write some reporting data.
:return: Nothing.
:rtype: None
"""
pass
@classmethod
[docs]
def make_from_dict(self, json_dict: dict) -> "SimulationInjector":
"""
Given a dictionary, create a brand new injector.
This method will be called by the simulator to build the injector.
:param json_dict: A `dict` that is JSON-serializable.
:rtype: SimulationInjector
"""
pass
[docs]
def get_params(self) -> dict:
"""
Create a dictionary from an existing injector. This method will be
called by the simulator to save the injector.
:rtype: A JSON-serializable dict. In practice, that means that the dict
values must be either dict or simple types: int, float, string.
Arrays are currently excluded. If you need them, use relative paths
to files.
"""
# For the moment, we don't expect the injectors to save themselves. Writing
# the json will be done at the simulation level.
return {}
[docs]
def default_dict_active_zone(which_type: Literal["header_wolf", "bounds"] = "header_wolf") -> dict:
""" Get a default active zone dictionary that can be used to create a
header_wolf.
:param which_type: The type of data to save: 'header_wolf' or 'bounds'.
:return: A dictionary that can be used to create a header_wolf.
"""
assert which_type in ['header_wolf', 'bounds'], "type_towhich_type_save must be either 'header_wolf' or 'bounds'"
if which_type == 'header_wolf':
d = {"type": which_type,
"origx": 0.0,
"origy": 0.0,
"dx": 1.0,
"dy": 1.0,
"nbx": 1,
"nby": 1,}
else:
d = {"type": which_type,
"minx": 0.0,
"maxx": 1.0,
"miny": 0.0,
"maxy": 1.0,}
return d
[docs]
def get_injector_dict(injector_class: SimulationInjector,
type_duration:Literal['time','step'],
duration:float | int,
active_zone:header_wolf | dict | None = None,
source_file:str | Path | None = None) -> dict:
""" Get a default injector dictionary that can be used to create a
TimeBasedInjector or StepBasedInjector.
:param type_duration: The type of duration: 'time' or 'step'.
:param duration: The duration value.
:return: A dictionary that can be used to create an injector.
"""
assert issubclass(injector_class, SimulationInjector), "injector_class must be a subclass of SimulationInjector"
assert type_duration in ['time','step'], "type_duration must be either 'time' or 'step'"
assert isinstance(duration, (int, float)), "duration must be an int or a float"
assert active_zone is None or isinstance(active_zone, (header_wolf, dict)), "active_zone must be either None or a header_wolf instance"
ret = {}
ret['type'] = injector_class.__name__
if active_zone is None:
hdr = header_wolf()
ret['active_zone'] = hdr.to_dict(type_to_save='bounds')
elif isinstance(active_zone, dict):
assert "header_type" in active_zone, "active_zone dict must have a 'header_type' key"
if active_zone['header_type'] == 'header_wolf':
assert all (k in active_zone for k in ['origx', 'origy', 'dx', 'dy', 'nbx', 'nby']), "active_zone dict must have keys 'origx', 'origy', 'dx', 'dy', 'nbx', 'nby'"
elif active_zone['header_type'] == 'bounds':
assert all (k in active_zone for k in ['minx', 'maxx', 'miny', 'maxy']), "active_zone dict must have keys 'minx', 'maxx', 'miny', 'maxy'"
else:
raise ValueError("active_zone dict 'header_type' key must be either 'header_wolf' or 'bounds'")
ret['active_zone'] = active_zone
else:
ret['active_zone'] = active_zone.to_dict(type_to_save='bounds')
if type_duration == 'time':
ret['time_to_first_injection'] = SimulationDuration(SimulationDurationType.SECONDS, duration).to_dict()
else:
ret['time_to_first_injection'] = SimulationDuration(SimulationDurationType.STEPS, duration).to_dict()
if source_file is not None:
source_file = Path(source_file)
if not source_file.exists():
raise FileNotFoundError(f"The source file '{source_file}' does not exist.")
ret['source'] = "file://" + str(source_file.absolute())
return ret