Source code for wolfgpu.simple_simulation

from __future__ import annotations
# Run this as python -m wolfgpu.simple_simulation else you'll enter a
# dependency/import Alice In Wonderland loop.

import json
from enum import Enum
from pathlib import Path
from datetime import datetime
from typing import Union
import numpy as np
import numpy.typing as npt
import logging
from copy import deepcopy

from .utils import parse_duration_to_seconds

from wolfhece.PyParams import Wolf_Param, Type_Param
from wolfhece.PyTranslate import _

# FIXME This should be imported from WolfHECE
[docs] class BoundaryConditionsTypes(Enum): """The boundary conditions types. Note: The values match the numbers in Wolf's simulations parameters. """
[docs] H = 1
""" Weak bondary condition over :math:`h+b`, the water free surface height (that is, height of water plus bathymetry level). """
[docs] QX = 2
""" Weak bondary condition over :math:`q_x`, the discharge over the x-axis. """
[docs] QY = 3
""" Weak bondary condition over :math:`q_y`, the discharge over the x-axis. """
[docs] NONE = 4
""" This is used to represent the absence of boundary condition. """
[docs] QBX = 5
""" Not used. """
[docs] QBY = 6
""" Not used. """
[docs] HMOD = 7
""" Weak bondary condition over :math:`h`, the water free surface height (that is, height of water, measured from the top of the bathymetry). """
[docs] FROUDE_NORMAL = 8
""" Weak boundary condition for Froude limited height. Water height becomes :math:`\\min(h, \\frac{u^2}{g \\times \\text{Fr}^2})` where :math:`\\text{Fr}` is given. """
[docs] class Direction(Enum):
[docs] LEFT = 1
[docs] BOTTOM = 2
[docs] X = 1
[docs] Y = 2
# FIXME This *MUST* be imported from WolfHECE
[docs] class boundary_condition_2D: ''' Type des CL générales, faibles et fortes @author Pierre Archambeau ''' def __init__(self, i: int, j: int, ntype: BoundaryConditionsTypes, val: float, direction: Direction) -> None: assert i >= 1 assert j >= 1 assert isinstance(ntype, BoundaryConditionsTypes) assert isinstance(direction, Direction) self.i = i # indice de colonne dans le plus petit maillage self.j = j # indice de ligne dans le plus petit maillage self.ntype = ntype # type de cl (h=1,qx=2,qy=3,rien=4,qbx=5,qby=6,hmod=7,fr=8) self.val = val # valeur à imposer self.direction = direction
[docs] def to_dict(self): return { "i" : self.i, "j" : self.j, "type": self.ntype.name, "val" : float(self.val), # float makes sure the load and save to JSON has same format ("xxx.yyy") event when passing int's. "direction": self.direction.name, }
@classmethod
[docs] def from_dict(kls, js): return boundary_condition_2D(js["i"], js["j"], BoundaryConditionsTypes[js["type"]], float(js["val"]), Direction[js["direction"]])
[docs] class SimulationDurationType(Enum): """ A type of duration. This was introduced to abstract the duration of a simulation. """ # Note that the way we put it, there's no way you can do # a simulation that has no planned finite time (it's time # step could be infinitely small though).
[docs] SECONDS=0
""" A duration expressed in seconds. """
[docs] STEPS=1
""" A duration expressed in steps. """
[docs] class ReportFrequencyType(Enum): """ When specifying the report period, one can use two different units. """ # FIXME Can't we just use a simulationduration ? # These are the values used by Wolf
[docs] ITERATION = 1
""" Report period in iterations """
[docs] SIMULATION_TIME = 0
""" Report period in simulation time (seconds) """
[docs] class SimulationDuration: """ In the GPU simulator, we represent durations about the simulation either as number of simulation steps either as simulation time (in seconds). This class can represent both. """ def __init__(self, _type:SimulationDurationType, duration: Union[float, int]): assert isinstance(_type, SimulationDurationType) assert duration > 0, f"A duration must be strictly positive. You gave {duration}." if _type == SimulationDurationType.STEPS: assert type(duration) == int, "Simulation total number of steps must be an integer" self.type = _type self.duration = duration # Expressed in seconds def __str__(self): if self.type == SimulationDurationType.STEPS: if self.duration <= 1: return f"{self.duration} step" else: return f"{self.duration} steps" elif self.type == SimulationDurationType.SECONDS: return f"{self.duration:g} s."
[docs] def to_dict(self): return { "type": self.type.name, "duration": self.duration }
[docs] def from_dict(self, d): self.type = SimulationDurationType[d["type"]] if self.type == SimulationDurationType.SECONDS: self.duration = float(d["duration"]) elif self.type == SimulationDurationType.STEPS: self.duration = int(d["duration"]) else: raise Exception(f"Unexpected duration type, you gave {self.type}")
def __eq__(self, __value: object) -> bool: a = self.type == __value.type b = self.duration == __value.duration return self.type == __value.type and self.duration == __value.duration @classmethod
[docs] def from_seconds(cls, seconds) -> SimulationDuration: return SimulationDuration(SimulationDurationType.SECONDS, parse_duration_to_seconds(seconds))
@classmethod
[docs] def from_steps(cls, steps) -> SimulationDuration: return SimulationDuration(SimulationDurationType.STEPS, int(steps))
@classmethod
[docs] def from_str(cls, d: str) -> SimulationDuration: # An integer is a number of steps # Anything else (float or hms) is a duration try: steps = int(d) except ValueError: return cls.from_seconds(d) return cls.from_steps(steps)
[docs] class TimeStepStrategy(Enum): """ Describes the way the time step is computed on each simulation step. """
[docs] FIXED_TIME_STEP=1
""" The time step is constant over all simulation steps. """
[docs] OPTIMIZED_TIME_STEP=2
""" The time step is the maximum of the minimum time steps authorized by the Courant formula. """
[docs] class InfiltrationInterpolation(Enum): """ Describes the way the infiltration are computed across an infiltration time interval.""" # These are the values used by Wolf
[docs] NONE = 0 # Constant value over time interval
""" The infiltration is constant over the infiltration time interval. """
[docs] LINEAR = 1 # Linear interpolation over the interval
""" The infiltration is interpolated over the infiltration time interval. The initial value is the one associated with interval, the final value is the one associated with the next interval. """
[docs] class SimpleSimulation: """ A simple simulation scenario. IMPORTANT: althoug we use numpy arrays to represent various matrices, be careful: we use shapes as (nb of columns, nb of rows). This is transposed comprared to regular numpy ! """
[docs] path: Path
""" Directory where the simulation was loaded or will be saved."""
[docs] creation_date: datetime
""" Creation or last save date. """
[docs] comment: str
""" A free text comment, no formatting supported. """
[docs] param_froude_max: float
""" Froude limit """
[docs] param_runge_kutta: float
""" Runge-Kutta ponderation. The value is the ponderation of the estimator. Thus 1-ponderation is applied to the corrector. None means no RK but simple Euler step."""
[docs] _param_dx: float
""" Size of a single mesh along x axis, in meters. """
[docs] _param_dy: float
""" Size of a single mesh along y axis, in meters. """
[docs] _param_nx: int
""" Size of the computation domain along the x axis, in number of meshes. """
[docs] _param_ny: int
""" Size of the computation domain along the y axis, in number of meshes. """
[docs] param_base_coord_ll_x: float
""" The computation domain coordinates are relative to "base" coordinates. This parameter is the x component of the base coordinates. The basis of the coordinates system is located, by convention, on the lower left corner of the domain (i.e. at (0,0) in the local coordinates system). """
[docs] param_base_coord_ll_y: float
""" The computation domain coordinates are relative to "base" coordinates. This parameter is the y component of the base coordinates. The basis of the coordinates system is located, by convention, on the lower left corner of the domain (i.e. at (0,0) in the local coordinates system). """
[docs] _param_timestep_strategy: TimeStepStrategy
""" Strategy used to compute the time step at each iteration. """
[docs] _param_timestep: float
""" In case of a constant time step strategy, this is the duration of the time step. It will be ignored in other cases. """
[docs] _param_infiltration_lerp: InfiltrationInterpolation
""" Type of infiltration interpolation: constant values, linear interpolation,... """
[docs] _param_duration: SimulationDuration
""" Duration of the simulation, either in steps or in simulation time (seconds) """
[docs] _param_report_period: SimulationDuration
""" Duration of the period for reporting, either in steps or in simulation time (seconds) """
[docs] _h : npt.NDArray[np.float32]
""" Matrix of water height, in meters. """
[docs] _qx : npt.NDArray[np.float32]
""" Matrix of quantity of movement along x axis, in m²/s. """
[docs] _qy : npt.NDArray[np.float32]
""" Matrix of quantity of movement along y axis, in m²/s. """
[docs] _bathymetry : npt.NDArray[np.float32]
""" Bathymetry, in meters. """
[docs] _manning : npt.NDArray[np.float32]
""" Manning coefficients. """
[docs] infiltration_zones : npt.NDArray[np.int32]
""" Infiltration zones indices. Indices are numbered from one. 0 means no infiltration in the mesh. """
[docs] nap : npt.NDArray[np.uint8]
""" Matrix representing which mesh are part of the computation domain or not. 1 means a mesh is part of the computation domain, 0 means it's not."""
[docs] _wp : Wolf_Param
""" Attributes of the simulation as Wolf_Param instance. """ def __init__(self, nx: int=0, ny: int=0): """ Create a new simple simulation. When created, a simulation is empty. One must at least set the NAP array and the `h` array to have something that can be simulated. nx, ny : number of meshes horizontally and vertically. For technical reasons, nx and ny must be both >= 5. """ if nx > 0 and ny > 0: assert nx >= 5, "The number of meshes on the x axis must be at least 5" assert ny >= 5, "The number of meshes on the y axis must be at least 5" self.path = None self._h = None self._qx = None self._qy = None self._manning = None self._bathymetry = None self.infiltration_zones = None self.nap = None self._boundary_conditions: list[boundary_condition_2D] = [] self._infiltrations_chronology =[] self._infiltrations_timings_as_gpu_array_cache = None self.param_courant = 0.5 self.param_dx, self.param_dy = 0, 0 self._param_nx, self._param_ny = nx, ny self.param_froude_max = 3 self._param_timestep_strategy = TimeStepStrategy.OPTIMIZED_TIME_STEP self._param_timestep = 0.0 self.param_runge_kutta = 0.3 self._param_duration = SimulationDuration.from_steps(1000) self._param_report_period = SimulationDuration.from_steps(10) self._param_infiltration_lerp = InfiltrationInterpolation.NONE self.param_base_coord_ll_x = 0.0 self.param_base_coord_ll_y = 0.0 self.comment = None self.creation_date = None
[docs] def _check_np_array_type(self, a: np.array, dtype=np.float32, check_size=True): if not isinstance(a, np.ndarray): raise TypeError("I expect a numpy array") elif not a.dtype == dtype: raise TypeError(f"I expect the type {dtype}, you gave {a.dtype}") elif not check_size or a.shape != (self._param_nx, self._param_ny): raise TypeError(f"I expect the array to have shape ({self._param_nx},{self._param_ny})") else: return True
@property
[docs] def param_dx(self): return self._param_dx
@param_dx.setter def param_dx(self, dx): self._infiltrations_timings_as_gpu_array_cache = None self._param_dx = dx @property
[docs] def param_dy(self): return self._param_dy
@param_dy.setter def param_dy(self, dy): self._infiltrations_timings_as_gpu_array_cache = None self._param_dy = dy @property
[docs] def h(self) -> npt.NDArray[np.float32]: return self._h
@h.setter def h(self, value: npt.NDArray[np.float32]): if self._check_np_array_type(value): self._h = value @property
[docs] def qx(self) -> npt.NDArray[np.float32]: return self._qx
@qx.setter def qx(self, value: npt.NDArray[np.float32]): if self._check_np_array_type(value): self._qx = value @property
[docs] def qy(self) -> npt.NDArray[np.float32]: return self._qy
@qy.setter def qy(self, value: npt.NDArray[np.float32]): if self._check_np_array_type(value): self._qy = value @property
[docs] def manning(self) -> npt.NDArray[np.float32]: return self._manning
@manning.setter def manning(self, value: npt.NDArray[np.float32]): if self._check_np_array_type(value): self._manning = value @property
[docs] def bathymetry(self) -> npt.NDArray[np.float32]: return self._bathymetry
@bathymetry.setter def bathymetry(self, value: npt.NDArray[np.float32]): if self._check_np_array_type(value): self._bathymetry = value @property
[docs] def param_nx(self) -> int: """ Dimension of the computation domain on the x axis. """ return self._param_nx
@param_nx.setter def param_nx(self, value: int): self._param_nx = value @property
[docs] def param_ny(self) -> int: """ Dimension of the computation domain on the y axis. """ return self._param_ny
@param_ny.setter def param_ny(self, value: int): self._param_ny = value @property
[docs] def shape(self) -> tuple[int, int]: return (self._param_nx, self._param_ny)
@property
[docs] def param_duration(self) -> SimulationDuration: """ Duration of the simulation, either in steps or in simulation time (seconds). The duration is represented by an object of class `SimulationDuration`. """ return self._param_duration
@param_duration.setter def param_duration(self, value: SimulationDuration): if isinstance(value, SimulationDuration): self._param_duration = value else: raise TypeError(f"For simulation duration, I expect an object of type SimulationDuration. You gave a '{value.__class__.__name__}'.") @property
[docs] def param_report_period(self) -> SimulationDuration: """ Duration of the period for reporting, either in steps or in simulation time (seconds). The duration is represented by an object of class `SimulationDuration`. """ return self._param_report_period
@param_report_period.setter def param_report_period(self, value: SimulationDuration): if isinstance(value, SimulationDuration): self._param_report_period = value else: raise TypeError(f"For reporting period, I expect an object of type SimulationDuration. You gave a '{value.__class__.__name__}'.") @property
[docs] def param_timestep(self) -> float: """ In case of a constant timestep strategy, this is the duration of a step, expressed in seconds. """ return self._param_timestep
@param_timestep.setter def param_timestep(self, value: float): if isinstance(value, float): self._param_timestep = value else: raise TypeError(f"For time step, I expect a number of seconds in float.") @property
[docs] def param_timestep_strategy(self) -> TimeStepStrategy: """ Strategy used to compute the time step at each iteration. The strategy is represented by an object of class `TimeStepStrategy`. """ return self._param_timestep_strategy
@param_timestep_strategy.setter def param_timestep_strategy(self, value: TimeStepStrategy): if isinstance(value, TimeStepStrategy): self._param_timestep_strategy = value else: raise TypeError(f"For time step strategy, I expect an object of type TimeStepStrategy. You gave a '{value.__class__.__name__}'.") @property
[docs] def param_infiltration_lerp(self) -> InfiltrationInterpolation: """ Type of infiltration interpolation: constant values, linear interpolation,... The interpolation type is represented by an object of class `InfiltrationInterpolation`. """ return self._param_infiltration_lerp
@param_infiltration_lerp.setter def param_infiltration_lerp(self, value: InfiltrationInterpolation): if isinstance(value, InfiltrationInterpolation): self._param_infiltration_lerp = value else: raise TypeError(f"For infiltration interpolation strategy, I expect an object of type InfiltrationInterpolation. You gave a '{value.__class__.__name__}'.")
[docs] def add_boundary_condition(self, i: int, j: int, bc_type:BoundaryConditionsTypes, bc_value: float, border:Direction): """ Add a boundary condition to the model. i, j: position of the boundary condition (1-based) bc_type: type of boundary condition bc_value: value/parameter of the boundary condition border: mesh's border on which the boundary condtion applies """ if isinstance(bc_type, int): for bc in BoundaryConditionsTypes: if bc.value == bc_type: bc_type = bc break self._boundary_conditions.append(boundary_condition_2D(i, j, bc_type, bc_value, border))
[docs] def clear_boundary_conditions(self): """ Clears all the boundary conditions. Useful if one wants to build a new simulation starting with an existing one. """ self._boundary_conditions = []
[docs] def add_infiltration(self, time_start: float, zones_values: list[float]): """ Add an infiltration point in the infiltration chronology. :param: time_start start time of the infiltration (in seconds) :param: zones_values an array representing the quantity of water per second to add to each infiltration zones, starting at time_start. The first item corrsesponds to the zone 1, second item corresponds to zone 2 and so on. """ # Invalidate the timings cache. self._infiltrations_timings_as_gpu_array_cache = None self._infiltrations_chronology.append( [time_start, zones_values] )
[docs] def clear_infiltration_chronology(self): """ Clear the infiltration chronology. Useful if one wants to build a new simulation starting with an existing one. """ self._infiltrations_timings_as_gpu_array_cache = None self._infiltrations_chronology = []
@property
[docs] def boundary_condition(self) -> list[boundary_condition_2D]: return self._boundary_conditions
@property
[docs] def infiltrations_chronology(self) -> list[float,list[float]]: """ Returns the chronology of infiltration. This is a list. Each entry of the list is: [start_time, [list of values for each infitlration zone]] """ return self._infiltrations_chronology
@infiltrations_chronology.setter def infiltrations_chronology(self, infiltrations_chronology): self._infiltrations_timings_as_gpu_array_cache = None self._infiltrations_chronology = infiltrations_chronology
[docs] def _infiltrations_timings_as_gpu_array(self): # Maybe a bit premature buthere I cache the construction # of the infiltration timing GPU array (because it could # be slow if one provides a lot of timings) if self._infiltrations_timings_as_gpu_array_cache is None and \ len(self._infiltrations_chronology) >= 1 : shape = (len(self._infiltrations_chronology), 1 + len(self._infiltrations_chronology[0][1])) inf_timings = np.zeros( shape, dtype=np.float32) for ndx, inf_data in enumerate(self._infiltrations_chronology): inf_start, inf_values = inf_data assert len(inf_data[1]) == len(self._infiltrations_chronology[0][1]), "Not all infiltration zones values arays have the same length." inf_timings[ndx, 0] = inf_start inf_timings[ndx, 1:len(inf_values)+1] = inf_values # Spread each infiltration's the water over its zone's meshes. logging.info(f"Spread infiltrations over the mesh") # Precompute the number/surface of cells in each zone. # For large mesh, large number of infiltration zones and # numerous time steps, this could be **very** slow to # compute this for each row of inf_timings. # FIXME : To be optimized if number of zones evolves with time. b=[] for zone_ndx in range(1, inf_timings[0].shape[0]): a = self.infiltration_zones == zone_ndx b.append(np.sum(a) * self.param_dx * self.param_dy) for row in inf_timings: for zone_ndx, cur_b in zip(range(1, row.shape[0]), b): original_inf = row[zone_ndx] if cur_b > 0: row[zone_ndx] /= cur_b # Infiltration is in m³/s for a complete zone # It is divided by the infitlrated surface => m²/s / m² becomes m/s. logging.debug(f"Infiltration t_start={row[0]:.1f} s, zone {zone_ndx} has {cur_b} cells with {original_inf} m³/s => infiltration per cell = {row[zone_ndx]:.4g} m/s") self._infiltrations_timings_as_gpu_array_cache = inf_timings logging.info(f"Spread infiltrations over the mesh - Done !") return self._infiltrations_timings_as_gpu_array_cache
[docs] def _check_infiltrations(self, infiltration_zones: np.ndarray, infiltration_timings, interp_infiltration: InfiltrationInterpolation) -> list[str]: """ Check if the infiltration data is consistent. """ from scipy.ndimage import labeled_comprehension errors = [] nb_infiltration_lines = infiltration_timings.shape[0] if not np.all(np.diff(infiltration_timings[:,0]) > 0): errors.append(f"The infiltration timings are not strictly increasing: {infiltration_timings[:,0]}") if interp_infiltration == InfiltrationInterpolation.LINEAR: if infiltration_timings[0,0] != 0: errors.append(f"The first infiltration time must be zero if you use interpolation") if nb_infiltration_lines < 2: errors.append(f"You need at least two data points in infiltrations if you use interpolation") if errors: return errors zones_numbers = np.unique(infiltration_zones[infiltration_zones > 0]).tolist() logging.debug(f"Infiltration zones numbers are : {zones_numbers}") # Check if each zone actually has an infiltration. # This test has been disabled because sometimes, users like to disable # an infiltration by setting its infiltrated quantity to zero. # used_zones = set() # unused_zones = set() # for row in infiltration_timings: # # On each line, we look at every infiltrated/exfiltrated quantities # for zone_ndx in range(1, row.shape[0]): # if abs(row[zone_ndx]) > 0.0: # used_zones.add(zone_ndx) # else: # unused_zones.add(zone_ndx) num_count = labeled_comprehension(infiltration_zones, infiltration_zones, np.arange(1, infiltration_zones.max()+1), np.count_nonzero, int, 0) for zone_ndx in range(1, infiltration_timings.shape[1]): if num_count[zone_ndx-1] == 0: errors.append(f"The zone {zone_ndx+1} doesn't appear in the infiltration map.") # for zone_ndx in range(1,infiltration_timings.shape[1]): # a = infiltration_zones == zone_ndx # b = np.sum(a) # if b == 0: # logging.warning(f"The zone {zone_ndx} doesn't appear in the infiltration map.") return []
[docs] def plot_infiltration(self, figax = None, toshow:bool=True): """ Plot the infiltration chronology. :param figax: a tuple (fig, ax) of matplotlib objects. If None, a new figure is created. :param toshow: if True, the figure is shown. :return: the figure and the axis. """ import matplotlib.pyplot as plt if figax is None: fig, ax = plt.subplots() inf = self.infiltrations_chronology times = [x[0] for x in inf] values = [x[1] for x in inf] if self.param_infiltration_lerp == InfiltrationInterpolation.LINEAR: fig.suptitle(_("Infiltration chronology (linear interpolation)")) for i in range(len(values[0])): ax.plot(times, [x[i] for x in values], label=f"Zone {i+1}") elif self.param_infiltration_lerp == InfiltrationInterpolation.NONE: fig.suptitle(_("Infiltration chronology (constant values)")) for i in range(len(values[0])): ax.step(times, [x[i] for x in values], label=f"Zone {i+1}", where="post") ax.set_xlabel(_("Time (s)")) ax.set_ylabel(_("Flow rate ($m^3s^{-1}$)")) ax.legend() if toshow: fig.show() return fig, ax
[docs] def check_errors(self) -> Union[None, list[str]]: """ Check if the model currently set in this object is valid. A number of tests are made, they are not exhaustive. Returns None if no problem was found. Returns a list of error messages else. """ res = [] if self.infiltration_zones is not None: if self.infiltration_zones.dtype != np.int32: res.append(f"Infiltration zones must be a np.int32 array") if self.infiltration_zones.shape != self.shape: res.append(f"Infiltration zones shape must be (param. nx, param. ny)") if not np.all(self.infiltration_zones >= 0): l = np.unique(self.infiltration_zones).tolist() res.append(f"In the infiltration zones array, the zones are numbered starting at 1. Zero means \"mesh has no infiltration\". But I see these values: {','.join(map(str,l))}.") if self.nap is not None: if self.nap.dtype != np.uint8: res.append(f"NAP matrix must be a uint8 array") if self.nap.shape != self.shape: res.append(f"NAP shape must be (param. nx, param. ny)") else: res.append("The NAP array has not been set") for name, array in {"h":self._h, "qx":self._qx, "qy":self._qy, "bathymetry":self._bathymetry, "manning":self._manning}.items(): array: np.ndarray if array is not None: if array.dtype != np.float32: res.append(f"The {name} array's type must be np.float32") if array.shape != self.shape: res.append(f"{name} shape must be (param. nx, param. ny)") else: res.append(f"{name} array must be specified") if self._h is not None: if np.count_nonzero(self._h < 0) > 0: res.append("I have found negative heights of water. I can't handle that.") if self._h is not None and self._qx is not None: if np.count_nonzero( (self._h == 0) & (self._qx > 0)): res.append("There are meshes where you have qx > 0 but h == 0 !") if self._h is not None and self._qy is not None: if np.count_nonzero((self._h == 0) & (self._qy > 0)): res.append("There are meshes where you have qy > 0 but h == 0 !") if self._bathymetry is not None: if self.nap is None: if np.count_nonzero(self._bathymetry < 0) > 0: res.append("I have found negative heights in the bathymetry. I can't handle that.") else: if np.count_nonzero(self._bathymetry[self.nap > 0] < 0) > 0: res.append("I have found negative heights in the bathymetry. I can't handle that.") if not (self.param_courant > 0): res.append("Courant number is wrong") if not (self.param_dx > 0): res.append(f"dx ({self.param_dx}) is wrong, it should be > 0") if not (self.param_dy > 0): res.append(f"dy ({self.param_dy}) is wrong, it should be > 0") if not (self._param_nx > 4 ): res.append(f"nx is wrong ({self._param_nx}), it should be an integer > 4 (we have 2 cells wide borders)") if not (self._param_ny > 4 ): res.append(f"ny is wrong ({self._param_ny}), it should be an integer > 4 (we have 2 cells wide borders)") if self.infiltration_zones is not None and self._infiltrations_chronology == []: res.append("If you give infiltrations zones then you must give infiltration chronolgy too") if self.infiltration_zones is None and self._infiltrations_chronology != []: res.append("If you give an infiltration chronolgy then you must give infiltrations zones too") if self.infiltration_zones is not None and self._infiltrations_chronology != []: for inf in self._infiltrations_chronology: if len(inf) < 2: res.append("In the infiltration chronology array, each line must have at least 2 elements: one for the time and at least one for the infiltration quantities") break if inf[0] < 0: res.append("In infiltration chronolgy array, the time component (first element of a line) must be >= 0") if len(inf) != len(self._infiltrations_chronology[0]): res.append("In infiltration chronolgy array, all lines must have the same number of elements") res.extend( self._check_infiltrations( self.infiltration_zones, self._infiltrations_timings_as_gpu_array(), self._param_infiltration_lerp)) if len(res) == 0: return None else: return res
[docs] def _fail_if_invalid(self): ce = self.check_errors() if ce is not None: raise Exception( "\n" + ";\n".join(ce))
@classmethod
[docs] def load(cls, simulation_dir: Path): """ Load a simulation. :param store_dir: where the simulation model is stored. :type store_dir: Path """ # NOTE I thought about using a marshalling framework such as marshmallow # but they tend to over-engineer the code and this may be a bit of # a burden to maintain for people not accustomed to such patterns of # development. So I K.I.S.S. assert isinstance(simulation_dir, Path), "I need a Path. Voilà." with open(simulation_dir / "parameters.json","r") as pfile: data = json.loads(pfile.read()) assert data["spec_version"] == "0.1" model = SimpleSimulation(data["parameters"]["nx"], data["parameters"]["ny"]) model.path = simulation_dir model.comment = data["comment"] model.creation_date = datetime.strptime(data["creation_date"], "%d/%m/%Y %H:%M:%S") model.param_dx = data["parameters"]["dx"] model.param_dy = data["parameters"]["dy"] model.param_nx = data["parameters"]["nx"] model.param_ny = data["parameters"]["ny"] model.param_courant = data["parameters"]["courant"] model.param_froude_max = data["parameters"]["froude_max"] model._param_timestep_strategy = TimeStepStrategy[data["parameters"]["timestep_strategy"]] model._param_timestep = data["parameters"]["timestep"] model._param_infiltration_lerp = InfiltrationInterpolation[ data["parameters"]["infiltration_interp"]] model._param_duration.from_dict(data["parameters"]["duration"]) model._param_report_period.from_dict(data["parameters"]["report_period"]) model.param_runge_kutta = data["parameters"]["runge_kutta"] model.param_base_coord_ll_x = data["parameters"]["base_coord_x"] model.param_base_coord_ll_y = data["parameters"]["base_coord_y"] model._h = np.load(simulation_dir / data["maps"]["h"]) model._qx = np.load(simulation_dir / data["maps"]["qx"]) model._qy = np.load(simulation_dir / data["maps"]["qy"]) model._bathymetry = np.load(simulation_dir / data["maps"]["bathymetry"]) model._manning = np.load(simulation_dir / data["maps"]["manning"]) if "NAP" in data["maps"]: model.nap = np.load(simulation_dir / data["maps"]["NAP"]) if "infiltration_zones" in data["maps"]: model.infiltration_zones = np.load(simulation_dir / data["maps"]["infiltration_zones"]) # Clear the list because one can call load several times. model._boundary_conditions = [] for bc in data["boundary_conditions"]: model._boundary_conditions.append(boundary_condition_2D.from_dict(bc)) model._infiltrations_chronology = [] for infiltrations in data["infiltrations"]: start_time, inf_values = infiltrations["start_time"], infiltrations["q"] model._infiltrations_chronology.append( [start_time, inf_values] ) model._fail_if_invalid() return model
[docs] def copy(self) -> SimpleSimulation: """ Create a new simulation from an existing one. """ return deepcopy(self)
[docs] def save(self, store_dir: Path = None): """ Save the model to a directory. If store_dir is None, the path used to load the model is used. If the model was not loaded from a directory, an exception is raised. """ self._fail_if_invalid() if store_dir is None and self.path is None: raise Exception("The simulation has no directory. Please provide one.") path = store_dir or self.path assert path is not None if not path.exists(): path.mkdir(parents=True) else: assert path.is_dir() for name, array in {"h":self._h, "qx":self._qx, "qy":self._qy, "bathymetry":self._bathymetry, "manning":self._manning, "infiltration_zones":self.infiltration_zones, "NAP": self.nap}.items(): array: np.ndarray if array is not None: np.save(path / name, array) self.save_json(path)
[docs] def save_json(self, store_dir: Path = None): """ Save the parameters to a json file. """ if store_dir is None and self.path is None: raise Exception("The simulation has no directory. Please provide one.") path = store_dir or self.path if self.creation_date is None: self.creation_date = datetime.now() maps = {} for name, array in {"h":self._h, "qx":self._qx, "qy":self._qy, "bathymetry":self._bathymetry, "manning":self._manning, "infiltration_zones":self.infiltration_zones, "NAP": self.nap}.items(): array: np.ndarray if array is not None: maps[name] = name + ".npy" with open(path / "parameters.json","w") as pfile: params = { "courant": self.param_courant, "froude_max": self.param_froude_max, "nx" : self._param_nx, "ny" : self._param_ny, "dx" : self.param_dx, "dy" : self.param_dy, "timestep_strategy" : self._param_timestep_strategy.name, "timestep": self._param_timestep, "infiltration_interp" : self._param_infiltration_lerp.name, "duration": self._param_duration.to_dict(), "report_period": self._param_report_period.to_dict(), "runge_kutta": self.param_runge_kutta, "base_coord_x": self.param_base_coord_ll_x, "base_coord_y": self.param_base_coord_ll_y } bcs = [bc.to_dict() for bc in self._boundary_conditions] infs = [ { "start_time": start, "q": values} for start, values in self._infiltrations_chronology ] data = { "spec_version" : "0.1", "comment" : self.comment, "creation_date" : self.creation_date.strftime("%d/%m/%Y %H:%M:%S"), "parameters" : params, "boundary_conditions": bcs, "infiltrations" : infs, "maps" : maps } pfile.write(json.dumps(data, indent=2))
def __str__(self) -> str: """ Return a string representation of the simulation. """ manning = self._manning active = self.nap wet = self._h m = manning[active > 0] w,h = self._param_nx, self._param_ny domain = w*h total_wet = np.count_nonzero(wet[:,:][active > 0] > 0) total_active = np.sum(active > 0) txt = [] txt.append(f"Domain : {w}x{h} meshes, (dx={self.param_dx:g} m.,dy={self.param_dx:g} m.), {100*total_active/domain:.1f}% active, {100*total_wet/domain:.1f}% wet") if self.param_timestep_strategy == TimeStepStrategy.FIXED_TIME_STEP: txt.append(f"Timestep : Fixed, {self._param_timestep:g} s.") else: txt.append(f"Timestep : Optimized") if self.param_report_period is not None: rep = f"1 report every {self.param_report_period}" # if self.param_report_period.type == SimulationDurationType.SECONDS: # rep += f" (time sampled every {TIME_SAMPLING_RESOLUTION} steps)" else: rep = "no reports recorded" txt.append(f"Duration : {self.param_duration}, {rep}") if self.param_runge_kutta == 1.0: txt.append("Ponderation : Euler") else: txt.append( f"Ponderation : Runge-Kutta {self.param_runge_kutta:g}/{1-self.param_runge_kutta:g}" ) if self.param_infiltration_lerp == InfiltrationInterpolation.NONE: interp = "None" elif self.param_infiltration_lerp == InfiltrationInterpolation.LINEAR: interp = "Linear" txt.append(f"Infiltration interpolation : {interp}") txt.append(f"Courant number : {self.param_courant:g}") # FIXME Non dimensional, correct ? txt.append(f"Froude limit : {self.param_froude_max:g}") #print(f"Dry up handling: {['Enabled','Disabled'][sim._no_dry is True]}") txt.append(f"Mean of Manning coefficients: {np.mean(m):.3f} [s.m^{{-1/3}}]") return "\n".join(txt)
[docs] def to_wolfparam(self) -> Wolf_Param: """ Return a Wolf_Param object that represents the parameters of the simulation. """ _wp = Wolf_Param(parent = None, title = "Simple simulation GPU", to_read=False, withbuttons=True, toShow=False, init_GUI=False, force_even_if_same_default = True) _wp[(_('General'), _('X Lower left corner [m]'))] = self.param_base_coord_ll_x _wp[(_('General'), _('Y Lower left corner [m]'))] = self.param_base_coord_ll_y _wp[(_('Spatial resolution'), 'dx [m]')] = self.param_dx _wp[(_('Spatial resolution'), 'dy [m]')] = self.param_dy _wp[(_('Shape'), _('Number of meshes along X - nx'))] = self.param_nx _wp[(_('Shape'), _('Number of meshes along Y - ny'))] = self.param_ny _wp[(_('Scheme'), _('Courant number [-]'))] = self.param_courant _wp[(_('Scheme'), _('Froude limit [-]'))] = self.param_froude_max _wp.add_param(_('Scheme'), _('Time step strategy'), self.param_timestep_strategy.value, Type_Param.Integer, jsonstr={"Values":{"Fixed time step":1,"Optimized time step":2}}) _wp[(_('Scheme'), _('Time step [s]'))] = self.param_timestep _wp[(_('Scheme'), _('Runge-Kutta ponderation [-]'))] = self.param_runge_kutta _wp[(_('Duration'), _('Total duration [s]'))] = self.param_duration.duration _wp[(_('Duration'), _('Report period [s]'))] = self.param_report_period.duration _wp.add_param(_('Infiltration'), _('Interpolation mode'), self.param_infiltration_lerp.value, Type_Param.Integer, jsonstr={"Values":{"None":0, "Linear":1}}) return _wp
[docs] def from_wolfparam(self, wp: Wolf_Param): """ Callback called when the Wolf_Param object is modified and apply routine is called. """ assert isinstance(wp, Wolf_Param), "I need a Wolf_Param object." self.param_base_coord_ll_x = wp[(_('General'), _('X Lower left corner [m]'))] self.param_base_coord_ll_y = wp[(_('General'), _('Y Lower left corner [m]'))] self.param_dx = wp[(_('Spatial resolution'), 'dx [m]')] self.param_dy = wp[(_('Spatial resolution'), 'dy [m]')] self.param_nx = wp[(_('Shape'), _('Number of meshes along X - nx'))] self.param_ny = wp[(_('Shape'), _('Number of meshes along Y - ny'))] self.param_courant = wp[(_('Scheme'), _('Courant number [-]'))] self.param_froude_max = wp[(_('Scheme'), _('Froude limit [-]'))] strategy = wp[(_('Scheme'), _('Time step strategy'))] self.param_timestep_strategy = TimeStepStrategy.FIXED_TIME_STEP if strategy == 1 else TimeStepStrategy.OPTIMIZED_TIME_STEP self.param_timestep = wp[(_('Scheme'), _('Time step [s]'))] self.param_runge_kutta = wp[(_('Scheme'), _('Runge-Kutta ponderation [-]'))] self.param_duration.duration = wp[(_('Duration'), _('Total duration [s]'))] self.param_report_period.duration = wp[(_('Duration'), _('Report period [s]'))] lerp = wp[(_('Infiltration'), _('Interpolation mode'))] self.param_infiltration_lerp = InfiltrationInterpolation.NONE if lerp == 0 else InfiltrationInterpolation.LINEAR
[docs] def write_initial_condition_from_record(self, recpath:Path = None, id_rec:int = None, destpath:Path = None) -> int: """ Define the initial condition of the simulation from a previous record. :param recpath: the path to the records. if None, the default path is used and 'simul_gpu_results' as result directory. :param id_rec: the index of the record you want to start from. :param destpath: the path where to save the initial condition. If None, the current path is used. :return: 0 if everything went well, -1 if the record doesn't exist, -2 if the record path doesn't exist. """ if recpath is None: recpath = self.path / 'simul_gpu_results' assert isinstance(recpath, Path), "I need a Path. Voilà." # import here to avoid circular import from .results_store import ResultsStore from shutil import copyfile if not recpath.exists(): logging.error(f"Record path {recpath} doesn't exist.") return -2 # create a result store _store = ResultsStore(recpath, mode="r") # check if the record exists if id_rec is None: id_rec = _store.nb_results assert id_rec > 0, "The index of the record must be > 0" if id_rec > _store.nb_results: logging.error(f"Record {id_rec} doesn't exist.") return -1 # get the result t,dt,n_iter_dry_up_euler, n_iter_dry_up_rk, h,qx,qy = _store.get_result(id_rec) # check the shape assert h.T.shape == self.shape, f"Record {id_rec} has a different shape than the simulation." assert qx.T.shape == self.shape, f"Record {id_rec} has a different shape than the simulation." assert qy.T.shape == self.shape, f"Record {id_rec} has a different shape than the simulation." if destpath is None: destpath = self.path destpath.mkdir(parents=True, exist_ok=True) # save the initial condition if self.path == destpath: copyfile(self.path / "h.npy" , destpath / "_h_backup.npy") copyfile(self.path / "qx.npy", destpath / "_qx_backup.npy") copyfile(self.path / "qy.npy", destpath / "_qy_backup.npy") # write the initial condition np.save(destpath / 'h.npy', h.T) np.save(destpath / 'qx.npy', qx.T) np.save(destpath / 'qy.npy', qy.T) return 0
[docs] def copy_ic_from_simulation(self, source:SimpleSimulation): """ Copy the initial condition from a simulation to the current simulation. """ assert source is not None, "Source simulation is None." assert source.param_nx == self.param_nx, "Source simulation has a different nx than the current simulation." assert source.param_ny == self.param_ny, "Source simulation has a different ny than the current simulation." self.h = source.h.copy() self.qx = source.qx.copy() self.qy = source.qy.copy()
[docs] def copy_ic_from_record(self, source:Path, id_rec:int = None): """ Copy the initial condition from a record to the current simulation. """ assert source.exists(), f"Source path {source} doesn't exist." # import here to avoid circular import from .results_store import ResultsStore # create a result store _store = ResultsStore(source, mode="r") # check if the record exists if id_rec is None: id_rec = _store.nb_results assert id_rec > 0, "The index of the record must be > 0" if id_rec > _store.nb_results: logging.error(f"Record {id_rec} doesn't exist.") return -1 # get the result t,dt,n_iter_dry_up_euler, n_iter_dry_up_rk, h,qx,qy = _store.get_result(id_rec) # check the shape assert h.T.shape == self.shape, f"Record {id_rec} has a different shape than the simulation." assert qx.T.shape == self.shape, f"Record {id_rec} has a different shape than the simulation." assert qy.T.shape == self.shape, f"Record {id_rec} has a different shape than the simulation." self.h = h.T self.qx = qx.T self.qy = qy.T return 0