"""
Author: HECE - University of Liege, Stéphane Champailler, Pierre Archambeau
Date: 2024
Copyright (c) 2024 University of Liege. All rights reserved.
This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""
from __future__ import annotations
# Run this as python -m wolfgpu.simple_simulation else you'll enter a
# dependency/import Alice In Wonderland loop.
import json
from enum import Enum
from pathlib import Path
from datetime import datetime
from typing import Union, List, Tuple
import numpy as np
import numpy.typing as npt
from scipy.ndimage import labeled_comprehension
import logging
from copy import deepcopy
from .utils import parse_duration_to_seconds
from wolfhece.PyParams import Wolf_Param, Type_Param
from wolfhece.PyTranslate import _
# PEP-8 way of making a version number available.
from .version import __version__
[docs]
class BoundaryConditionIndices(Enum):
""" The column indices of each supported boundary conditions in the
boundary conditions table. This also covers cell parameters.
IMPORTANT: These values are tied to data representation in the shader.
Don't change them!
These are separate from the `BoundaryConditionsTypes` because they don't serve the same purpose.
These are for data representation in wolfgpu whereas `BoundaryConditionsTypes` are for CPU
Wolf compatibility.
`BoundaryConditionsTypes` and `CellParameterType` must all be converted to
this enum when loading the sim in the gpu.
"""
[docs]
BC_TABLE_INDEX_FOR_H_ON_LEFT = 1
[docs]
BC_TABLE_INDEX_FOR_HMOD_ON_LEFT = 2
[docs]
BC_TABLE_INDEX_FOR_QX_ON_LEFT = 0
[docs]
BC_TABLE_INDEX_FOR_QY_ON_LEFT = 3 # Qy on a Y horizontal border
[docs]
BC_TABLE_INDEX_FOR_FROUDE_NORMAL_ON_LEFT = 8
[docs]
BC_TABLE_INDEX_FOR_H_ON_BOTTOM = 4
[docs]
BC_TABLE_INDEX_FOR_HMOD_ON_BOTTOM = 5
[docs]
BC_TABLE_INDEX_FOR_QX_ON_BOTTOM = 6 # Qx applied to a bottom border
[docs]
BC_TABLE_INDEX_FOR_QY_ON_BOTTOM = 7 # Qy applied to a bottom border
[docs]
BC_TABLE_INDEX_FOR_FROUDE_NORMAL_ON_BOTTOM = 9
[docs]
BC_TABLE_INDEX_FOR_BRIDGE_DECK_ELEVATION = 10
[docs]
BC_TABLE_INDEX_FOR_BRIDGE_ROOF_ELEVATION = 11
[docs]
BC_BLANK_VALUE=np.float32(-9_999_999) # Exactly represented by float32
[docs]
NB_BC_TYPES = len(BoundaryConditionIndices)
[docs]
class CellParameterType(Enum):
""" Represents the type of a cell parameter.
"""
[docs]
BRIDGE_DECK_ELEVATION = 1
[docs]
BRIDGE_ROOF_ELEVATION = 2
# FIXME This should be imported from WolfHECE
[docs]
class BoundaryConditionsTypes(Enum):
"""The boundary conditions types.
Note: The values match the numbers in Wolf's simulations parameters.
"""
""" Weak bondary condition over :math:`h+b`, the water free surface height
(that is, height of water plus bathymetry level).
"""
""" Weak bondary condition over :math:`q_x`, the discharge over the x-axis.
"""
""" Weak bondary condition over :math:`q_y`, the discharge over the x-axis.
"""
""" This is used to represent the absence of boundary condition.
"""
""" Not used.
"""
""" Not used.
"""
""" Weak bondary condition over :math:`h`, the water free surface height
(that is, height of water, measured from the top of the bathymetry).
"""
""" Weak boundary condition for Froude limited height. Water height becomes
:math:`\\min(h, \\frac{u^2}{g \\times \\text{Fr}^2})` where
:math:`\\text{Fr}` is given.
"""
# FIXME This *MUST* be imported from WolfHECE
[docs]
class boundary_condition_2D:
'''
Type des CL générales, faibles et fortes
@author Pierre Archambeau
'''
def __init__(self, i: int, j: int, ntype: BoundaryConditionsTypes, val: float, direction: Direction) -> None:
from wolfhece.mesh2d.cst_2D_boundary_conditions import BCType_2D_GPU, Direction as hece_Direction
assert i >= 1
assert j >= 1
assert isinstance(ntype, BoundaryConditionsTypes | BCType_2D_GPU)
assert isinstance(direction, Direction | hece_Direction)
self.i = i # indice de colonne dans le plus petit maillage
self.j = j # indice de ligne dans le plus petit maillage
self.ntype = ntype # type de cl (h=1,qx=2,qy=3,rien=4,qbx=5,qby=6,hmod=7,fr=8)
self.val = val # valeur à imposer
self.direction = direction
[docs]
def to_dict(self):
return {
"i" : self.i,
"j" : self.j,
"type": self.ntype.name,
"val" : float(self.val), # float makes sure the load and save to JSON has same format ("xxx.yyy") event when passing int's.
"direction": self.direction.name,
}
@classmethod
[docs]
def from_dict(kls, js):
return boundary_condition_2D(js["i"], js["j"], BoundaryConditionsTypes[js["type"]], float(js["val"]), Direction[js["direction"]])
[docs]
class CellParameter:
"""
Cell parameter
"""
def __init__(self, i: int, j: int, ntype: CellParameterType, val: float) -> None:
assert i >= 1, "Coordinates of cells are one-based"
assert j >= 1, "Coordinates of cells are one-based"
assert isinstance(ntype, CellParameterType)
self.i = i # indice de colonne dans le plus petit maillage
self.j = j # indice de ligne dans le plus petit maillage
self.ntype = ntype # type de cl (h=1,qx=2,qy=3,rien=4,qbx=5,qby=6,hmod=7,fr=8)
self.val = val # valeur à imposer
[docs]
def to_dict(self):
return {
"i" : self.i,
"j" : self.j,
"type": self.ntype.name,
"val" : float(self.val), # float makes sure the load and save to JSON has same format ("xxx.yyy") event when passing int's.
}
@classmethod
[docs]
def from_dict(kls, js):
return CellParameter(js["i"], js["j"], CellParameterType[js["type"]], float(js["val"]))
[docs]
class SimulationDurationType(Enum):
""" A type of duration.
This was introduced to abstract the duration of a simulation.
"""
# Note that the way we put it, there's no way you can do
# a simulation that has no planned finite time (it's time
# step could be infinitely small though).
""" A duration expressed in seconds. """
""" A duration expressed in steps. """
[docs]
class ReportFrequencyType(Enum):
""" When specifying the report period, one can
use two different units.
"""
# FIXME Can't we just use a simulationduration ?
# These are the values used by Wolf
""" Report period in iterations """
""" Report period in simulation time (seconds) """
from functools import total_ordering
@total_ordering
[docs]
class SimulationDuration:
""" In the GPU simulator, we represent durations about the simulation
either as number of simulation steps either as simulation time (in seconds).
This class can represent both.
"""
def __init__(self, _type:SimulationDurationType, duration: Union[float, int]):
assert isinstance(_type, SimulationDurationType)
#assert duration > 0, f"A duration must be strictly positive. You gave {duration}."
if _type == SimulationDurationType.STEPS:
assert type(duration) is int, "Simulation total number of steps must be an integer"
self.type = _type
self.duration = duration # Expressed in seconds
def __str__(self):
if self.type == SimulationDurationType.STEPS:
if self.duration <= 1:
return f"{self.duration} step"
else:
return f"{self.duration} steps"
elif self.type == SimulationDurationType.SECONDS:
return f"{self.duration:g} s."
[docs]
def to_dict(self):
return {
"type": self.type.name,
"duration": self.duration
}
[docs]
def from_dict(self, d):
self.type = SimulationDurationType[d["type"]]
if self.type == SimulationDurationType.SECONDS:
self.duration = float(d["duration"])
elif self.type == SimulationDurationType.STEPS:
self.duration = int(d["duration"])
else:
raise Exception(f"Unexpected duration type, you gave {self.type}")
def __add__(self, value:"SimulationDuration"):
assert self.type == value.type, "You are mixing different types of durations"
return SimulationDuration( self.type, self.duration + value.duration)
def __eq__(self, value: object) -> bool:
return self.type == value.type and self.duration == value.duration
def __lt__(self, value: "SimulationDuration") -> bool:
if self.type == SimulationDurationType.SECONDS and value.type == SimulationDurationType.SECONDS or \
self.type == SimulationDurationType.STEPS and value.type == SimulationDurationType.STEPS:
return self.duration < value.duration
else:
raise Exception(f"Can't compare SimulationDuration objects. {self.type} != {value.type}")
@classmethod
[docs]
def from_seconds(cls, seconds) -> SimulationDuration:
return SimulationDuration(SimulationDurationType.SECONDS, parse_duration_to_seconds(seconds))
@classmethod
[docs]
def from_steps(cls, steps) -> SimulationDuration:
return SimulationDuration(SimulationDurationType.STEPS, int(steps))
@classmethod
[docs]
def from_str(cls, d: str) -> SimulationDuration:
# An integer is a number of steps
# Anything else (float or hms) is a duration
try:
steps = int(d)
except ValueError:
return cls.from_seconds(d)
return cls.from_steps(steps)
@classmethod
[docs]
def zero_like(cls, d) -> SimulationDuration:
return SimulationDuration(d.type, 0)
[docs]
def is_zero(self):
return self.duration == 0
[docs]
class TimeStepStrategy(Enum):
""" Describes the way the time step is computed on each simulation step.
"""
""" The time step is constant over all simulation steps.
"""
""" The time step is the maximum of the minimum time steps authorized
by the Courant formula.
"""
[docs]
class InfiltrationInterpolation(Enum):
""" Describes the way the infiltration are computed across
an infiltration time interval."""
# These are the values used by Wolf
[docs]
NONE = 0 # Constant value over time interval
"""
The infiltration is constant over the infiltration time interval.
"""
[docs]
LINEAR = 1 # Linear interpolation over the interval
"""
The infiltration is interpolated over the infiltration time interval.
The initial value is the one associated with interval, the final value
is the one associated with the next interval.
"""
[docs]
class InfiltrationChronology:
"""
This is optimized for access where inserts and reads are not
frequently mixed togehter.
"""
def __init__(self):
self._chronology: dict[float, list[float]] = dict()
self._nb_zones = None
self._timeline = None
# self._dirty indicates that he times in the chronology do not
# correspond to the _timeline array anymore. Therefore, _timeline must
# be rebuilt asap.
self._dirty = False
[docs]
def _sorted_timeline(self):
if self._dirty or self._timeline is None:
self._dirty = False
self._timeline = np.array(sorted(self._chronology.keys()))
return self._timeline
def __str__(self):
s = ""
for ndx, chrono_item in enumerate(self._chronology.items()):
time, infiltrations_for_zone = chrono_item
s += f"{ndx} At {time:.3f} s. {','.join([f'{i:.2f}' for i in infiltrations_for_zone])}\n"
return s
@classmethod
[docs]
def from_array(klass, npa):
ic = InfiltrationChronology()
for row in npa:
ic.set(row[0], row[1:].tolist())
return ic
@classmethod
[docs]
def from_list(klass, chronology:list[float,list[float]]):
ic = InfiltrationChronology()
for row in chronology:
ic.set(row[0], row[1])
return ic
@property
[docs]
def nb_zones(self) -> int:
return self._nb_zones
@property
[docs]
def nb_entries(self) -> int:
return len(self._chronology)
def __iter__(self):
self._times_iterator = iter(self._sorted_timeline())
return self
def __next__(self):
k = next(self._times_iterator)
return k, self._chronology[k]
def __len__(self):
return len(self._chronology)
[docs]
def to_array(self):
array = []
for t in sorted(self._chronology.keys()):
row = [t] + self._chronology[t]
array.append(row)
return np.array(array, dtype=np.float32)
[docs]
def get_row_at_time(self, time: float) -> Tuple[float, List[float]]:
""" Get infiltration chronology row active at time `time`.
:param time: a positive number of seconds.
"""
assert time >= 0, "Time must be a positive number of seconds"
chronology_entry_time = self._get_closest_row_time_by_time(time)
if chronology_entry_time is not None:
return (chronology_entry_time, self._chronology[chronology_entry_time])
else:
return None
[docs]
def get_active_entry_start_time(self, time: float) -> float:
return self._get_closest_row_time_by_time(time)
[docs]
def set(self, time: float, quantities:list[float]):
""" Sets or replaces an entry in the chronology.
:param time: the moment at which the water must be injected. It is
expressed in seconds. If there's already something at that moment,
it will be discarded.
:param quantities: A list giving the quantities of water to inject at
the given `time`. Each quantity in the list corresponds to a
infiltration zone of the infiltration map.
"""
assert type(quantities) is list, f"I want a list for the quantities, not a {type(quantities)}"
if self._nb_zones is None:
assert len(quantities) >= 1, "There's no point in setting an empty list of quantities"
self._nb_zones = len(quantities)
elif len(quantities) != self._nb_zones:
raise ValueError(f"The number of infiltration zones is different than previous ones. I was expecting {self._nb_zones} but you gave {len(quantities)}")
self._dirty = time not in self._chronology
self._chronology[time] = quantities
[docs]
def _get_closest_row_time_by_time(self, time: float):
""" Can return None if no row is active. This happens when `time`
is before all the chronology entries.
"""
ndx = np.searchsorted(self._sorted_timeline(), time, side="right")
if ndx >= 1:
return self._sorted_timeline()[ndx - 1]
else:
return None
[docs]
class SimpleSimulation:
""" A simple simulation scenario.
IMPORTANT: althoug we use numpy arrays to represent various matrices,
be careful: we use shapes as (nb of columns, nb of rows). This is
transposed comprared to regular numpy !
"""
""" Directory where the simulation was loaded or will be saved."""
[docs]
creation_date: datetime
""" Creation or last save date. """
""" A free text comment, no formatting supported. """
[docs]
param_froude_max: float
""" Froude limit """
[docs]
_param_froude_bc_limit_tolerance: float
""" When Froude BC applies and needs to push up the water height,
it will push it up to current_water_height * tolerance.
"""
[docs]
param_runge_kutta: float
""" Runge-Kutta ponderation. The value is the ponderation of the estimator. Thus
1-ponderation is applied to the corrector. None means no RK but simple Euler step."""
""" Size of a single mesh along x axis, in meters. """
""" Size of a single mesh along y axis, in meters. """
""" Size of the computation domain along the x axis, in number of meshes. """
""" Size of the computation domain along the y axis, in number of meshes. """
[docs]
param_base_coord_ll_x: float
""" The computation domain coordinates are relative to "base" coordinates.
This parameter is the x component of the base coordinates.
The basis of the coordinates system is located, by convention, on the lower left corner of the domain
(i.e. at (0,0) in the local coordinates system). """
[docs]
param_base_coord_ll_y: float
""" The computation domain coordinates are relative to "base" coordinates.
This parameter is the y component of the base coordinates.
The basis of the coordinates system is located, by convention, on the lower left corner of the domain
(i.e. at (0,0) in the local coordinates system). """
[docs]
_param_timestep_strategy: TimeStepStrategy
""" Strategy used to compute the time step at each iteration. """
""" In case of a constant time step strategy, this is the duration of the
time step. It will be ignored in other cases. """
[docs]
_param_infiltration_lerp: InfiltrationInterpolation
""" Type of infiltration interpolation: constant values, linear interpolation,... """
[docs]
_param_duration: SimulationDuration
""" Duration of the simulation, either in steps or in simulation time (seconds) """
[docs]
_param_report_period: SimulationDuration
""" Duration of the period for reporting, either in steps or in simulation time (seconds) """
[docs]
_h : npt.NDArray[np.float32]
""" Matrix of water height, in meters. """
[docs]
_qx : npt.NDArray[np.float32]
""" Matrix of quantity of movement along x axis, in m²/s. """
[docs]
_qy : npt.NDArray[np.float32]
""" Matrix of quantity of movement along y axis, in m²/s. """
[docs]
_bathymetry : npt.NDArray[np.float32]
""" Bathymetry, in meters. """
[docs]
_manning : npt.NDArray[np.float32]
""" Manning coefficients. """
[docs]
_infiltration_zones : npt.NDArray[np.int32]
""" Infiltration zones indices. Indices are numbered from one.
0 means no infiltration in the mesh. """
[docs]
_nap : npt.NDArray[np.uint8]
""" Matrix representing which mesh are part of the computation
domain or not. 1 means a mesh is part of the computation
domain, 0 means it's not."""
[docs]
_bridge_roof_elevation: npt.NDArray[np.float32]
""" Elevation of the bridge roof. """
""" Attributes of the simulation as Wolf_Param instance. """
def __init__(self, nx: int=0, ny: int=0):
""" Create a new simple simulation.
When created, a simulation is empty. One must at least set the NAP array
and the `h` array to have something that can be simulated.
nx, ny : number of meshes horizontally and vertically.
For technical reasons, nx and ny must be both >= 5.
"""
if nx > 0 and ny > 0:
assert nx >= 5, "The number of meshes on the x axis must be at least 5"
assert ny >= 5, "The number of meshes on the y axis must be at least 5"
self.path = None
self._h = None
self._qx = None
self._qy = None
self._manning = None
self._bathymetry = None
self._infiltration_zones = None
self._nap = None
self._bridge_roof_elevation = None
self._boundary_conditions: list[boundary_condition_2D] = []
self._cell_parameters: list[CellParameter] = []
self._infiltrations_chronology =[]
self._infiltrations_timings_as_gpu_array_cache = None
self.param_courant = 0.5
self.param_dx, self.param_dy = 0, 0
self._param_nx, self._param_ny = nx, ny
self.param_froude_max = 3
self._param_froude_bc_limit_tolerance = 1.0
self._param_timestep_strategy = TimeStepStrategy.OPTIMIZED_TIME_STEP
self._param_timestep = 0.0
self.param_runge_kutta = 0.3
self._param_duration = SimulationDuration.from_steps(1000)
self._param_report_period = SimulationDuration.from_steps(10)
self._param_infiltration_lerp = InfiltrationInterpolation.NONE
self.param_base_coord_ll_x = 0.0
self.param_base_coord_ll_y = 0.0
self.comment = None
self.creation_date = None
[docs]
def _check_np_array_type(self, a: np.array, dtype=np.float32, check_size=True):
if not isinstance(a, np.ndarray):
raise TypeError("I expect a numpy array")
elif not a.dtype == dtype:
raise TypeError(f"I expect the type {dtype}, you gave {a.dtype}")
elif check_size and a.shape != (self._param_nx, self._param_ny):
raise TypeError(f"I expect the array to have shape ({self._param_nx},{self._param_ny})")
else:
return True
[docs]
def force_Euler_scheme(self):
""" Force the simulation to use a simple Euler scheme.
"""
self.param_runge_kutta = None
@property
[docs]
def param_dx(self):
""" dx is the x-dimension of a mesh (expressed in meters).
"""
return self._param_dx
@param_dx.setter
def param_dx(self, dx):
self._infiltrations_timings_as_gpu_array_cache = None
self._param_dx = dx
@property
[docs]
def param_dy(self):
""" dy is the y-dimension of a mesh (expressed in meters)
"""
return self._param_dy
@param_dy.setter
def param_dy(self, dy):
self._infiltrations_timings_as_gpu_array_cache = None
self._param_dy = dy
@property
[docs]
def h(self) -> npt.NDArray[np.float32]:
""" An array representing the water height (expressed in meters above the bathymetry).
"""
return self._h
@h.setter
def h(self, value: npt.NDArray[np.float32]):
if self._check_np_array_type(value):
self._h = value
@property
[docs]
def qx(self) -> npt.NDArray[np.float32]:
""" An array representing the water discharge along x-axis (expressed in m/s²).
"""
return self._qx
@qx.setter
def qx(self, value: npt.NDArray[np.float32]):
if self._check_np_array_type(value):
self._qx = value
@property
[docs]
def qy(self) -> npt.NDArray[np.float32]:
""" An array representing the water discharge along y-axis (expressed in m/s²).
"""
return self._qy
@qy.setter
def qy(self, value: npt.NDArray[np.float32]):
if self._check_np_array_type(value):
self._qy = value
@property
[docs]
def manning(self) -> npt.NDArray[np.float32]:
""" An array representing the Manning coefficient for each mesh.
"""
return self._manning
@manning.setter
def manning(self, value: npt.NDArray[np.float32]):
if self._check_np_array_type(value):
self._manning = value
@property
[docs]
def infiltration_zones(self) -> npt.NDArray[np.int32]:
""" An array representing the infiltration zones for each mesh.
"""
return self._infiltration_zones
@infiltration_zones.setter
def infiltration_zones(self, value: npt.NDArray[np.int32]):
if self._check_np_array_type(value, np.int32):
self._infiltration_zones = value
@property
[docs]
def nap(self) -> npt.NDArray[np.uint8]:
""" An array representing the meshes that are part of the computation domain.
"""
return self._nap
@nap.setter
def nap(self, value: npt.NDArray[np.uint8]):
if self._check_np_array_type(value, np.uint8):
self._nap = value
@property
[docs]
def bathymetry(self) -> npt.NDArray[np.float32]:
""" An array representing the bathymetry each mesh (expressed in meters).
"""
return self._bathymetry
@bathymetry.setter
def bathymetry(self, value: npt.NDArray[np.float32]):
if self._check_np_array_type(value):
self._bathymetry = value
@property
[docs]
def param_nx(self) -> int:
""" Dimension of the computation domain on the x axis.
"""
return self._param_nx
@param_nx.setter
def param_nx(self, value: int):
self._param_nx = value
@property
[docs]
def param_ny(self) -> int:
""" Dimension of the computation domain on the y axis.
"""
return self._param_ny
@param_ny.setter
def param_ny(self, value: int):
self._param_ny = value
@property
[docs]
def param_froude_bc_limit_tolerance(self):
""" When Froude BC applies and needs to push up the water height,
it will push it up to current_water_height * tolerance.
"""
return self._param_froude_bc_limit_tolerance
@param_froude_bc_limit_tolerance.setter
def param_froude_bc_limit_tolerance(self, value: float):
self._param_froude_bc_limit_tolerance = value
@property
[docs]
def shape(self) -> tuple[int, int]:
""" The shape of the various arrays representing data in the computation domain.
This is equal to `(param_nx, param_ny)`.
"""
return (self._param_nx, self._param_ny)
@property
[docs]
def param_duration(self) -> SimulationDuration:
""" Duration of the simulation, either in steps or in simulation time (seconds).
The duration is represented by an object of class `SimulationDuration`. """
return self._param_duration
@param_duration.setter
def param_duration(self, value: SimulationDuration):
if isinstance(value, SimulationDuration):
self._param_duration = value
else:
raise TypeError(f"For simulation duration, I expect an object of type SimulationDuration. You gave a '{value.__class__.__name__}'.")
@property
[docs]
def param_report_period(self) -> SimulationDuration:
""" Duration of the period for reporting, either in steps or in simulation time (seconds).
The duration is represented by an object of class `SimulationDuration`. """
return self._param_report_period
@param_report_period.setter
def param_report_period(self, value: SimulationDuration):
if isinstance(value, SimulationDuration):
self._param_report_period = value
else:
raise TypeError(f"For reporting period, I expect an object of type SimulationDuration. You gave a '{value.__class__.__name__}'.")
@property
[docs]
def param_timestep(self) -> float:
""" In case of a constant timestep strategy, this is the duration of a step, expressed in seconds. """
return self._param_timestep
@param_timestep.setter
def param_timestep(self, value: float):
if isinstance(value, float):
self._param_timestep = value
else:
raise TypeError(f"For time step, I expect a number of seconds in float.")
@property
[docs]
def param_timestep_strategy(self) -> TimeStepStrategy:
""" Strategy used to compute the time step at each iteration.
The strategy is represented by an object of class `TimeStepStrategy`. """
return self._param_timestep_strategy
@param_timestep_strategy.setter
def param_timestep_strategy(self, value: TimeStepStrategy):
if isinstance(value, TimeStepStrategy):
self._param_timestep_strategy = value
else:
raise TypeError(f"For time step strategy, I expect an object of type TimeStepStrategy. You gave a '{value.__class__.__name__}'.")
@property
[docs]
def param_infiltration_lerp(self) -> InfiltrationInterpolation:
""" Type of infiltration interpolation: constant values, linear interpolation,...
The interpolation type is represented by an object of class `InfiltrationInterpolation`. """
return self._param_infiltration_lerp
@param_infiltration_lerp.setter
def param_infiltration_lerp(self, value: InfiltrationInterpolation):
if isinstance(value, InfiltrationInterpolation):
self._param_infiltration_lerp = value
else:
raise TypeError(f"For infiltration interpolation strategy, I expect an object of type InfiltrationInterpolation. You gave a '{value.__class__.__name__}'.")
[docs]
def add_boundary_condition(self, i: int, j: int, bc_type:BoundaryConditionsTypes, bc_value: float, border:Direction):
""" Add a boundary condition to the model.
:param i, j: position of the boundary condition (1-based)
:param bc_type: type of boundary condition
:param bc_value: value/parameter of the boundary condition
:param border: mesh's border on which the boundary condtion applies
"""
if isinstance(bc_type, int):
for bc in BoundaryConditionsTypes:
if bc.value == bc_type:
bc_type = bc
break
self._boundary_conditions.append(boundary_condition_2D(i, j, bc_type, bc_value, border))
[docs]
def number_of_bc_along_X(self):
return len([bc for bc in self._boundary_conditions if bc.direction == Direction.X])
[docs]
def number_of_bc_along_Y(self):
return len([bc for bc in self._boundary_conditions if bc.direction == Direction.Y])
[docs]
def add_cell_parameter(self, i: int, j: int, bc_type:CellParameterType, bc_value: float):
""" Add a cell parameter to the model.
:param i: i position of the boundary condition (1-based)
:param j: j position of the boundary condition (1-based)
:param bc_type: type of cell parameter
:param bc_value: value/parameter of the boundary condition
:param border: mesh's border on which the boundary condtion applies
"""
self._cell_parameters.append(CellParameter(i, j, bc_type, bc_value))
[docs]
def get_cell_parameter(self, i: int, j: int, bc_type:CellParameterType) -> float:
""" Get the value of a cell parameter at a given position.
:param i, j: position of the boundary condition (1-based)
:param bc_type: type of cell parameter
"""
for cp in self._cell_parameters:
if cp.i == i and cp.j == j and cp.ntype == bc_type:
return cp.val
return None
[docs]
def get_all_cell_parameters(self, bc_type:CellParameterType) -> List[CellParameter]:
""" Get all the cell parameters of a given type.
:param bc_type: type of cell parameter
"""
return [cp for cp in self._cell_parameters if cp.ntype == bc_type]
[docs]
def clear_cell_parameters(self, bc_type:CellParameterType = None):
""" Clears all the cell parameters. Useful if one wants to build
a new simulation starting with an existing one.
:param bc_type: if given, only the cell parameters of this type will be cleared.
"""
if bc_type is None:
self._cell_parameters = []
else:
self._cell_parameters = [cp for cp in self._cell_parameters if cp.ntype != bc_type]
[docs]
def clear_boundary_conditions(self):
""" Clears all the boundary conditions. Useful if one wants to build
a new simulation starting with an existing one.
"""
self._boundary_conditions = []
[docs]
def add_infiltration(self, time_start: float, zones_values: list[float]):
""" Add an infiltration point in the infiltration chronology.
:param time_start: start time of the infiltration (in seconds)
:param zones_values: an array representing the quantity of water per second
to add to each infiltration zones, starting at time_start.
The first item corrsesponds to the zone 1, second item corresponds
to zone 2 and so on.
"""
# Invalidate the timings cache.
self._infiltrations_timings_as_gpu_array_cache = None
self._infiltrations_chronology.append( (time_start, zones_values) )
[docs]
def clear_infiltration_chronology(self):
""" Clear the infiltration chronology. Useful if one wants to build
a new simulation starting with an existing one.
"""
self._infiltrations_timings_as_gpu_array_cache = None
self._infiltrations_chronology = []
@property
[docs]
def boundary_condition(self) -> list[boundary_condition_2D]:
return self._boundary_conditions
@property
[docs]
def infiltrations_chronology(self) -> list[float,list[float]]:
""" Returns the chronology of infiltration.
This is a list. Each entry of the list is:
`[start_time, [list of values for each infiltration zone]]`
The start_time is when the infiltration should start. The list
of value is the quantity of water falling on each mesh of each
infitlation zones (the n-th entry of the list corresponds to
the n-th infiltration zone).
"""
return self._infiltrations_chronology
@infiltrations_chronology.setter
def infiltrations_chronology(self, infiltrations_chronology):
self._infiltrations_timings_as_gpu_array_cache = None
self._infiltrations_chronology = infiltrations_chronology
[docs]
def _infiltrations_timings_as_gpu_array(self):
""" Compute the infiltration timing array in a form suitable
for the GPU processing.
"""
# Maybe a bit premature but here I cache the construction
# of the infiltration timing GPU array because it could
# be slow if one provides a lot of timings
if self._infiltrations_timings_as_gpu_array_cache is None and \
len(self._infiltrations_chronology) >= 1 :
# The number of infiltration zones is implicitely given by
# the length of rows in the infiltration chronology.
# There, it is expected that all rows have the same number
# of infiltration zones.
nb_zones = len(self._infiltrations_chronology[0][1])
# 1 + nb_zones : 1 column for the timing, nb_zones columns for the
# actual infiltrated quantities.
shape = (len(self._infiltrations_chronology), 1 + nb_zones)
inf_timings_and_quantities = np.zeros( shape, dtype=np.float32)
for ndx, inf_data in enumerate(self._infiltrations_chronology):
inf_start, inf_quantities = inf_data
assert len(inf_data[1]) == nb_zones, "Not all infiltration zones values arays have the same length."
inf_timings_and_quantities[ndx, 0] = inf_start
inf_timings_and_quantities[ndx, 1:len(inf_quantities)+1] = inf_quantities
# Spread each infiltration's the water over its zone's meshes.
logging.info("Spreading infiltrations over the mesh")
# Precompute the number/surface of cells in each zone.
# For large mesh, large number of infiltration zones and
# numerous time steps, this could be **very** slow to
# compute this for each row of inf_timings.
# FIXME : To be optimized if number of zones evolves with time.
# Number of meshes per infiltration zones.
# In the infiltration array, the zones are numbered from 1
# to nb_zones (inclusive).
# Note that the source for the actual number of zones is
# the infiltration chronology. So it is possible that there
# are less infiltration zones in the infiltration array.
zones_indices = range(1, nb_zones+1)
meshes_per_zones = labeled_comprehension(
self._infiltration_zones, self._infiltration_zones,
zones_indices,
np.count_nonzero, int, 0)
zones_area= np.array(meshes_per_zones, dtype=np.float32) * self.param_dx * self.param_dy
for row in inf_timings_and_quantities:
for zone_ndx, zone_area in zip(zones_indices, zones_area):
original_inf = row[zone_ndx]
if zone_area > 0:
row[zone_ndx] /= zone_area
else:
# The zone area is 0. It means it's no in use in the infiltration map.
# So we clean the infiltration timings_and_quantities data to reflect
# data.
row[zone_ndx] = 0
# Infiltration is in m³/s for a complete zone
# It is divided by the infitlrated surface => (m³/s) / (m²) becomes m/s.
logging.debug(f"Infiltration t_start={row[0]:.1f} s, zone {zone_ndx} has {zone_area} cells with {original_inf} m³/s => infiltration per cell = {row[zone_ndx]:.4g} m/s")
self._infiltrations_timings_as_gpu_array_cache = inf_timings_and_quantities
logging.info(f"Spread infiltrations over the mesh - Done !")
return self._infiltrations_timings_as_gpu_array_cache
[docs]
def _check_infiltrations(self, infiltration_zones: np.ndarray, infiltration_timings, interp_infiltration: InfiltrationInterpolation) -> list[str]:
""" Check if the infiltration data is consistent. """
errors = []
nb_infiltration_lines = infiltration_timings.shape[0]
if not np.all(np.diff(infiltration_timings[:,0]) > 0):
errors.append(f"The infiltration timings are not strictly increasing: {infiltration_timings[:,0]}")
if interp_infiltration == InfiltrationInterpolation.LINEAR:
if infiltration_timings[0,0] != 0:
errors.append(f"The first infiltration time must be zero if you use interpolation")
if nb_infiltration_lines < 2:
errors.append(f"You need at least two data points in infiltrations if you use interpolation")
if errors:
return errors
zones_numbers = np.unique(infiltration_zones[infiltration_zones > 0]).tolist()
if len(zones_numbers) == 0:
logging.info("No infiltration zones found in the infiltration map.")
for zone_ndx in range(1, infiltration_timings.shape[1]):
errors.append(f"The zone {zone_ndx} doesn't appear in the infiltration map.")
else:
logging.debug(f"Infiltration zones numbers are : {zones_numbers}")
# Check if each zone actually has an infiltration.
# This test has been disabled because sometimes, users like to disable
# an infiltration by setting its infiltrated quantity to zero.
# used_zones = set()
# unused_zones = set()
# for row in infiltration_timings:
# # On each line, we look at every infiltrated/exfiltrated quantities
# for zone_ndx in range(1, row.shape[0]):
# if abs(row[zone_ndx]) > 0.0:
# used_zones.add(zone_ndx)
# else:
# unused_zones.add(zone_ndx)
# Labeled_comprehension needs to count something that exists.
# If there are no infiltration zones (the array is zero), then we
# count those zeros so that Labeled_comprehension has something to count.
num_count = labeled_comprehension(infiltration_zones, infiltration_zones,
index=np.arange(0, infiltration_zones.max()+1),
func=np.count_nonzero,
out_dtype=int,
default=0)
# Now we make sure we avoid the "zeros" (no infiltrations)
for zone_ndx in range(1, infiltration_timings.shape[1]):
if zone_ndx >= num_count.shape[0] or num_count[zone_ndx] == 0:
errors.append(f"The zone {zone_ndx} doesn't appear in the infiltration map.")
# for zone_ndx in range(1,infiltration_timings.shape[1]):
# a = infiltration_zones == zone_ndx
# b = np.sum(a)
# if b == 0:
# logging.warning(f"The zone {zone_ndx} doesn't appear in the infiltration map.")
return []
[docs]
def plot_infiltration(self, figax = None, toshow:bool=True):
""" Plot the infiltration chronology.
:param figax: a tuple (fig, ax) of matplotlib objects. If None, a new figure is created.
:param toshow: if True, the figure is shown.
:return: the figure and the axis.
"""
import matplotlib.pyplot as plt
if figax is None:
fig, ax = plt.subplots()
inf = self.infiltrations_chronology
times = [x[0] for x in inf]
values = [x[1] for x in inf]
if self.param_infiltration_lerp == InfiltrationInterpolation.LINEAR:
fig.suptitle(_("Infiltration chronology (linear interpolation)"))
for i in range(len(values[0])):
ax.plot(times, [x[i] for x in values], label=f"Zone {i+1}")
elif self.param_infiltration_lerp == InfiltrationInterpolation.NONE:
fig.suptitle(_("Infiltration chronology (constant values)"))
for i in range(len(values[0])):
ax.step(times, [x[i] for x in values], label=f"Zone {i+1}", where="post")
ax.set_xlabel(_("Time (s)"))
ax.set_ylabel(_("Flow rate ($m^3s^{-1}$)"))
ax.legend()
if toshow:
fig.show()
return fig, ax
[docs]
def check_warnings(self) -> Union[None, list[str]]:
""" Check if the model currently set in this object is valid.
A number of tests are made, they are not exhaustive.
Returns None if no problem was found. Returns a list of
warnings messages else.
"""
res = []
if self._h is not None:
if np.count_nonzero(self._h < 0) > 0:
res.append("There are meshes where you have h < 0. I can't handle that so I nullify h where h < 0.")
self._h[self._h < 0] = 0.
if np.count_nonzero(self._h == 99999.) > 0:
res.append("There are meshes where you have h == 99999. I can't handle that so I nullify h where h == 99999.")
self._h[self._h == 99999.] = 0.
if np.count_nonzero(self.h[self.nap == 0]):
res.append("There are meshes where you have h > 0 but nap == 0 ! I can't handle that so I nullify h where nap==0.")
self._h[self.nap == 0] = 0.
if self._h is not None and self._qx is not None:
if np.count_nonzero( (self._h == 0) & (self._qx > 0)):
res.append("There are meshes where you have qx > 0 but h == 0 ! I can't handle that so I nullify qx where h==0.")
self._qx[self._h == 0] = 0.
if self._h is not None and self._qy is not None:
if np.count_nonzero((self._h == 0) & (self._qy > 0)):
res.append("There are meshes where you have qy > 0 but h == 0 ! I can't handle that so I nullify qy where h==0.")
self._qy[self._h == 0] = 0.
if np.count_nonzero(self.bathymetry[self.nap ==0] == 0.):
res.append("There are meshes where you have bathymetry == 0 but nap == 0 ! I prefer to set the bathymetry to 99999 outside the computation domain.")
self._bathymetry[self.nap == 0] = 99999.
if np.count_nonzero(self.bathymetry[self.nap ==0] != 99999.):
res.append("There are meshes where you have bathymetry != 99999 but nap == 0 ! I prefer to set the bathymetry to 99999 outside the computation domain.")
self._bathymetry[self.nap == 0] = 99999.
if len(res) == 0:
return None
else:
return res
[docs]
def check_errors(self) -> Union[None, list[str]]:
""" Check if the model currently set in this object is valid.
A number of tests are made, they are not exhaustive.
Returns None if no problem was found. Returns a list of
error messages else.
"""
res = []
if self._infiltration_zones is not None:
if self._infiltration_zones.dtype != np.int32:
res.append(f"Infiltration zones must be a np.int32 array")
if self._infiltration_zones.shape != self.shape:
res.append(f"Infiltration zones shape must be (param. nx, param. ny)")
if not np.all(self._infiltration_zones >= 0):
l = np.unique(self._infiltration_zones).tolist()
res.append(f"In the infiltration zones array, the zones are numbered starting at 1. Zero means \"mesh has no infiltration\". But I see these values: {','.join(map(str,l))}.")
if self.nap is not None:
if self.nap.dtype != np.uint8:
res.append(f"NAP matrix must be a uint8 array")
if self.nap.shape != self.shape:
res.append(f"NAP shape must be (param. nx, param. ny)")
else:
res.append("The NAP array has not been set")
for name, array in {"h":self._h, "qx":self._qx, "qy":self._qy, "bathymetry":self._bathymetry, "manning":self._manning}.items():
array: np.ndarray
if array is not None:
if array.dtype != np.float32:
res.append(f"The {name} array's type must be np.float32")
if array.shape != self.shape:
res.append(f"{name} shape must be (param. nx, param. ny)")
else:
res.append(f"{name} array must be specified")
if self._bathymetry is not None:
if self.nap is None:
if np.count_nonzero(self._bathymetry < 0) > 0:
res.append("I have found negative heights in the bathymetry. I can't handle that.")
else:
if np.count_nonzero(self._bathymetry[self.nap > 0] < 0) > 0:
res.append("I have found negative heights in the bathymetry. I can't handle that.")
if not (self.param_courant > 0):
res.append("Courant number is wrong")
if not (self.param_dx > 0):
res.append(f"dx ({self.param_dx}) is wrong, it should be > 0")
if not (self.param_dy > 0):
res.append(f"dy ({self.param_dy}) is wrong, it should be > 0")
if not (self._param_nx > 4 ):
res.append(f"nx is wrong ({self._param_nx}), it should be an integer > 4 (we have 2 cells wide borders)")
if not (self._param_ny > 4 ):
res.append(f"ny is wrong ({self._param_ny}), it should be an integer > 4 (we have 2 cells wide borders)")
if self._infiltration_zones is not None and self._infiltrations_chronology == []:
res.append("If you give infiltrations zones then you must give infiltration chronolgy too")
if self._infiltration_zones is None and self._infiltrations_chronology != []:
res.append("If you give an infiltration chronolgy then you must give infiltrations zones too")
if self._infiltration_zones is not None and self._infiltrations_chronology != []:
for inf in self._infiltrations_chronology:
if len(inf) < 2:
res.append("In the infiltration chronology array, each line must have at least 2 elements: one for the time and at least one for the infiltration quantities")
break
if inf[0] < 0:
res.append("In infiltration chronolgy array, the time component (first element of a line) must be >= 0")
if len(inf) != len(self._infiltrations_chronology[0]):
res.append("In infiltration chronolgy array, all lines must have the same number of elements")
res.extend(
self._check_infiltrations(
self._infiltration_zones,
self._infiltrations_timings_as_gpu_array(),
self._param_infiltration_lerp))
if self._params_have_duplicated_values() != "":
res.append("Some parameters have duplicated values")
if len(res) == 0:
return None
else:
return res
[docs]
def _fail_if_invalid(self):
ce = self.check_warnings()
if ce is not None:
logging.warning( "\n\n" + ";\n\n".join(ce))
msg = "**** The simulation is not completely valid but I will try to run it anyway. ****"
logging.warning('*' * len(msg))
logging.warning(msg)
logging.warning('*' * len(msg))
ce = self.check_errors()
if ce is not None:
raise Exception( "\n\n" + ";\n\n".join(ce))
@classmethod
[docs]
def load(cls, simulation_dir: Path | str):
""" Load a simulation.
:param store_dir: where the simulation model is stored (a directory).
:type store_dir: Path
"""
# NOTE I thought about using a marshalling framework such as marshmallow
# but they tend to over-engineer the code and this may be a bit of
# a burden to maintain for people not accustomed to such patterns of
# development. So I K.I.S.S.
simulation_dir = Path(simulation_dir)
assert simulation_dir.exists(), f"The directory {simulation_dir} doesn't exist."
with open(simulation_dir / "parameters.json","r") as pfile:
data = json.loads(pfile.read())
assert data["spec_version"] in ("0.1", "0.2")
spec_version = data["spec_version"]
model = SimpleSimulation(data["parameters"]["nx"], data["parameters"]["ny"])
model.path = simulation_dir
model.comment = data["comment"]
model.creation_date = datetime.strptime(data["creation_date"], "%d/%m/%Y %H:%M:%S")
model.param_dx = data["parameters"]["dx"]
model.param_dy = data["parameters"]["dy"]
model.param_nx = data["parameters"]["nx"]
model.param_ny = data["parameters"]["ny"]
model.param_courant = data["parameters"]["courant"]
model.param_froude_max = data["parameters"]["froude_max"]
if spec_version >= "0.2":
model._param_froude_bc_limit_tolerance = data["parameters"]["froude_bc_tolerance"]
model._param_timestep_strategy = TimeStepStrategy[data["parameters"]["timestep_strategy"]]
model._param_timestep = data["parameters"]["timestep"]
model._param_infiltration_lerp = InfiltrationInterpolation[ data["parameters"]["infiltration_interp"]]
model._param_duration.from_dict(data["parameters"]["duration"])
model._param_report_period.from_dict(data["parameters"]["report_period"])
model.param_runge_kutta = data["parameters"]["runge_kutta"]
model.param_base_coord_ll_x = data["parameters"]["base_coord_x"]
model.param_base_coord_ll_y = data["parameters"]["base_coord_y"]
model._h = np.load(simulation_dir / data["maps"]["h"])
model._qx = np.load(simulation_dir / data["maps"]["qx"])
model._qy = np.load(simulation_dir / data["maps"]["qy"])
model._bathymetry = np.load(simulation_dir / data["maps"]["bathymetry"])
model._manning = np.load(simulation_dir / data["maps"]["manning"])
if "NAP" in data["maps"]:
model._nap = np.load(simulation_dir / data["maps"]["NAP"])
if "bridge_roof" in data["maps"]:
model.bridge_roof = np.load(simulation_dir / data["maps"]["bridge_roof"])
if "infiltration_zones" in data["maps"]:
model._infiltration_zones = np.load(simulation_dir / data["maps"]["infiltration_zones"])
# Clear the list because one can call load several times.
model._boundary_conditions = []
for bc in data["boundary_conditions"]:
model._boundary_conditions.append(boundary_condition_2D.from_dict(bc))
model._infiltrations_chronology = []
for infiltrations in data["infiltrations"]:
start_time, inf_values = infiltrations["start_time"], infiltrations["q"]
model._infiltrations_chronology.append(
[start_time, inf_values]
)
model._fail_if_invalid()
return model
[docs]
def copy(self) -> SimpleSimulation:
""" Create a new simulation from an existing one. """
return deepcopy(self)
[docs]
def save(self, store_dir: Path = None):
""" Save the model to a directory. If `store_dir` is None, the path
used to load the model is used. If the model was not loaded from a
directory, an exception is raised.
"""
assert isinstance(store_dir, Path), "store_dir must be a path"
self._fail_if_invalid()
if store_dir is None and self.path is None:
raise Exception("The simulation has no directory. Please provide one.")
path = store_dir or self.path
assert path is not None, "I need a path to save to"
if isinstance(path, str):
path = Path(path)
if not path.exists():
path.mkdir(parents=True)
else:
assert path.is_dir()
for name, array in {"h":self._h, "qx":self._qx, "qy":self._qy, "bathymetry":self._bathymetry,
"manning":self._manning, "infiltration_zones":self._infiltration_zones,
"NAP": self.nap}.items():
array: np.ndarray
if array is not None:
np.save(path / name, array)
if self.has_bridge():
np.save(path / "bridge_roof", self.bridge_roof)
self._save_json(path)
[docs]
def _save_json(self, store_dir: Path = None):
""" Save the parameters to a json file. """
if store_dir is None and self.path is None:
raise Exception("The simulation has no directory. Please provide one.")
path = store_dir or self.path
if self.creation_date is None:
self.creation_date = datetime.now()
maps = {}
for name, array in {"h":self._h, "qx":self._qx, "qy":self._qy, "bathymetry":self._bathymetry,
"manning":self._manning, "infiltration_zones":self._infiltration_zones,
"NAP": self.nap}.items():
array: np.ndarray
if array is not None:
maps[name] = name + ".npy"
if self.has_bridge():
maps["bridge_roof"] = "bridge_roof.npy"
with open(path / "parameters.json","w") as pfile:
params = {
"courant": self.param_courant,
"froude_max": self.param_froude_max,
"froude_bc_tolerance": self._param_froude_bc_limit_tolerance,
"nx" : self._param_nx,
"ny" : self._param_ny,
"dx" : self.param_dx,
"dy" : self.param_dy,
"timestep_strategy" : self._param_timestep_strategy.name,
"timestep": self._param_timestep,
"infiltration_interp" : self._param_infiltration_lerp.name,
"duration": self._param_duration.to_dict(),
"report_period": self._param_report_period.to_dict(),
"runge_kutta": self.param_runge_kutta,
"base_coord_x": self.param_base_coord_ll_x,
"base_coord_y": self.param_base_coord_ll_y
}
bcs = [bc.to_dict() for bc in self._boundary_conditions]
infs = [ { "start_time": start, "q": values} for start, values in self._infiltrations_chronology ]
data = {
"spec_version" : "0.2",
"comment" : self.comment,
"creation_date" : self.creation_date.strftime("%d/%m/%Y %H:%M:%S"),
"parameters" : params,
"boundary_conditions": bcs,
"infiltrations" : infs,
"maps" : maps
}
pfile.write(json.dumps(data, indent=2))
def __str__(self) -> str:
""" Return a string representation of the simulation. """
manning = self._manning
active = self.nap
wet = self._h
m = manning[active > 0]
w,h = self._param_nx, self._param_ny
domain = w*h
total_wet = np.count_nonzero(wet[:,:][active > 0] > 0)
total_active = np.sum(active > 0)
txt = []
txt.append(f"Domain : {w}x{h} meshes, (dx={self.param_dx:g} m.,dy={self.param_dx:g} m.), {100*total_active/domain:.1f}% active, {100*total_wet/domain:.1f}% wet")
if self.param_timestep_strategy == TimeStepStrategy.FIXED_TIME_STEP:
txt.append(f"Timestep : Fixed, {self._param_timestep:g} s.")
else:
txt.append(f"Timestep : Optimized")
if self.param_report_period is not None:
rep = f"1 report every {self.param_report_period}"
# if self.param_report_period.type == SimulationDurationType.SECONDS:
# rep += f" (time sampled every {TIME_SAMPLING_RESOLUTION} steps)"
else:
rep = "no reports recorded"
txt.append(f"Duration : {self.param_duration}, {rep}")
if self.param_runge_kutta == 1.0:
txt.append("Ponderation : Euler")
else:
txt.append(
f"Ponderation : Runge-Kutta {self.param_runge_kutta:g}/{1-self.param_runge_kutta:g}"
)
if self.param_infiltration_lerp == InfiltrationInterpolation.NONE:
interp = "None"
elif self.param_infiltration_lerp == InfiltrationInterpolation.LINEAR:
interp = "Linear"
txt.append(f"Infiltration interpolation : {interp}")
txt.append(f"Courant number : {self.param_courant:g}") # FIXME Non dimensional, correct ?
txt.append(f"Froude limit : {self.param_froude_max:g}")
txt.append(f"Froude BC tol. : {self.param_froude_bc_limit_tolerance:g}")
#print(f"Dry up handling: {['Enabled','Disabled'][sim._no_dry is True]}")
txt.append(f"Mean of Manning coefficients: {np.mean(m):.3f} [s.m^{{-1/3}}]")
return "\n".join(txt)
[docs]
def to_wolfparam(self) -> Wolf_Param:
""" Return a Wolf_Param object that represents the parameters of the simulation. """
_wp = Wolf_Param(parent = None,
title = "Simple simulation GPU",
to_read=False,
withbuttons=True,
toShow=False,
init_GUI=False,
force_even_if_same_default = True)
_wp.add_param(_('Scheme'), _('Time step strategy'), self.param_timestep_strategy.value, Type_Param.Integer, jsonstr={"Values":{"Fixed time step":1,"Optimized time step":2}})
_wp.add_param(_('Infiltration'), _('Interpolation mode'), self.param_infiltration_lerp.value, Type_Param.Integer, jsonstr={"Values":{"None":0, "Linear":1}})
_wp = self.add_to_wolfparam(_wp)
return _wp
[docs]
def add_to_wolfparam(self, dest:Wolf_Param) -> Wolf_Param:
""" Return a Wolf_Param object that represents the parameters of the simulation. """
dest[(_('General'), _('X Lower left corner [m]'))] = self.param_base_coord_ll_x
dest[(_('General'), _('Y Lower left corner [m]'))] = self.param_base_coord_ll_y
dest[(_('Spatial resolution'), 'dx [m]')] = self.param_dx
dest[(_('Spatial resolution'), 'dy [m]')] = self.param_dy
dest[(_('Shape'), _('Number of meshes along X - nx'))] = self.param_nx
dest[(_('Shape'), _('Number of meshes along Y - ny'))] = self.param_ny
dest[(_('Scheme'), _('Courant number [-]'))] = self.param_courant
dest[(_('Scheme'), _('Froude limit [-]'))] = self.param_froude_max
dest[(_('Scheme'), _('Time step strategy'))] = self.param_timestep_strategy.value
dest[(_('Scheme'), _('Time step [s]'))] = self.param_timestep
dest[(_('Scheme'), _('Runge-Kutta ponderation [-]'))] = self.param_runge_kutta
dest[(_('Duration'), _('Total duration [s]'))] = self.param_duration.duration
dest[(_('Duration'), _('Report period [s]'))] = self.param_report_period.duration
dest[(_('Boundary conditions'), _('Froude BC - tolerance on H [-]'))] = self.param_froude_bc_limit_tolerance
dest[(_('Infiltration'), _('Interpolation mode'))] = self.param_infiltration_lerp.value
return dest
[docs]
def from_wolfparam(self, wp: Wolf_Param):
""" Callback called when the Wolf_Param object is modified and apply routine is called. """
assert isinstance(wp, Wolf_Param), "I need a Wolf_Param object."
self.param_base_coord_ll_x = wp[(_('General'), _('X Lower left corner [m]'))]
self.param_base_coord_ll_y = wp[(_('General'), _('Y Lower left corner [m]'))]
self.param_dx = wp[(_('Spatial resolution'), 'dx [m]')]
self.param_dy = wp[(_('Spatial resolution'), 'dy [m]')]
self.param_nx = wp[(_('Shape'), _('Number of meshes along X - nx'))]
self.param_ny = wp[(_('Shape'), _('Number of meshes along Y - ny'))]
self.param_courant = wp[(_('Scheme'), _('Courant number [-]'))]
self.param_froude_max = wp[(_('Scheme'), _('Froude limit [-]'))]
self.param_froude_bc_limit_tolerance = wp[(_('Boundary conditions'), _('Froude BC - tolerance on H [-]'))]
strategy = wp[(_('Scheme'), _('Time step strategy'))]
self.param_timestep_strategy = TimeStepStrategy.FIXED_TIME_STEP if strategy == 1 else TimeStepStrategy.OPTIMIZED_TIME_STEP
self.param_timestep = wp[(_('Scheme'), _('Time step [s]'))]
self.param_runge_kutta = wp[(_('Scheme'), _('Runge-Kutta ponderation [-]'))]
self.param_duration.duration = wp[(_('Duration'), _('Total duration [s]'))]
self.param_report_period.duration = wp[(_('Duration'), _('Report period [s]'))]
lerp = wp[(_('Infiltration'), _('Interpolation mode'))]
self.param_infiltration_lerp = InfiltrationInterpolation.NONE if lerp == 0 else InfiltrationInterpolation.LINEAR
[docs]
def write_initial_condition_from_record(self, recpath:Path = None, id_rec:int = None, destpath:Path = None) -> int:
"""
Define the initial condition of the simulation from a previous record.
:param recpath: the path to the records. if None, the default path is used and 'simul_gpu_results' as result directory.
:param id_rec: the index of the record you want to start from.
:param destpath: the path where to save the initial condition. If None, the current path is used.
:return: 0 if everything went well, -1 if the record doesn't exist, -2 if the record path doesn't exist.
"""
if recpath is None:
recpath = self.path / 'simul_gpu_results'
assert isinstance(recpath, Path), "I need a Path. Voilà."
# import here to avoid circular import
from .results_store import ResultsStore
from shutil import copyfile
if not recpath.exists():
logging.error(f"Record path {recpath} doesn't exist.")
return -2
# create a result store
_store = ResultsStore(recpath, mode="r")
# check if the record exists
if id_rec is None:
id_rec = _store.nb_results
assert id_rec > 0, "The index of the record must be > 0"
if id_rec > _store.nb_results:
logging.error(f"Record {id_rec} doesn't exist.")
return -1
# get the result
t,dt,n_iter_dry_up_euler, n_iter_dry_up_rk, h,qx,qy = _store.get_result(id_rec)
# check the shape
assert h.T.shape == self.shape, f"Record {id_rec} has a different shape than the simulation."
assert qx.T.shape == self.shape, f"Record {id_rec} has a different shape than the simulation."
assert qy.T.shape == self.shape, f"Record {id_rec} has a different shape than the simulation."
if destpath is None:
destpath = self.path
destpath.mkdir(parents=True, exist_ok=True)
# save the initial condition
if self.path == destpath:
copyfile(self.path / "h.npy" , destpath / "_h_backup.npy")
copyfile(self.path / "qx.npy", destpath / "_qx_backup.npy")
copyfile(self.path / "qy.npy", destpath / "_qy_backup.npy")
# write the initial condition
mask = h == 99999.0
h[mask] = 0.0
qx[mask] = 0.0
qy[mask] = 0.0
np.save(destpath / 'h.npy', h.T)
np.save(destpath / 'qx.npy', qx.T)
np.save(destpath / 'qy.npy', qy.T)
if self.path != destpath:
# Copy bathymetry data only if the destination path is different from the current path
np.save(destpath / 'bathymetry.npy', self.bathymetry)
return 0
[docs]
def copy_ic_from_simulation(self, source:SimpleSimulation):
""" Copy the initial condition from a simulation to the current simulation. """
assert source is not None, "Source simulation is None."
assert source.param_nx == self.param_nx, "Source simulation has a different nx than the current simulation."
assert source.param_ny == self.param_ny, "Source simulation has a different ny than the current simulation."
self.h = source.h.copy()
self.qx = source.qx.copy()
self.qy = source.qy.copy()
mask = self.h == 99999.0
self.h[mask] = 0.0
self.qx[mask] = 0.0
self.qy[mask] = 0.0
[docs]
def copy_ic_from_record(self, source:Path, id_rec:int = None):
""" Copy the initial condition from a record to the current simulation. """
assert source.exists(), f"Source path {source} doesn't exist."
# import here to avoid circular import
from .results_store import ResultsStore
# create a result store
_store = ResultsStore(source, mode="r")
# check if the record exists
if id_rec is None:
id_rec = _store.nb_results
assert id_rec > 0, "The index of the record must be > 0"
if id_rec > _store.nb_results:
logging.error(f"Record {id_rec} doesn't exist.")
return -1
# get the result
t,dt,n_iter_dry_up_euler, n_iter_dry_up_rk, h,qx,qy = _store.get_result(id_rec)
# check the shape
assert h.T.shape == self.shape, f"Record {id_rec} has a different shape than the simulation."
assert qx.T.shape == self.shape, f"Record {id_rec} has a different shape than the simulation."
assert qy.T.shape == self.shape, f"Record {id_rec} has a different shape than the simulation."
mask = h == 99999.0
self.h[mask] = 0.0
self.qx[mask] = 0.0
self.qy[mask] = 0.0
self.h = h.T
self.qx = qx.T
self.qy = qy.T
return 0
[docs]
def has_infiltration(self):
return self.infiltration_zones is not None
[docs]
def mask_nap_contour(self, nb_cells:int=1):
""" Mask the contour of the NAP array. """
self.nap[:nb_cells, :] = 0
self.nap[-nb_cells:, :] = 0
self.nap[:, :nb_cells] = 0
self.nap[:, -nb_cells:] = 0
[docs]
def make_void_arrays(self):
""" Create void arrays from (nx, ny) shape.
Arrays :
- nap
- h
- qx
- qy
- manning
- bathymetry
"""
assert self.param_nx > 0, "nx must be > 0"
assert self.param_ny > 0, "ny must be > 0"
self._nap = np.ones((self.param_nx, self.param_ny), dtype=np.uint8)
self.mask_nap_contour()
self._h = np.zeros((self.param_nx, self.param_ny), dtype=np.float32)
self._qx = np.zeros((self.param_nx, self.param_ny), dtype=np.float32)
self._qy = np.zeros((self.param_nx, self.param_ny), dtype=np.float32)
self._manning = np.zeros((self.param_nx, self.param_ny), dtype=np.float32)
self._bathymetry = np.zeros((self.param_nx, self.param_ny), dtype=np.float32)
[docs]
def make_void_infiltration_zones(self):
""" Create a void infiltration zones array. """
assert self.param_nx > 0, "nx must be > 0"
assert self.param_ny > 0, "ny must be > 0"
self._infiltration_zones = np.zeros((self.param_nx, self.param_ny), dtype=np.int32)
@property
[docs]
def bridge_roof(self) -> np.ndarray:
""" Return the bridge roof. """
if self._bridge_roof_elevation is None:
self.make_void_bridge_roof()
for cur in self._cell_parameters:
cur:CellParameter
if cur.ntype == CellParameterType.BRIDGE_ROOF_ELEVATION:
self._bridge_roof_elevation[cur.i-1, cur.j-1] = cur.val + self.bathymetry[cur.i-1, cur.j-1]
return self._bridge_roof_elevation
[docs]
def has_bridge(self) -> bool:
""" Return True if the simulation has a bridge roof. """
for cur in self._cell_parameters:
cur:CellParameter
if cur.ntype == CellParameterType.BRIDGE_ROOF_ELEVATION:
return True
return False
@bridge_roof.setter
def bridge_roof(self, value:np.ndarray):
""" Set the bridge roof. """
if value is None:
self._bridge_roof_elevation = value
self.clear_cell_parameters(CellParameterType.BRIDGE_ROOF_ELEVATION)
return
if self._check_np_array_type(value):
self._bridge_roof_elevation = value
self.convert_roof_array2params(value)
[docs]
def make_void_bridge_roof(self):
""" Create a void bridge roof array. """
assert self.param_nx > 0, "nx must be > 0"
assert self.param_ny > 0, "ny must be > 0"
self._bridge_roof_elevation = np.ones((self.param_nx, self.param_ny), dtype=np.float32) * 99999.0
[docs]
def convert_roof_array2params(self, roof_array:np.ndarray = None):
""" Convert a roof array to a list of parameters.
It will remove all the existing bridge roof parameters
before add the new ones.
"""
if roof_array is None:
roof_array = self._bridge_roof_elevation
if roof_array is None:
logging.error("No roof elevation to convert.")
return
self.clear_cell_parameters(CellParameterType.BRIDGE_ROOF_ELEVATION)
ij = np.where((roof_array > 0) & (roof_array != 99999.0) & (self.nap ==1))
if len(ij[0]) == 0:
return
for i,j in zip(ij[0], ij[1]):
self.add_cell_parameter(i+1, j+1, CellParameterType.BRIDGE_ROOF_ELEVATION, roof_array[i,j] - self.bathymetry[i,j])
logging.debug(f"Converted {len(ij[0])} roof elevations to parameters.")
[docs]
def _params_have_duplicated_values(self) -> str:
""" Check if the parameters are not duplicated. """
ret = ""
# for i,cur in enumerate(self._cell_parameters):
# cur:CellParameter
# for j,cur2 in enumerate(self._cell_parameters[i+1:]):
# cur2:CellParameter
# if i != j and cur.i == cur2.i and cur.j == cur2.j and cur.ntype == cur2.ntype:
# logging.warning(f"Duplicate parameters at ({cur.i},{cur.j})")
# ret += f"Duplicate parameters at ({cur.i},{cur.j})\n"
tmp = list(set(self._cell_parameters))
if len(tmp) != len(self._cell_parameters):
logging.warning("Duplicate parameters found - Check your parameters !")
ret += "Duplicate parameters found - Check your parameters !\n"
return ret
[docs]
def ensure_nullvalue_outside_nap(self, nodata=99999.0):
""" Force the nodata where nap is 0 for:
- topo/bathymetry
- manning
Force zero value for initial conditions where nap is 0:
- h
- qx
- qy
"""
msk= self.nap == 0
self._h[msk] = 0.
self._qx[msk] = 0.
self._qy[msk] = 0.
self._manning[msk] = nodata
self._bathymetry[msk] = nodata
if self.infiltration_zones is not None:
self._infiltration_zones[msk] = 0
if self.bridge_roof is not None:
self.bridge_roof[msk] = nodata