Source code for wolfgpu.simple_simulation

"""
Author: HECE - University of Liege, Stéphane Champailler, Pierre Archambeau
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

from __future__ import annotations
# Run this as python -m wolfgpu.simple_simulation else you'll enter a
# dependency/import Alice In Wonderland loop.

import json
import sys
import importlib
from enum import Enum
from pathlib import Path
from datetime import datetime
from typing import Union, List, Type, Literal
from pprint import pprint

import numpy as np
import numpy.typing as npt
from scipy.ndimage import labeled_comprehension
import logging
from copy import deepcopy

from wolfgpu.injector import SimulationInjector
from wolfgpu.injector_wrapper import InjectorWrapper
from wolfgpu.simplesimu.durations import SimulationDuration, ReportFrequencyType, TimeStepStrategy, SimulationDurationType
from wolfgpu.simplesimu.infiltrations import InfiltrationInterpolation, InfiltrationChronology
from wolfgpu.simplesimu.boundary_conditions import BoundaryConditionIndices, BC_BLANK_VALUE, NB_BC_TYPES, BoundaryConditionsTypes, boundary_condition_2D, Direction


from wolfhece.PyParams import Wolf_Param, Type_Param
from wolfhece.PyTranslate import _
from wolfhece.wolf_array import header_wolf

# PEP-8 way of making a version number available.
from .version import __version__
from wolfgpu.defaults import DEFAULT_FROUDE_BC_TOLERANCE
# Why this ? Because we needed to update the injectors which lead to
# various import issues. To make sure current users of Simplesimulation
# are not broken, I have used this technique to re-export stuff, breaking
# some dependency cycles.

__all__ = [
    "BoundaryConditionIndices",
    "BC_BLANK_VALUE",
    "NB_BC_TYPES",
    "CellParameterType",
    "BoundaryConditionsTypes",
    "Direction",
    "ReportFrequencyType",
    "SimulationDuration",
    "SimulationDurationType",
    "ReportFrequencyType",
    "TimeStepStrategy",
    "InfiltrationChronology",
    "SimpleSimulation"
]

[docs] class CellParameterType(Enum): """ Represents the type of a cell parameter. """
[docs] BRIDGE_DECK_ELEVATION = 1
[docs] BRIDGE_ROOF_ELEVATION = 2
INJECTOR_FROM_FILE_MARK="__from_file__" # FIXME This should be imported from WolfHECE
[docs] class BoundaryConditionsTypes(Enum): """The boundary conditions types. Note: The values match the numbers in Wolf's simulations parameters. """
[docs] H = 1
""" Weak boundary condition over :math:`h+b`, the water free surface height (that is, height of water plus bathymetry level). """
[docs] QX = 2
""" Weak boundary condition over :math:`q_x`, the discharge over the x-axis. """
[docs] QY = 3
""" Weak boundary condition over :math:`q_y`, the discharge over the x-axis. """
[docs] NONE = 4
""" This is used to represent the absence of boundary condition. """
[docs] QBX = 5
""" Not used. """
[docs] QBY = 6
""" Not used. """
[docs] HMOD = 7
""" Weak boundary condition over :math:`h`, the water free surface height. """
[docs] FROUDE_NORMAL = 8
""" Weak boundary condition for Froude limited height. Water height becomes :math:`\\min(h, \\frac{u^2}{g \\times \\text{Fr}^2})` where :math:`\\text{Fr}` is given. """
[docs] class Direction(Enum):
[docs] LEFT = 1
[docs] BOTTOM = 2
[docs] X = 1
[docs] Y = 2
# FIXME This *MUST* be imported from WolfHECE class boundary_condition_2D: ''' Type des CL générales, faibles et fortes @author Pierre Archambeau ''' def __init__(self, i: int, j: int, ntype: BoundaryConditionsTypes, val: float, direction: Direction) -> None: from wolfhece.mesh2d.cst_2D_boundary_conditions import BCType_2D_GPU, Direction as hece_Direction INTEGER_TYPES=set([int, np.int16, np.int32, np.int64]) if not type(i) in INTEGER_TYPES: raise TypeError(f"The type of 'i' is not integral. You gave a '{type(i)}', I support {INTEGER_TYPES}") if not type(j) in INTEGER_TYPES: raise TypeError(f"The type of 'j' is not integral. You gave a '{type(j)}', I support {INTEGER_TYPES}") if not isinstance(ntype, BoundaryConditionsTypes | BCType_2D_GPU): raise TypeError(f"{ntype} should be BoundaryConditionsTypes. You gave {type(ntype)}") if not isinstance(direction, Direction | hece_Direction): raise TypeError(f"{direction} should be of type Direction. You gave {type(ntype)}") if i < 1: raise ValueError(f"The boundary conditions indices start at 1. You gave: {i} for the i index.") if j < 1: raise ValueError(f"The boundary conditions indices start at 1. You gave: {j} for the j index.") # I cast to "int" because these values will be marshalled to JSON. # However I use the default behaviour of JSON which only handles 'int' # and not numpy's int types if type(i) != int or type(j) != int: logging.warning("You have given i,j values for which the type is not 'int'. I will cast them to int.") self.i = int(i) # indice de colonne dans le plus petit maillage self.j = int(j) # indice de ligne dans le plus petit maillage self.ntype = ntype # type de cl (h=1,qx=2,qy=3,rien=4,qbx=5,qby=6,hmod=7,fr=8) self.val = val # valeur à imposer self.direction = direction def to_dict(self): return { "i" : self.i, "j" : self.j, "type": self.ntype.name, "val" : float(self.val), # float makes sure the load and save to JSON has same format ("xxx.yyy") event when passing int's. "direction": self.direction.name, } @classmethod def from_dict(kls, js): return boundary_condition_2D(js["i"], js["j"], BoundaryConditionsTypes[js["type"]], float(js["val"]), Direction[js["direction"]]) class CellParameter: """ Cell parameter """ def __init__(self, i: int, j: int, ntype: CellParameterType, val: float) -> None: assert i >= 1, "Coordinates of cells are one-based" assert j >= 1, "Coordinates of cells are one-based" assert isinstance(ntype, CellParameterType) self.i = i # indice de colonne dans le plus petit maillage self.j = j # indice de ligne dans le plus petit maillage self.ntype = ntype # type de cl (h=1,qx=2,qy=3,rien=4,qbx=5,qby=6,hmod=7,fr=8) self.val = val # valeur à imposer def to_dict(self): return { "i" : self.i, "j" : self.j, "type": self.ntype.name, "val" : float(self.val), # float makes sure the load and save to JSON has same format ("xxx.yyy") event when passing int's. } @classmethod def from_dict(kls, js): return CellParameter(js["i"], js["j"], CellParameterType[js["type"]], float(js["val"]))
[docs] class SimpleSimulation: """ A simple simulation scenario. IMPORTANT: although we use numpy arrays to represent various matrices, be careful: we use shapes as (nb of columns, nb of rows). This is transposed compared to regular numpy ! """
[docs] path: Path
""" Directory where the simulation was loaded or will be saved."""
[docs] creation_date: datetime
""" Creation or last save date. """
[docs] comment: str
""" A free text comment, no formatting supported. """
[docs] param_froude_max: float
""" Froude limit """
[docs] _param_froude_bc_limit_tolerance: float
""" When Froude BC applies and needs to push up the water height, it will push it up to current_water_height * tolerance. """
[docs] param_runge_kutta: float
""" Runge-Kutta ponderation. The value is the ponderation of the estimator. Thus 1-ponderation is applied to the corrector. None means no RK but simple Euler step."""
[docs] _param_dx: float
""" Size of a single mesh along x axis, in meters. """
[docs] _param_dy: float
""" Size of a single mesh along y axis, in meters. """
[docs] _param_nbx: int
""" Size of the computation domain along the x axis, in number of meshes. """
[docs] _param_nby: int
""" Size of the computation domain along the y axis, in number of meshes. """
[docs] param_base_coord_ll_x: float
""" The computation domain coordinates are relative to "base" coordinates. This parameter is the x component of the base coordinates. The basis of the coordinates system is located, by convention, on the lower left corner of the domain (i.e. at (0,0) in the local coordinates system). """
[docs] param_base_coord_ll_y: float
""" The computation domain coordinates are relative to "base" coordinates. This parameter is the y component of the base coordinates. The basis of the coordinates system is located, by convention, on the lower left corner of the domain (i.e. at (0,0) in the local coordinates system). """
[docs] _param_timestep_strategy: TimeStepStrategy
""" Strategy used to compute the time step at each iteration. """
[docs] _param_timestep: float
""" In case of a constant time step strategy, this is the duration of the time step. It will be ignored in other cases. """
[docs] _param_infiltration_lerp: InfiltrationInterpolation
""" Type of infiltration interpolation: constant values, linear interpolation,... """
[docs] _param_duration: SimulationDuration
""" Duration of the simulation, either in steps or in simulation time (seconds) """
[docs] _param_report_period: SimulationDuration
""" Duration of the period for reporting, either in steps or in simulation time (seconds) """
[docs] _h : npt.NDArray[np.float32]
""" Matrix of water height, in meters. """
[docs] _qx : npt.NDArray[np.float32]
""" Matrix of quantity of movement along x axis, in m²/s. """
[docs] _qy : npt.NDArray[np.float32]
""" Matrix of quantity of movement along y axis, in m²/s. """
[docs] _bathymetry : npt.NDArray[np.float32]
""" Bathymetry, in meters. """
[docs] _manning : npt.NDArray[np.float32]
""" Manning coefficients. """
[docs] _infiltration_zones : npt.NDArray[np.int32]
""" Infiltration zones indices. Indices are numbered from one. 0 means no infiltration in the mesh. """
[docs] _nap : npt.NDArray[np.uint8]
""" Matrix representing which mesh are part of the computation domain or not. 1 means a mesh is part of the computation domain, 0 means it's not."""
[docs] _bridge_roof_elevation: npt.NDArray[np.float32]
""" Elevation of the bridge roof. """
[docs] _wp : Wolf_Param
""" Attributes of the simulation as Wolf_Param instance. """ def __init__(self, nx: int=0, ny: int=0): """ Create a new simple simulation. When created, a simulation is empty. One must at least set the NAP array and the `h` array to have something that can be simulated. nx, ny : number of meshes horizontally and vertically. For technical reasons, nx and ny must be both >= 5. """ if nx > 0 and ny > 0: assert nx >= 5, "The number of meshes on the x axis must be at least 5" assert ny >= 5, "The number of meshes on the y axis must be at least 5" self.path = None self._h = None self._qx = None self._qy = None self._manning = None self._bathymetry = None self._infiltration_zones = None self._nap = None self._bridge_roof_elevation = None
[docs] self._boundary_conditions: list[boundary_condition_2D] = []
[docs] self._cell_parameters: list[CellParameter] = []
[docs] self._infiltrations_chronology =[]
[docs] self._infiltrations_timings_as_gpu_array_cache = None
[docs] self._wrapped_injectors: dict[str, InjectorWrapper] = {}
[docs] self.param_courant = 0.5
self._param_dx, self._param_dy = 0, 0 self._param_nbx, self._param_nby = nx, ny self.param_froude_max = 3 self._param_froude_bc_limit_tolerance = DEFAULT_FROUDE_BC_TOLERANCE self._param_timestep_strategy = TimeStepStrategy.OPTIMIZED_TIME_STEP self._param_timestep = 0.0 self.param_runge_kutta = 0.3 self._param_duration = SimulationDuration.from_steps(1000) self._param_report_period = SimulationDuration.from_steps(10) self._param_infiltration_lerp = InfiltrationInterpolation.NONE self.param_base_coord_ll_x = 0.0 self.param_base_coord_ll_y = 0.0 self.comment = None self.creation_date = None
[docs] def _check_np_array_type(self, a: np.array, dtype=np.float32, check_size=True): if not isinstance(a, np.ndarray): raise TypeError("I expect a numpy array") elif not a.dtype == dtype: raise TypeError(f"I expect the type {dtype}, you gave {a.dtype}") elif check_size and a.shape != (self._param_nbx, self._param_nby): raise TypeError(f"I expect the array to have shape (nx={self._param_nbx}, ny={self._param_nby})") else: return True
[docs] def _interior_cell(self, bc: boundary_condition_2D, zero_based=True): """ Given a boundary condition, return the cell which touches the boundary condition border *and* which is inside the simulation domain. This works only if the boundary condition's coordinates are correct. """ is_in_domain = self.nap[bc.i-1, bc.j-1] if is_in_domain: crd = (bc.i, bc.j) else: match bc.direction: case Direction.LEFT: crd = (bc.i-1, bc.j) case Direction.BOTTOM: crd = (bc.i, bc.j-1) case _: raise Exception("Bad direction ?") if zero_based: return (crd[0]-1, crd[1]-1) else: return crd
[docs] def force_Euler_scheme(self): """ Force the simulation to use a simple Euler scheme. """ self.param_runge_kutta = None
@property
[docs] def param_dx(self): """ dx is the x-dimension of a mesh (expressed in meters). """ return self._param_dx
@param_dx.setter def param_dx(self, dx): self._infiltrations_timings_as_gpu_array_cache = None self._param_dx = dx @property
[docs] def param_dy(self): """ dy is the y-dimension of a mesh (expressed in meters) """ return self._param_dy
@param_dy.setter def param_dy(self, dy): self._infiltrations_timings_as_gpu_array_cache = None self._param_dy = dy @property
[docs] def h(self) -> npt.NDArray[np.float32]: """ An array representing the water height (expressed in meters above the bathymetry). """ return self._h
@h.setter def h(self, value: npt.NDArray[np.float32]): if self._check_np_array_type(value): self._h = value @property
[docs] def qx(self) -> npt.NDArray[np.float32]: """ An array representing the water discharge along x-axis (expressed in m/s²). """ return self._qx
@qx.setter def qx(self, value: npt.NDArray[np.float32]): if self._check_np_array_type(value): self._qx = value @property
[docs] def qy(self) -> npt.NDArray[np.float32]: """ An array representing the water discharge along y-axis (expressed in m/s²). """ return self._qy
@qy.setter def qy(self, value: npt.NDArray[np.float32]): if self._check_np_array_type(value): self._qy = value @property
[docs] def manning(self) -> npt.NDArray[np.float32]: """ An array representing the Manning coefficient for each mesh. """ return self._manning
@manning.setter def manning(self, value: npt.NDArray[np.float32]): if self._check_np_array_type(value): self._manning = value @property
[docs] def infiltration_zones(self) -> npt.NDArray[np.int32]: """ An array representing the infiltration zones for each mesh. """ return self._infiltration_zones
@infiltration_zones.setter def infiltration_zones(self, value: npt.NDArray[np.int32]): if self._check_np_array_type(value, np.int32): self._infiltration_zones = value @property
[docs] def nap(self) -> npt.NDArray[np.uint8]: """ An array representing the meshes that are part of the computation domain. """ return self._nap
@nap.setter def nap(self, value: npt.NDArray[np.uint8]): if self._check_np_array_type(value, np.uint8): self._nap = value @property
[docs] def bathymetry(self) -> npt.NDArray[np.float32]: """ An array representing the bathymetry each mesh (expressed in meters). """ return self._bathymetry
@bathymetry.setter def bathymetry(self, value: npt.NDArray[np.float32]): if self._check_np_array_type(value): self._bathymetry = value @property
[docs] def param_nx(self) -> int: """ Dimension of the computation domain on the x axis. """ return self._param_nbx
@param_nx.setter def param_nx(self, value: int): self._param_nbx = value @property
[docs] def param_ny(self) -> int: """ Dimension of the computation domain on the y axis. """ return self._param_nby
@param_ny.setter def param_ny(self, value: int): self._param_nby = value @property
[docs] def param_froude_bc_limit_tolerance(self): """ When Froude BC applies and needs to push up the water height, it will push it up to current_water_height * tolerance. """ return self._param_froude_bc_limit_tolerance
@param_froude_bc_limit_tolerance.setter def param_froude_bc_limit_tolerance(self, value: float): self._param_froude_bc_limit_tolerance = value @property
[docs] def shape(self) -> tuple[int, int]: """ The shape of the various arrays representing data in the computation domain. This is equal to `(param_nx, param_ny)`. """ return (self._param_nbx, self._param_nby)
@property
[docs] def param_duration(self) -> SimulationDuration: """ Duration of the simulation, either in steps or in simulation time (seconds). The duration is represented by an object of class `SimulationDuration`. """ return self._param_duration
@param_duration.setter def param_duration(self, value: SimulationDuration): if isinstance(value, SimulationDuration): self._param_duration = value else: raise TypeError(f"For simulation duration, I expect an object of type SimulationDuration. You gave a '{value.__class__.__name__}'.") @property
[docs] def param_report_period(self) -> SimulationDuration: """ Duration of the period for reporting, either in steps or in simulation time (seconds). The duration is represented by an object of class `SimulationDuration`. """ return self._param_report_period
@param_report_period.setter def param_report_period(self, value: SimulationDuration): if isinstance(value, SimulationDuration): self._param_report_period = value else: raise TypeError(f"For reporting period, I expect an object of type SimulationDuration. You gave a '{value.__class__.__name__}'.") @property
[docs] def param_timestep(self) -> float: """ In case of a constant timestep strategy, this is the duration of a step, expressed in seconds. """ return self._param_timestep
@param_timestep.setter def param_timestep(self, value: float): if isinstance(value, float): self._param_timestep = value else: raise TypeError(f"For time step, I expect a number of seconds in float.") @property
[docs] def param_timestep_strategy(self) -> TimeStepStrategy: """ Strategy used to compute the time step at each iteration. The strategy is represented by an object of class `TimeStepStrategy`. """ return self._param_timestep_strategy
@param_timestep_strategy.setter def param_timestep_strategy(self, value: TimeStepStrategy): if isinstance(value, TimeStepStrategy): self._param_timestep_strategy = value else: raise TypeError(f"For time step strategy, I expect an object of type TimeStepStrategy. You gave a '{value.__class__.__name__}'.") @property
[docs] def param_infiltration_lerp(self) -> InfiltrationInterpolation: """ Type of infiltration interpolation: constant values, linear interpolation,... The interpolation type is represented by an object of class `InfiltrationInterpolation`. """ return self._param_infiltration_lerp
@param_infiltration_lerp.setter def param_infiltration_lerp(self, value: InfiltrationInterpolation): if isinstance(value, InfiltrationInterpolation): self._param_infiltration_lerp = value else: raise TypeError(f"For infiltration interpolation strategy, I expect an object of type InfiltrationInterpolation. You gave a '{value.__class__.__name__}'.")
[docs] def add_boundary_condition(self, i: int, j: int, bc_type:BoundaryConditionsTypes, bc_value: float, border:Direction): """ Add a boundary condition to the model. :param i, j: position of the boundary condition (1-based) :param bc_type: type of boundary condition :param bc_value: value/parameter of the boundary condition :param border: mesh's border on which the boundary condtion applies """ if isinstance(bc_type, int): found = False for bc in BoundaryConditionsTypes: if bc.value == bc_type: bc_type = bc found = True break assert found, f"The B.C. index {bc_type} is unknown. Prefer using enum `BoundaryConditionsTypes`." self._boundary_conditions.append(boundary_condition_2D(i, j, bc_type, bc_value, border))
[docs] def header_wolf(self) -> header_wolf: wh = header_wolf() wh.dx, wh.dy = self._param_dx, self._param_dy wh.nbx, wh.nby = self._param_nbx, self._param_nby wh.origx, wh.origy = self.param_base_coord_ll_x, self.param_base_coord_ll_y return wh
[docs] def number_of_bc_along_X(self): return len([bc for bc in self._boundary_conditions if bc.direction == Direction.X])
[docs] def number_of_bc_along_Y(self): return len([bc for bc in self._boundary_conditions if bc.direction == Direction.Y])
[docs] def add_cell_parameter(self, i: int, j: int, bc_type:CellParameterType, bc_value: float): """ Add a cell parameter to the model. :param i: i position of the boundary condition (1-based) :param j: j position of the boundary condition (1-based) :param bc_type: type of cell parameter :param bc_value: value/parameter of the boundary condition :param border: mesh's border on which the boundary condtion applies """ self._cell_parameters.append(CellParameter(i, j, bc_type, bc_value))
[docs] def get_cell_parameter(self, i: int, j: int, bc_type:CellParameterType) -> float: """ Get the value of a cell parameter at a given position. :param i, j: position of the boundary condition (1-based) :param bc_type: type of cell parameter """ for cp in self._cell_parameters: if cp.i == i and cp.j == j and cp.ntype == bc_type: return cp.val return None
[docs] def get_all_cell_parameters(self, bc_type:CellParameterType) -> List[CellParameter]: """ Get all the cell parameters of a given type. :param bc_type: type of cell parameter """ return [cp for cp in self._cell_parameters if cp.ntype == bc_type]
[docs] def clear_cell_parameters(self, bc_type:CellParameterType = None): """ Clears all the cell parameters. Useful if one wants to build a new simulation starting with an existing one. :param bc_type: if given, only the cell parameters of this type will be cleared. """ if bc_type is None: self._cell_parameters = [] else: self._cell_parameters = [cp for cp in self._cell_parameters if cp.ntype != bc_type]
[docs] def clear_boundary_conditions(self): """ Clears all the boundary conditions. Useful if one wants to build a new simulation starting with an existing one. """ self._boundary_conditions = []
[docs] def get_injector(self, identifier:str) -> InjectorWrapper: """ Get an injector by its identifier. The identifier must be present in the simulation. """ return self._wrapped_injectors[identifier].injector
[docs] def add_injector(self, identifier:str, injector: SimulationInjector, active_zone: header_wolf, time_to_first_injection: SimulationDuration, replace=False): """ At this point only the active zone is handled by the simulator itself. The rest of the parameters of an injector (such as the starting time) will be handled by the injector's class. :param identifier: A unique identifier for the injector, used in various places (but not for display). :param active_zone: The zone over which the injector can act. These zones are handled by the simulator because it needs to make sure no two zones overlap. :param time_to_first_injection: Time to first injection, how much time the simulator will wait before calling this injector. :param replace: In case an identifier with the same `name` exists, it will be replaced. """ assert isinstance(identifier, str) assert identifier not in self._wrapped_injectors or replace, f"Identifier '{identifier}' is already in use" assert isinstance(injector, SimulationInjector), f"Expected a decendant of SimulationInjector, got {injector.__class__}" assert isinstance(active_zone, header_wolf), f"Expected header_wolf, got {active_zone}" assert isinstance(time_to_first_injection, SimulationDuration), f"Expected SimulationDuration got {time_to_first_injection}" if injector.__class__ not in SimpleSimulation._injector_classes_registry: # auto add to the registry improve ease of use (else one would have # to register classes before adding them) SimpleSimulation.register_injector_class(injector.__class__) self._wrapped_injectors[identifier] = InjectorWrapper(injector, self, active_zone, identifier, time_to_first_injection)
[docs] def delete_injector(self, identifier:str): """ Delete an injector by its identifier. :param identifier: The identifier of the injector to delete. """ assert isinstance(identifier, str) assert identifier in self._wrapped_injectors, f"Injector identifier '{identifier}' is unknown" del self._wrapped_injectors[identifier]
[docs] def get_injector_base_params(self, identifier:str) -> tuple[header_wolf, SimulationDuration]: assert isinstance(identifier, str) assert identifier in self._wrapped_injectors, f"Injector identifier '{identifier}' is unknown" wi = self._wrapped_injectors[identifier] return (wi._active_zone, wi._time_to_first_injection)
[docs] def set_injector_base_params(self, identifier:str, active_zone: header_wolf, time_to_first_injection: SimulationDuration): assert isinstance(identifier, str) assert identifier in self._wrapped_injectors, f"Injector identifier '{identifier}' is unknown" assert isinstance(active_zone, header_wolf), f"Expected header_wolf, got {active_zone}" assert isinstance(time_to_first_injection, SimulationDuration), f"Expected SimulationDuration got {time_to_first_injection}" wi = self._wrapped_injectors[identifier] wi._active_zone = active_zone wi._time_to_first_injection = time_to_first_injection
[docs] def _injector_wrappers(self) -> dict[str, InjectorWrapper]: return self._wrapped_injectors
[docs] def add_infiltration(self, time_start: float, zones_values: list[float]): """ Add an infiltration point in the infiltration chronology. :param time_start: start time of the infiltration (in seconds) :param zones_values: an array representing the quantity of water per second to add to each infiltration zones, starting at time_start. The first item corrsesponds to the zone 1, second item corresponds to zone 2 and so on. """ # Invalidate the timings cache. self._infiltrations_timings_as_gpu_array_cache = None self._infiltrations_chronology.append( (time_start, zones_values) )
[docs] def clear_infiltration_chronology(self): """ Clear the infiltration chronology. Useful if one wants to build a new simulation starting with an existing one. """ self._infiltrations_timings_as_gpu_array_cache = None self._infiltrations_chronology = []
@property
[docs] def boundary_condition(self) -> list[boundary_condition_2D]: return self._boundary_conditions
@property
[docs] def infiltrations_chronology(self) -> list[float,list[float]]: """ Returns the chronology of infiltration. This is a list. Each entry of the list is: `[start_time, [list of values for each infiltration zone]]` The start_time is when the infiltration should start. The list of value is the quantity of water falling on each mesh of each infitlation zones (the n-th entry of the list corresponds to the n-th infiltration zone). """ return self._infiltrations_chronology
@infiltrations_chronology.setter def infiltrations_chronology(self, infiltrations_chronology): self._infiltrations_timings_as_gpu_array_cache = None self._infiltrations_chronology = infiltrations_chronology
[docs] def _infiltrations_timings_as_gpu_array(self): """ Compute the infiltration timing array in a form suitable for the GPU processing. """ # Maybe a bit premature but here I cache the construction # of the infiltration timing GPU array because it could # be slow if one provides a lot of timings if self._infiltrations_timings_as_gpu_array_cache is None and \ len(self._infiltrations_chronology) >= 1 : warnings = self._check_infiltrations_warnings( self._infiltration_zones, self._nap, self._bathymetry) if warnings: logging.warning("Infiltration warnings:") for w in warnings: logging.warning(f" {w}") # The number of infiltration zones is implicitely given by # the length of rows in the infiltration chronology. # There, it is expected that all rows have the same number # of infiltration zones. nb_zones = len(self._infiltrations_chronology[0][1]) # 1 + nb_zones : 1 column for the timing, nb_zones columns for the # actual infiltrated quantities. shape = (len(self._infiltrations_chronology), 1 + nb_zones) inf_timings_and_quantities = np.zeros( shape, dtype=np.float32) for ndx, inf_data in enumerate(self._infiltrations_chronology): inf_start, inf_quantities = inf_data assert len(inf_data[1]) == nb_zones, "Not all infiltration zones values arays have the same length." inf_timings_and_quantities[ndx, 0] = inf_start inf_timings_and_quantities[ndx, 1:len(inf_quantities)+1] = inf_quantities # Spread each infiltration's the water over its zone's meshes. logging.info("Spreading infiltrations over the mesh") # Precompute the number/surface of cells in each zone. # For large mesh, large number of infiltration zones and # numerous time steps, this could be **very** slow to # compute this for each row of inf_timings. # FIXME : To be optimized if number of zones evolves with time. # Number of meshes per infiltration zones. # In the infiltration array, the zones are numbered from 1 # to nb_zones (inclusive). # Note that the source for the actual number of zones is # the infiltration chronology. So it is possible that there # are less infiltration zones in the infiltration array. zones_indices = range(1, nb_zones+1) meshes_per_zones = labeled_comprehension( self._infiltration_zones, self._infiltration_zones, zones_indices, np.count_nonzero, int, 0) zones_area= np.array(meshes_per_zones, dtype=np.float32) * self.param_dx * self.param_dy for row in inf_timings_and_quantities: for zone_ndx, zone_area in zip(zones_indices, zones_area): original_inf = row[zone_ndx] if zone_area > 0: row[zone_ndx] /= zone_area else: # The zone area is 0. It means it's no in use in the infiltration map. # So we clean the infiltration timings_and_quantities data to reflect # data. row[zone_ndx] = 0 # Infiltration is in m³/s for a complete zone # It is divided by the infitlrated surface => (m³/s) / (m²) becomes m/s. logging.debug(f"Infiltration t_start={row[0]:.1f} s, zone {zone_ndx} has {zone_area} cells with {original_inf} m³/s => infiltration per cell = {row[zone_ndx]:.4g} m/s") self._infiltrations_timings_as_gpu_array_cache = inf_timings_and_quantities logging.info(f"Spread infiltrations over the mesh - Done !") return self._infiltrations_timings_as_gpu_array_cache
[docs] def _check_infiltrations_warnings(self, infiltration_zones: np.ndarray, nap:np.ndarray, bathymetry:np.ndarray) -> list[str]: """ Check if the infiltration data is consistent. """ warnings = [] # Check if infiltration_zones are consistent with the nap array. if np.any((infiltration_zones > 0) & (nap == 0)): warnings.append("Some infiltration zones are defined in the infiltration map but not in the NAP array. Please check your infiltration map and NAP array.") warnings.append("We force the infiltration zones to be zero outside the NAP array.") # Force the infiltration zones to be zero outside the NAP array. infiltration_zones[(infiltration_zones > 0) & (nap == 0)] = 0 # Check if infiltration_zones are consistent with the bathymetry array. if np.any((infiltration_zones > 0) & (bathymetry == 99999.)): warnings.append("Some infiltration zones are defined in the infiltration map but not the bathymetry has 99999. value. Please check your infiltration map and bathymetry array.") warnings.append("We force the infiltration zones to be zero where bathymetry is 99999.") # Force the infiltration zones to be zero outside the NAP array. infiltration_zones[(infiltration_zones > 0) & (bathymetry == 99999.)] = 0 return warnings
[docs] def _check_infiltrations(self, infiltration_zones: np.ndarray, infiltration_timings, interp_infiltration: InfiltrationInterpolation, nap:np.ndarray, bathymetry:np.ndarray) -> list[str]: """ Check if the infiltration data is consistent. """ errors = [] nb_infiltration_lines = infiltration_timings.shape[0] if not np.all(np.diff(infiltration_timings[:,0]) > 0): errors.append(f"The infiltration timings are not strictly increasing: {infiltration_timings[:,0]}") if interp_infiltration == InfiltrationInterpolation.LINEAR: if infiltration_timings[0,0] != 0: errors.append(f"The first infiltration time must be zero if you use interpolation") if nb_infiltration_lines < 2: errors.append(f"You need at least two data points in infiltrations if you use interpolation") if errors: return errors zones_numbers = np.unique(infiltration_zones[infiltration_zones > 0]).tolist() if len(zones_numbers) == 0: logging.info("No infiltration zones found in the infiltration map.") for zone_ndx in range(1, infiltration_timings.shape[1]): errors.append(f"The zone {zone_ndx} doesn't appear in the infiltration map.") else: logging.debug(f"Infiltration zones numbers are : {zones_numbers}") # Check if each zone actually has an infiltration. # This test has been disabled because sometimes, users like to disable # an infiltration by setting its infiltrated quantity to zero. # used_zones = set() # unused_zones = set() # for row in infiltration_timings: # # On each line, we look at every infiltrated/exfiltrated quantities # for zone_ndx in range(1, row.shape[0]): # if abs(row[zone_ndx]) > 0.0: # used_zones.add(zone_ndx) # else: # unused_zones.add(zone_ndx) # Labeled_comprehension needs to count something that exists. # If there are no infiltration zones (the array is zero), then we # count those zeros so that Labeled_comprehension has something to count. num_count = labeled_comprehension(infiltration_zones, infiltration_zones, index=np.arange(0, infiltration_zones.max()+1), func=np.count_nonzero, out_dtype=int, default=0) # Now we make sure we avoid the "zeros" (no infiltrations) for zone_ndx in range(1, infiltration_timings.shape[1]): if zone_ndx >= num_count.shape[0] or num_count[zone_ndx] == 0: errors.append(f"The zone {zone_ndx} doesn't appear in the infiltration map.") # for zone_ndx in range(1,infiltration_timings.shape[1]): # a = infiltration_zones == zone_ndx # b = np.sum(a) # if b == 0: # logging.warning(f"The zone {zone_ndx} doesn't appear in the infiltration map.") return errors
[docs] def plot_infiltration(self, figax = None, toshow:bool=True): """ Plot the infiltration chronology. :param figax: a tuple (fig, ax) of matplotlib objects. If None, a new figure is created. :param toshow: if True, the figure is shown. :return: the figure and the axis. """ import matplotlib.pyplot as plt if figax is None: fig, ax = plt.subplots() inf = self.infiltrations_chronology times = [x[0] for x in inf] values = [x[1] for x in inf] if self.param_infiltration_lerp == InfiltrationInterpolation.LINEAR: fig.suptitle(_("Infiltration chronology (linear interpolation)")) for i in range(len(values[0])): ax.plot(times, [x[i] for x in values], label=f"Zone {i+1}") elif self.param_infiltration_lerp == InfiltrationInterpolation.NONE: fig.suptitle(_("Infiltration chronology (constant values)")) for i in range(len(values[0])): ax.step(times, [x[i] for x in values], label=f"Zone {i+1}", where="post") ax.set_xlabel(_("Time (s)")) ax.set_ylabel(_("Flow rate ($m^3s^{-1}$)")) ax.legend() if toshow: fig.show() return fig, ax
[docs] def check_warnings(self) -> Union[None, list[str]]: """ Check if the model currently set in this object is valid. A number of tests are made, they are not exhaustive. Returns None if no problem was found. Returns a list of warnings messages else. """ res = [] if self._h is not None: if np.count_nonzero(self._h < 0) > 0: res.append("There are meshes where you have h < 0. I can't handle that so I nullify h where h < 0.") self._h[self._h < 0] = 0. if np.count_nonzero(self._h == 99999.) > 0: res.append("There are meshes where you have h == 99999. I can't handle that so I nullify h where h == 99999.") self._h[self._h == 99999.] = 0. if np.count_nonzero(self.h[self.nap == 0]): res.append("There are meshes where you have h > 0 but nap == 0 ! I can't handle that so I nullify h where nap==0.") self._h[self.nap == 0] = 0. if self._h is not None and self._qx is not None: if np.count_nonzero( (self._h == 0) & (self._qx > 0)): res.append("There are meshes where you have qx > 0 but h == 0 ! I can't handle that so I nullify qx where h==0.") self._qx[self._h == 0] = 0. if self._h is not None and self._qy is not None: if np.count_nonzero((self._h == 0) & (self._qy > 0)): res.append("There are meshes where you have qy > 0 but h == 0 ! I can't handle that so I nullify qy where h==0.") self._qy[self._h == 0] = 0. if np.count_nonzero(self.bathymetry[self.nap ==0] == 0.): res.append("There are meshes where you have bathymetry == 0 but nap == 0 ! I prefer to set the bathymetry to 99999 outside the computation domain.") self._bathymetry[self.nap == 0] = 99999. if np.count_nonzero(self.bathymetry[self.nap ==0] != 99999.): res.append("There are meshes where you have bathymetry != 99999 but nap == 0 ! I prefer to set the bathymetry to 99999 outside the computation domain.") self._bathymetry[self.nap == 0] = 99999. if self.infiltration_zones is not None and self._nap is not None: res.extend(self._check_infiltrations_warnings( self._infiltration_zones, self._nap, self._bathymetry)) if len(res) == 0: return None else: return res
[docs] def check_errors(self) -> Union[None, list[str]]: """ Check if the model currently set in this object is valid. A number of tests are made, they are not exhaustive. These tests are performed after the creation of the simulation (or part of it), because sometimes we need several data to do the test (for example, we need both the NAP matrix and the boundary conditions to check they are coherent) Returns None if no problem was found. Returns a list of error messages else. """ res = [] if self._infiltration_zones is not None: if self._infiltration_zones.dtype != np.int32: res.append(f"Infiltration zones must be a np.int32 array") if self._infiltration_zones.shape != self.shape: res.append(f"Infiltration zones shape must be (param. nx, param. ny)") if not np.all(self._infiltration_zones >= 0): l = np.unique(self._infiltration_zones).tolist() res.append(f"In the infiltration zones array, the zones are numbered starting at 1. Zero means \"mesh has no infiltration\". But I see these values: {','.join(map(str,l))}.") if self.nap is not None: if self.nap.dtype != np.uint8: res.append(f"NAP matrix must be a uint8 array") if self.nap.shape != self.shape: res.append(f"NAP shape must be (param. nx, param. ny)") else: res.append("The NAP array has not been set") for name, array in {"h":self._h, "qx":self._qx, "qy":self._qy, "bathymetry":self._bathymetry, "manning":self._manning}.items(): array: np.ndarray if array is not None: if array.dtype != np.float32: res.append(f"The {name} array's type must be np.float32") if array.shape != self.shape: res.append(f"{name} shape must be (param. nx, param. ny)") else: res.append(f"{name} array must be specified") if self._bathymetry is not None: if self.nap is None: if np.count_nonzero(self._bathymetry < 0) > 0: res.append("I have found negative heights in the bathymetry. I can't handle that.") else: if np.count_nonzero(self._bathymetry[self.nap > 0] < 0) > 0: res.append("I have found negative heights in the bathymetry. I can't handle that.") if not (self.param_courant > 0): res.append("Courant number is wrong") if not (self.param_dx > 0): res.append(f"dx ({self.param_dx}) is wrong, it should be > 0") if not (self.param_dy > 0): res.append(f"dy ({self.param_dy}) is wrong, it should be > 0") if not (self._param_nbx > 4 ): res.append(f"nx is wrong ({self._param_nbx}), it should be an integer > 4 (we have 2 cells wide borders)") if not (self._param_nby > 4 ): res.append(f"ny is wrong ({self._param_nby}), it should be an integer > 4 (we have 2 cells wide borders)") if self._infiltration_zones is not None and self._infiltrations_chronology == []: res.append("If you give infiltrations zones then you must give infiltration chronolgy too") if self._infiltration_zones is None and self._infiltrations_chronology != []: res.append("If you give an infiltration chronolgy then you must give infiltrations zones too") if self._infiltration_zones is not None and self._infiltrations_chronology != []: for inf in self._infiltrations_chronology: if len(inf) < 2: res.append("In the infiltration chronology array, each line must have at least 2 elements: one for the time and at least one for the infiltration quantities") break if inf[0] < 0: res.append("In infiltration chronolgy array, the time component (first element of a line) must be >= 0") if len(inf) != len(self._infiltrations_chronology[0]): res.append("In infiltration chronolgy array, all lines must have the same number of elements") res.extend( self._check_infiltrations( self._infiltration_zones, self._infiltrations_timings_as_gpu_array(), self._param_infiltration_lerp, self._nap, self._bathymetry)) if self._params_have_duplicated_values() != "": res.append("Some parameters have duplicated values") if self._wrapped_injectors: for inj_name, inj in self._wrapped_injectors.items(): if inj._active_zone.nbx == 0 or inj._active_zone.nby == 0: res.append(f"The active zone of the injector '{inj_name}' has no cell inside (it's too thin)") if inj._active_zone.find_intersection(self.header_wolf()) is None: res.append(f"The active zone of the injector '{inj_name}' is outside the simulation domain") if len(self._wrapped_injectors) >= 2: # Now we detect if we don't have (space) overlaping injectors. t = np.zeros((self.param_nx, self.param_ny), np.bool) # bool is stored as a a byte: https://numpy.org/devdocs/reference/arrays.scalars.html#numpy.bool for inj_name, injector_wrapper in self._wrapped_injectors.items(): inj = injector_wrapper.injector zs = injector_wrapper.active_zone_as_slices() if np.sum(t[zs]) == 0: t[zs] = 1 else: res.append(f"One of the injection ({zs}, {injector_wrapper.injector.__class__.__name__}) zones steps on another one") if self._wrapped_injectors: for inj_name, inj in self._wrapped_injectors.items(): if inj._active_zone.nbx == 0 or inj._active_zone.nby == 0: res.append(f"The active zone of the injector '{inj_name}' has no cell inside (it's too thin)") if inj._active_zone.find_intersection(self.header_wolf()) is None: res.append(f"The active zone of the injector '{inj_name}' is outside the simulation domain") if len(self._wrapped_injectors) >= 2: # Now we detect if we don't have (space) overlaping injectors. t = np.zeros((self.param_nx, self.param_ny), np.bool) # bool is stored as a a byte: https://numpy.org/devdocs/reference/arrays.scalars.html#numpy.bool for inj_name, injector_wrapper in self._wrapped_injectors.items(): inj = injector_wrapper.injector zs = injector_wrapper.active_zone_as_slices() if np.sum(t[zs]) == 0: t[zs] = 1 else: res.append(f"One of the injection ({zs}, {injector_wrapper.injector.__class__.__name__}) zones steps on another one") # Before testing the B.C. make sure their coordinates are at least # correct else the rest of the tests will be meaningless. fatal_bc_error = False for bc in self._boundary_conditions: if bc.i < 1: res.append(f"Weak boundary condition at i={bc.i},j={bc.j}: i must be one-based, it is {bc.i}") fatal_bc_error = True if bc.j < 1: res.append(f"Weak boundary condition at i={bc.i},j={bc.j}: j must be one-based, it is {bc.j}") fatal_bc_error = True if bc.i > self.param_nx: res.append(f"Weak boundary condition at i={bc.i},j={bc.j}: i is outside domain (should be <= {self.param_nx})") fatal_bc_error = True if bc.j > self.param_ny: res.append(f"Weak boundary condition at i={bc.i},j={bc.j}: j is outside domain (should be <= {self.param_ny})") fatal_bc_error = True if not fatal_bc_error: for bc in self._boundary_conditions: cell_numpy_crd = (bc.i-1, bc.j-1) x, y = bc.i-1, bc.j-1 # i,j are Fortran coords => translate to numpy # - Either the BC is on the left border of a cell which # is on the interior side of the border of the domain. # - Either the BC is on the left border of a cell which # is on the exterior side of the border of the domain. left_cell_crd = (x - 1, y) if bc.direction == Direction.LEFT and not( \ (x == 0 and self.nap[cell_numpy_crd] == 1) \ or (self.nap[cell_numpy_crd] == 1 and self.nap[left_cell_crd] == 0) \ or (self.nap[cell_numpy_crd] == 0 and self.nap[left_cell_crd] == 1)): res.append(f"Weak boundary condition at i={bc.i},j={bc.j} left border: this B.C. is not on the border of the computation domain") fatal_bc_error bottom_cell_crd = (x, y - 1) if bc.direction == Direction.BOTTOM and not( \ (y == 0 and self.nap[cell_numpy_crd] == 1) \ or (self.nap[cell_numpy_crd] == 1 and self.nap[bottom_cell_crd] == 0) \ or (self.nap[cell_numpy_crd] == 0 and self.nap[bottom_cell_crd] == 1)): res.append(f"Weak boundary condition at i={bc.i},j={bc.j} bottom border: this B.C. is not on the border of the computation domain") fatal_bc_error if not fatal_bc_error: # Sort the B.C.'s by their coordinates bc_by_pos = dict() for bc in self._boundary_conditions: k = (bc.i, bc.j) bc_by_pos[k] = bc_by_pos.get(k, []) + [bc] for bc_pos, bcs_at_pos in bc_by_pos.items(): # Check for duplicates seen_bc = set([]) for bc in bcs_at_pos: if (bc.ntype, bc.direction) not in seen_bc: seen_bc.add((bc.ntype, bc.direction)) else: res.append(f"Weak boundary conditions at ({bc.i}, {bc.j}) bottom border: there are more than one B.C. of type {bc.ntype}") # Look at all the bc at a given position. # check if one of these B.C. is a QX on on a left border has_qx_left = any(bc.ntype == BoundaryConditionsTypes.QX and bc.direction == Direction.LEFT for bc in bcs_at_pos) has_qy_left = any(bc.ntype == BoundaryConditionsTypes.QY and bc.direction == Direction.LEFT for bc in bcs_at_pos) if (has_qx_left and has_qy_left) or (not has_qx_left and not has_qy_left): pass else: res.append(f"Boundary condition Qx at left border of {bc_pos} has no corresponding Qy boundary condition") has_qx_bottom = any(bc.ntype == BoundaryConditionsTypes.QX and bc.direction == Direction.BOTTOM for bc in bcs_at_pos) has_qy_bottom = any(bc.ntype == BoundaryConditionsTypes.QY and bc.direction == Direction.BOTTOM for bc in bcs_at_pos) if (has_qx_bottom and has_qy_bottom) or (not has_qx_bottom and not has_qy_bottom): pass else: res.append(f"Boundary condition Qx at bottom border of {bc_pos} has no corresponding Qy boundary condition") for bc in self._boundary_conditions: if bc.ntype in (BoundaryConditionsTypes.QX, BoundaryConditionsTypes.QY): icp = self._interior_cell(bc) if self.h[icp[0], icp[1]] == 0: res.append(f"Boundary condition {bc.ntype} at {(bc.i,bc.j)} needs water at cell ({icp[0]+1},{icp[1]+1}) to work (because the discharge will be divided by the water depth).") if len(res) == 0: return None else: return res
[docs] def _fail_if_invalid(self): warnings = self.check_warnings() if warnings is not None: logging.warning( "\n\n" + ";\n\n".join(warnings)) msg = "**** The simulation is not completely valid but I will try to run it anyway. ****" logging.warning('*' * len(msg)) logging.warning(msg) logging.warning('*' * len(msg)) ce = self.check_errors() if ce is not None: raise Exception( "\n\n" + ";\n\n".join(ce)) return warnings
[docs] _injector_classes_registry: set[type] = set()
""" The injector registry is here to allow the simple simulation class to know about the injector classes. It is useful when loading a simulation from the disk: with it one can quickly check if the injector class in the loaded simulation is supported. """ @classmethod
[docs] def register_injector_class(cls, injector_class: Type[SimulationInjector]): # Note that the _injector_classes_registry is a set => it will not duplicate # records compa = injector_class.compatibility() from semantic_version import SimpleSpec, Version pyver = sys.version_info if not SimpleSpec(compa["python"]).match(Version(major=pyver.major, minor=pyver.minor, patch=pyver.micro)): raise Exception(f"The injector {injector_class} doesn't match current pyhon version") from wolfgpu.version import __version__ if not SimpleSpec(compa["wolfgpu"]).match(Version(__version__)): raise Exception(f"The injector {injector_class} doesn't match current wolfgpu version") cls._injector_classes_registry.add(injector_class)
@classmethod
[docs] def unregister_injector_class(cls, injector_class: Type[SimulationInjector]): cls._injector_classes_registry.remove(injector_class)
@classmethod
[docs] def get_registered_injector_classes(cls) -> set[type]: return cls._injector_classes_registry
@classmethod
[docs] def unregister_all_injector_classes(cls): cls._injector_classes_registry.clear()
@classmethod
[docs] def unregister_injector_class_by_name(cls, injector_cls_name: str): to_remove = None for kls in cls._injector_classes_registry: if kls.__name__ == injector_cls_name: to_remove = kls break if to_remove is not None: cls.unregister_injector_class(to_remove)
@classmethod
[docs] def register_injector_class_from_file(cls, injector_cls_name: str, path: Path) -> type: if not isinstance(path, Path): path = Path(path) p = str(path.parent) # str() is the way to convert to native representation if p not in sys.path: sys.path.append(p) module = importlib.import_module(path.stem) injector_class = getattr(module, injector_cls_name) # Remember it came from a file # FIXME Hackish, use a map for remembering the information setattr(injector_class, INJECTOR_FROM_FILE_MARK, path) cls.register_injector_class(injector_class) return injector_class
@classmethod
[docs] def load(cls, simulation_dir: Path | str): """ Load a simulation. :param store_dir: where the simulation model is stored (a directory). :type store_dir: Path """ # NOTE I thought about using a marshalling framework such as marshmallow # but they tend to over-engineer the code and this may be a bit of # a burden to maintain for people not accustomed to such patterns of # development. So I K.I.S.S. simulation_dir = Path(simulation_dir) assert simulation_dir.exists(), f"The directory {simulation_dir} doesn't exist." with open(simulation_dir / "parameters.json","r") as pfile: data = json.loads(pfile.read()) assert data["spec_version"] in ("0.1", "0.2", "0.3") spec_version = data["spec_version"] model = SimpleSimulation(data["parameters"]["nx"], data["parameters"]["ny"]) model.path = simulation_dir model.comment = data["comment"] model.creation_date = datetime.strptime(data["creation_date"], "%d/%m/%Y %H:%M:%S") model.param_dx = data["parameters"]["dx"] model.param_dy = data["parameters"]["dy"] model.param_nx = data["parameters"]["nx"] model.param_ny = data["parameters"]["ny"] model.param_courant = data["parameters"]["courant"] model.param_froude_max = data["parameters"]["froude_max"] if spec_version >= "0.2": model._param_froude_bc_limit_tolerance = data["parameters"]["froude_bc_tolerance"] model._param_timestep_strategy = TimeStepStrategy[data["parameters"]["timestep_strategy"]] model._param_timestep = data["parameters"]["timestep"] model._param_infiltration_lerp = InfiltrationInterpolation[ data["parameters"]["infiltration_interp"]] model._param_duration = SimulationDuration.from_dict(data["parameters"]["duration"]) model._param_report_period = SimulationDuration.from_dict(data["parameters"]["report_period"]) model.param_runge_kutta = data["parameters"]["runge_kutta"] model.param_base_coord_ll_x = data["parameters"]["base_coord_x"] model.param_base_coord_ll_y = data["parameters"]["base_coord_y"] model._h = np.load(simulation_dir / data["maps"]["h"]) model._qx = np.load(simulation_dir / data["maps"]["qx"]) model._qy = np.load(simulation_dir / data["maps"]["qy"]) model._bathymetry = np.load(simulation_dir / data["maps"]["bathymetry"]) model._manning = np.load(simulation_dir / data["maps"]["manning"]) if "NAP" in data["maps"]: model._nap = np.load(simulation_dir / data["maps"]["NAP"]) if "bridge_roof" in data["maps"]: model.bridge_roof = np.load(simulation_dir / data["maps"]["bridge_roof"]) if "infiltration_zones" in data["maps"]: model._infiltration_zones = np.load(simulation_dir / data["maps"]["infiltration_zones"]) # Clear the list because one can call load several times. model._boundary_conditions = [] for bc in data["boundary_conditions"]: model._boundary_conditions.append(boundary_condition_2D.from_dict(bc)) model._infiltrations_chronology = [] for infiltrations in data["infiltrations"]: start_time, inf_values = infiltrations["start_time"], infiltrations["q"] model._infiltrations_chronology.append( [start_time, inf_values] ) if spec_version >= "0.3": model._wrapped_injectors = {} for inj_name, inj_dict in data["injectors"].items(): t = inj_dict["type"] if "source" in inj_dict: src: str = inj_dict["source"] if src.startswith("file://"): SimpleSimulation.register_injector_class_from_file(t, src.replace("file://","")) else: raise Exception(f"I don't understand the protocol for source file {src}") injector_kls: SimulationInjector = None for kls in SimpleSimulation._injector_classes_registry: if kls.__name__ == t: injector_kls = kls break assert injector_kls is not None, f"The type of injector {t} is not known" name = inj_name active_zone = model._json_to_header_wolf(inj_dict["active_zone"]) time_to_first_injection = SimulationDuration.from_dict( inj_dict["time_to_first_injection"]) del inj_dict["time_to_first_injection"] del inj_dict["active_zone"] del inj_dict["type"] new_inj = injector_kls.make_from_dict(inj_dict) model.add_injector(name, new_inj, active_zone, time_to_first_injection) model._fail_if_invalid() return model
[docs] def _json_to_header_wolf(self, d) -> header_wolf: """ Construct a header_wolf object from a JSON-like dictionary. :param d: Dictionary with keys 'dx', 'dy', 'origx', 'origy', 'nbx', 'nby' :return: header_wolf instance """ hdr_grid = header_wolf.make((self.param_base_coord_ll_x, self.param_base_coord_ll_y), (self.param_nx, self.param_ny), (self.param_dx, self.param_dy)) hw = header_wolf.from_dict(d, grid=hdr_grid) return hw
[docs] def copy(self) -> SimpleSimulation: """ Create a new simulation from an existing one. """ return deepcopy(self)
[docs] def save(self, store_dir: Path = None): """ Save the model to a directory. If `store_dir` is None, the path used to load the model is used. If the model was not loaded from a directory, an exception is raised. """ assert isinstance(store_dir, Path), "store_dir must be a path" self._fail_if_invalid() if store_dir is None and self.path is None: raise Exception("The simulation has no directory. Please provide one.") path = store_dir or self.path assert path is not None, "I need a path to save to" if isinstance(path, str): path = Path(path) if not path.exists(): path.mkdir(parents=True) else: assert path.is_dir() for name, array in {"h":self._h, "qx":self._qx, "qy":self._qy, "bathymetry":self._bathymetry, "manning":self._manning, "infiltration_zones":self._infiltration_zones, "NAP": self.nap}.items(): array: np.ndarray if array is not None: np.save(path / name, array) if self.has_bridge(): np.save(path / "bridge_roof", self.bridge_roof) if store_dir is None and self.path is None: raise Exception("The simulation has no directory. Please provide one.") path = store_dir or self.path with open(path / "parameters.json","w") as pfile: pfile.write(json.dumps(self._save_json(), indent=2))
[docs] def _save_json(self) -> dict: """ Save the parameters to a json file. """ if self.creation_date is None: self.creation_date = datetime.now() maps = {} for name, array in {"h":self._h, "qx":self._qx, "qy":self._qy, "bathymetry":self._bathymetry, "manning":self._manning, "infiltration_zones":self._infiltration_zones, "NAP": self.nap}.items(): array: np.ndarray if array is not None: maps[name] = name + ".npy" if self.has_bridge(): maps["bridge_roof"] = "bridge_roof.npy" params = { "courant": self.param_courant, "froude_max": self.param_froude_max, "froude_bc_tolerance": self._param_froude_bc_limit_tolerance, "nx" : self._param_nbx, "ny" : self._param_nby, "dx" : self.param_dx, "dy" : self.param_dy, "timestep_strategy" : self._param_timestep_strategy.name, "timestep": self._param_timestep, "infiltration_interp" : self._param_infiltration_lerp.name, "duration": self._param_duration.to_dict(), "report_period": self._param_report_period.to_dict(), "runge_kutta": self.param_runge_kutta, "base_coord_x": self.param_base_coord_ll_x, "base_coord_y": self.param_base_coord_ll_y } bcs = [bc.to_dict() for bc in self._boundary_conditions] infs = [ { "start_time": start, "q": values} for start, values in self._infiltrations_chronology ] injectors_as_dicts = self._save_injectors_to_json() data = { "spec_version" : "0.3", "comment" : self.comment, "creation_date" : self.creation_date.strftime("%d/%m/%Y %H:%M:%S"), "parameters" : params, "boundary_conditions": bcs, "infiltrations" : infs, "maps" : maps, "injectors" : injectors_as_dicts } return data
[docs] def _save_injectors_to_json(self) -> dict: injectors_as_dicts = {} for inj_name, inj in self._wrapped_injectors.items(): inj: InjectorWrapper generic = { "type" : inj.injector.__class__.__name__, "active_zone" : inj._active_zone.to_dict(), "time_to_first_injection" : inj._time_to_first_injection.to_dict() } if hasattr(inj.injector.__class__, INJECTOR_FROM_FILE_MARK): generic["source"] = "file://" + str(getattr(inj.injector.__class__, INJECTOR_FROM_FILE_MARK)) specific = inj.injector.get_params() injectors_as_dicts[inj_name] = specific | generic return injectors_as_dicts
def __str__(self) -> str: """ Return a string representation of the simulation. """ manning = self._manning active = self.nap wet = self._h m = manning[active > 0] w,h = self._param_nbx, self._param_nby domain = w*h total_wet = np.count_nonzero(wet[:,:][active > 0] > 0) total_active = np.sum(active > 0) txt = [] txt.append(f"Domain : {w}x{h} meshes, (dx={self.param_dx:g} m.,dy={self.param_dx:g} m.), {100*total_active/domain:.1f}% active, {100*total_wet/domain:.1f}% wet") if self.param_timestep_strategy == TimeStepStrategy.FIXED_TIME_STEP: txt.append(f"Timestep : Fixed, {self._param_timestep:g} s.") else: txt.append(f"Timestep : Optimized") if self.param_report_period is not None: rep = f"1 report every {self.param_report_period}" # if self.param_report_period.type == SimulationDurationType.SECONDS: # rep += f" (time sampled every {TIME_SAMPLING_RESOLUTION} steps)" else: rep = "no reports recorded" txt.append(f"Duration : {self.param_duration}, {rep}") if self.param_runge_kutta == 1.0: txt.append("Ponderation : Euler") else: txt.append( f"Ponderation : Runge-Kutta {self.param_runge_kutta:g}/{1-self.param_runge_kutta:g}" ) if self.param_infiltration_lerp == InfiltrationInterpolation.NONE: interp = "None" elif self.param_infiltration_lerp == InfiltrationInterpolation.LINEAR: interp = "Linear" txt.append(f"Infiltration interpolation : {interp}") txt.append(f"Courant number : {self.param_courant:g}") # FIXME Non dimensional, correct ? txt.append(f"Froude limit : {self.param_froude_max:g}") txt.append(f"Froude BC tol. : {self.param_froude_bc_limit_tolerance:g}") #print(f"Dry up handling: {['Enabled','Disabled'][sim._no_dry is True]}") txt.append(f"Mean of Manning coefficients: {np.mean(m):.3f} [s.m^{{-1/3}}]") return "\n".join(txt)
[docs] def to_wolfparam(self) -> Wolf_Param: """ Return a Wolf_Param object that represents the parameters of the simulation. """ _wp = Wolf_Param(parent = None, title = "Simple simulation GPU", to_read=False, withbuttons=True, toShow=False, init_GUI=False, force_even_if_same_default = True) _wp.add_param(_('Scheme'), _('Time step strategy'), self.param_timestep_strategy.value, Type_Param.Integer, jsonstr={"Values":{"Fixed time step":1,"Optimized time step":2}}) _wp.add_param(_('Infiltration'), _('Interpolation mode'), self.param_infiltration_lerp.value, Type_Param.Integer, jsonstr={"Values":{"None":0, "Linear":1}}) _wp = self.add_to_wolfparam(_wp) return _wp
[docs] def add_to_wolfparam(self, dest:Wolf_Param) -> Wolf_Param: """ Return a Wolf_Param object that represents the parameters of the simulation. """ dest[(_('General'), _('X Lower left corner [m]'))] = self.param_base_coord_ll_x dest[(_('General'), _('Y Lower left corner [m]'))] = self.param_base_coord_ll_y dest[(_('Spatial resolution'), 'dx [m]')] = self.param_dx dest[(_('Spatial resolution'), 'dy [m]')] = self.param_dy dest[(_('Shape'), _('Number of meshes along X - nx'))] = self.param_nx dest[(_('Shape'), _('Number of meshes along Y - ny'))] = self.param_ny dest[(_('Scheme'), _('Courant number [-]'))] = self.param_courant dest[(_('Scheme'), _('Froude limit [-]'))] = self.param_froude_max dest[(_('Scheme'), _('Time step strategy'))] = self.param_timestep_strategy.value dest[(_('Scheme'), _('Time step [s]'))] = self.param_timestep dest[(_('Scheme'), _('Runge-Kutta ponderation [-]'))] = self.param_runge_kutta dest[(_('Duration'), _('Total duration [s]'))] = self.param_duration.duration dest[(_('Duration'), _('Report period [s]'))] = self.param_report_period.duration dest[(_('Boundary conditions'), _('Froude BC - tolerance on H [-]'))] = self.param_froude_bc_limit_tolerance dest[(_('Infiltration'), _('Interpolation mode'))] = self.param_infiltration_lerp.value return dest
[docs] def from_wolfparam(self, wp: Wolf_Param): """ Callback called when the Wolf_Param object is modified and apply routine is called. """ assert isinstance(wp, Wolf_Param), "I need a Wolf_Param object." self.param_base_coord_ll_x = wp[(_('General'), _('X Lower left corner [m]'))] self.param_base_coord_ll_y = wp[(_('General'), _('Y Lower left corner [m]'))] self.param_dx = wp[(_('Spatial resolution'), 'dx [m]')] self.param_dy = wp[(_('Spatial resolution'), 'dy [m]')] self.param_nx = wp[(_('Shape'), _('Number of meshes along X - nx'))] self.param_ny = wp[(_('Shape'), _('Number of meshes along Y - ny'))] self.param_courant = wp[(_('Scheme'), _('Courant number [-]'))] self.param_froude_max = wp[(_('Scheme'), _('Froude limit [-]'))] self.param_froude_bc_limit_tolerance = wp[(_('Boundary conditions'), _('Froude BC - tolerance on H [-]'))] strategy = wp[(_('Scheme'), _('Time step strategy'))] self.param_timestep_strategy = TimeStepStrategy.FIXED_TIME_STEP if strategy == 1 else TimeStepStrategy.OPTIMIZED_TIME_STEP self.param_timestep = wp[(_('Scheme'), _('Time step [s]'))] self.param_runge_kutta = wp[(_('Scheme'), _('Runge-Kutta ponderation [-]'))] self.param_duration.duration = wp[(_('Duration'), _('Total duration [s]'))] self.param_report_period.duration = wp[(_('Duration'), _('Report period [s]'))] lerp = wp[(_('Infiltration'), _('Interpolation mode'))] self.param_infiltration_lerp = InfiltrationInterpolation.NONE if lerp == 0 else InfiltrationInterpolation.LINEAR
[docs] def write_initial_condition_from_record(self, recpath:Path = None, id_rec:int = None, destpath:Path = None) -> int: """ Define the initial condition of the simulation from a previous record. :param recpath: the path to the records. if None, the default path is used and 'simul_gpu_results' as result directory. :param id_rec: the index of the record you want to start from. :param destpath: the path where to save the initial condition. If None, the current path is used. :return: 0 if everything went well, -1 if the record doesn't exist, -2 if the record path doesn't exist. """ if recpath is None: recpath = self.path / 'simul_gpu_results' assert isinstance(recpath, Path), "I need a Path. Voilà." # import here to avoid circular import from .results_store import ResultsStore from shutil import copyfile if not recpath.exists(): logging.error(f"Record path {recpath} doesn't exist.") return -2 # create a result store _store = ResultsStore(recpath, mode="r") # check if the record exists if id_rec is None: id_rec = _store.nb_results assert id_rec > 0, "The index of the record must be > 0" if id_rec > _store.nb_results: logging.error(f"Record {id_rec} doesn't exist.") return -1 # get the result t,dt,n_iter_dry_up_euler, n_iter_dry_up_rk, h,qx,qy = _store.get_result(id_rec) # check the shape assert h.T.shape == self.shape, f"Record {id_rec} has a different shape than the simulation." assert qx.T.shape == self.shape, f"Record {id_rec} has a different shape than the simulation." assert qy.T.shape == self.shape, f"Record {id_rec} has a different shape than the simulation." if destpath is None: destpath = self.path destpath.mkdir(parents=True, exist_ok=True) # save the initial condition if self.path == destpath: copyfile(self.path / "h.npy" , destpath / "_h_backup.npy") copyfile(self.path / "qx.npy", destpath / "_qx_backup.npy") copyfile(self.path / "qy.npy", destpath / "_qy_backup.npy") # write the initial condition mask = h == 99999.0 h[mask] = 0.0 qx[mask] = 0.0 qy[mask] = 0.0 np.save(destpath / 'h.npy', h.T) np.save(destpath / 'qx.npy', qx.T) np.save(destpath / 'qy.npy', qy.T) if self.path != destpath: # Copy bathymetry data only if the destination path is different from the current path np.save(destpath / 'bathymetry.npy', self.bathymetry) return 0
[docs] def copy_ic_from_simulation(self, source:SimpleSimulation): """ Copy the initial condition from a simulation to the current simulation. """ assert source is not None, "Source simulation is None." assert source.param_nx == self.param_nx, "Source simulation has a different nx than the current simulation." assert source.param_ny == self.param_ny, "Source simulation has a different ny than the current simulation." self.h = source.h.copy() self.qx = source.qx.copy() self.qy = source.qy.copy() mask = self.h == 99999.0 self.h[mask] = 0.0 self.qx[mask] = 0.0 self.qy[mask] = 0.0
[docs] def copy_ic_from_record(self, source:Path, id_rec:int = None): """ Copy the initial condition from a record to the current simulation. """ assert source.exists(), f"Source path {source} doesn't exist." # import here to avoid circular import from .results_store import ResultsStore # create a result store _store = ResultsStore(source, mode="r") # check if the record exists if id_rec is None: id_rec = _store.nb_results assert id_rec > 0, "The index of the record must be > 0" if id_rec > _store.nb_results: logging.error(f"Record {id_rec} doesn't exist.") return -1 # get the result t,dt,n_iter_dry_up_euler, n_iter_dry_up_rk, h,qx,qy = _store.get_result(id_rec) # check the shape assert h.T.shape == self.shape, f"Record {id_rec} has a different shape than the simulation." assert qx.T.shape == self.shape, f"Record {id_rec} has a different shape than the simulation." assert qy.T.shape == self.shape, f"Record {id_rec} has a different shape than the simulation." mask = h == 99999.0 self.h[mask] = 0.0 self.qx[mask] = 0.0 self.qy[mask] = 0.0 self.h = h.T self.qx = qx.T self.qy = qy.T return 0
[docs] def has_infiltration(self): return self.infiltration_zones is not None
[docs] def mask_nap_contour(self, nb_cells:int=1): """ Mask the contour of the NAP array. """ self.nap[:nb_cells, :] = 0 self.nap[-nb_cells:, :] = 0 self.nap[:, :nb_cells] = 0 self.nap[:, -nb_cells:] = 0
[docs] def make_void_arrays(self): """ Create void arrays from (nx, ny) shape. Arrays : - nap - h - qx - qy - manning - bathymetry """ assert self.param_nx > 0, "nx must be > 0" assert self.param_ny > 0, "ny must be > 0" self._nap = np.ones((self.param_nx, self.param_ny), dtype=np.uint8) self.mask_nap_contour() self._h = np.zeros((self.param_nx, self.param_ny), dtype=np.float32) self._qx = np.zeros((self.param_nx, self.param_ny), dtype=np.float32) self._qy = np.zeros((self.param_nx, self.param_ny), dtype=np.float32) self._manning = np.zeros((self.param_nx, self.param_ny), dtype=np.float32) self._bathymetry = np.zeros((self.param_nx, self.param_ny), dtype=np.float32)
[docs] def make_void_infiltration_zones(self): """ Create a void infiltration zones array. """ assert self.param_nx > 0, "nx must be > 0" assert self.param_ny > 0, "ny must be > 0" self._infiltration_zones = np.zeros((self.param_nx, self.param_ny), dtype=np.int32)
@property
[docs] def bridge_roof(self) -> np.ndarray: """ Return the bridge roof. """ if self._bridge_roof_elevation is None: self.make_void_bridge_roof() for cur in self._cell_parameters: cur:CellParameter if cur.ntype == CellParameterType.BRIDGE_ROOF_ELEVATION: self._bridge_roof_elevation[cur.i-1, cur.j-1] = cur.val + self.bathymetry[cur.i-1, cur.j-1] return self._bridge_roof_elevation
[docs] def has_bridge(self) -> bool: """ Return True if the simulation has a bridge roof. """ for cur in self._cell_parameters: cur:CellParameter if cur.ntype == CellParameterType.BRIDGE_ROOF_ELEVATION: return True return False
@bridge_roof.setter def bridge_roof(self, value:np.ndarray): """ Set the bridge roof. """ if value is None: self._bridge_roof_elevation = value self.clear_cell_parameters(CellParameterType.BRIDGE_ROOF_ELEVATION) return if self._check_np_array_type(value): self._bridge_roof_elevation = value self.convert_roof_array2params(value)
[docs] def make_void_bridge_roof(self): """ Create a void bridge roof array. """ assert self.param_nx > 0, "nx must be > 0" assert self.param_ny > 0, "ny must be > 0" self._bridge_roof_elevation = np.ones((self.param_nx, self.param_ny), dtype=np.float32) * 99999.0
[docs] def convert_roof_array2params(self, roof_array:np.ndarray = None): """ Convert a roof array to a list of parameters. It will remove all the existing bridge roof parameters before add the new ones. """ if roof_array is None: roof_array = self._bridge_roof_elevation if roof_array is None: logging.error("No roof elevation to convert.") return self.clear_cell_parameters(CellParameterType.BRIDGE_ROOF_ELEVATION) ij = np.where((roof_array > 0) & (roof_array != 99999.0) & (self.nap ==1)) if len(ij[0]) == 0: return for i,j in zip(ij[0], ij[1]): if roof_array[i,j] > self.bathymetry[i,j]: self.add_cell_parameter(i+1, j+1, CellParameterType.BRIDGE_ROOF_ELEVATION, roof_array[i,j] - self.bathymetry[i,j]) else: logging.warning(f"Bridge roof elevation at ({i+1},{j+1}) is lower or equal than the bathymetry. I skip it.") logging.debug(f"Converted {len(ij[0])} roof elevations to parameters.")
[docs] def _params_have_duplicated_values(self) -> str: """ Check if the parameters are not duplicated. """ ret = "" # for i,cur in enumerate(self._cell_parameters): # cur:CellParameter # for j,cur2 in enumerate(self._cell_parameters[i+1:]): # cur2:CellParameter # if i != j and cur.i == cur2.i and cur.j == cur2.j and cur.ntype == cur2.ntype: # logging.warning(f"Duplicate parameters at ({cur.i},{cur.j})") # ret += f"Duplicate parameters at ({cur.i},{cur.j})\n" tmp = list(set(self._cell_parameters)) if len(tmp) != len(self._cell_parameters): logging.warning("Duplicate parameters found - Check your parameters !") ret += "Duplicate parameters found - Check your parameters !\n" return ret
[docs] def ensure_nullvalue_outside_nap(self, nodata=99999.0): """ Force the nodata where nap is 0 for: - topo/bathymetry - manning Force zero value for initial conditions where nap is 0: - h - qx - qy """ msk= self.nap == 0 self._h[msk] = 0. self._qx[msk] = 0. self._qy[msk] = 0. self._manning[msk] = nodata self._bathymetry[msk] = nodata if self.infiltration_zones is not None: self._infiltration_zones[msk] = 0 if self.bridge_roof is not None: self.bridge_roof[msk] = nodata