"""
Author: HECE - University of Liege, Stéphane Champailler, Pierre Archambeau
Date: 2024
Copyright (c) 2024 University of Liege. All rights reserved.
This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""
from abc import ABC
from wolfgpu.simple_simulation import SimulationDuration, SimulationDurationType
import numpy as np
from wolfgpu.simple_simulation import InfiltrationChronology
from wolfgpu.glsimulation import GLSimulation, GLSimulationGlobalState
from typing import Tuple, List
class SimulationProxy:
    """ A proxy to query and update a running simulation.

    This class exists to hide the methods of the simulator from the caller
    because they are numerous and many of them are of no interest to
    the caller.

    Reads are served from lazily filled caches holding the arrays read from
    the simulator; writes only modify those caches and raise the matching
    ``_*_was_set`` flag. This class itself only calls *read* methods of the
    simulator.
    """

    # Implementation note: this proxy should just hold the data to be updated.
    # It should not apply updates on the GPU itself. Why? Because if there are
    # multiple injectors, then one may want to group all their updated data and
    # apply them on the GPU in one go.
    # Right now we d/l the whole bathymetry from the GPU which costs a *lot*.
    # So we share that download across proxies.

    def __init__(self, glsim: GLSimulation, global_state: GLSimulationGlobalState, zone_of_interest: Tuple[slice, slice], simulation_current_quantity: int,
                 original_infiltration_chronology: InfiltrationChronology):
        """ Build a proxy around a simulator.

        :param glsim: The simulator the data are read from.
        :param global_state: Object giving the current ``simulation_step``
            and ``simulation_time``.
        :param zone_of_interest: Two slices selecting the part of the arrays
            that the getters return and the setters overwrite.
        :param simulation_current_quantity: Handed as is to the simulator
            when the h/qx/qy array is read.
        :param original_infiltration_chronology: The infiltration chronology
            to query/update. `None` is accepted here, but the
            infiltration-quantities methods dereference it and will fail
            in that case.
        """
        # `None` is tested first: it is the cheaper test and it short-circuits
        # the `isinstance` check.
        assert original_infiltration_chronology is None or isinstance(original_infiltration_chronology, InfiltrationChronology)

        self._glsim: GLSimulation = glsim

        # FIXME global_state should come from glsim, no ??? Not yet because
        # it comes from the simulation runner. So instead of passing a global state
        # I would pass a SimulationRunner which is not any better.
        # The more I progress, the more I copy stuff from the runner here.
        # So, I'll need to put that stuff back in the runner and make
        # the proxy a real proxy (i.e. something that hides the runner/simulator).
        self._global_state = global_state
        self._zone_of_interest = zone_of_interest

        # Caches, filled on first access.
        self._cached_bathymetry = None
        self._cached_h_qx_qy = None
        self._cached_infiltration_zones = None

        # Flags raised by the setters.
        self._bathymetry_was_set = False
        self._h_qx_qy_was_set = False
        self._infiltration_zones_was_set = False
        self._infiltration_chronology_was_set = False

        self._simulation_current_quantity = simulation_current_quantity
        self._infiltration_chronology = original_infiltration_chronology

    def _ensure_array_size(self, a):
        """ Check that `a` has exactly the shape of the zone of interest.

        The expected shape is computed as ``stop - start`` of each of the two
        slices, so both slices need explicit bounds.

        :param a: The array to check (anything with a ``shape`` attribute).
        :raises AssertionError: If the shapes differ.
        """
        expected_shape = tuple(
            self._zone_of_interest[axis].stop - self._zone_of_interest[axis].start
            for axis in (0, 1)
        )
        assert a.shape == expected_shape, f"The shape of the array you give {a.shape} is not what I expected {expected_shape}"

    def _set_zone_of_interest(self, zone_of_interest: Tuple[slice, slice]):
        """ Replace the zone of interest used by the getters and setters. """
        self._zone_of_interest = zone_of_interest

    def _updated_infiltration_chronology(self):
        """ Return the infiltration chronology held by this proxy. """
        return self._infiltration_chronology

    @property
    def current_sim_step(self) -> int:
        """ The current step in the simulation (starting at zero).
        """
        return self._global_state.simulation_step

    @property
    def current_sim_time(self) -> float:
        """ The current time in the simulation. Expressed in seconds (starting
        at zero).
        """
        return self._global_state.simulation_time

    def _bathymetry_cache(self) -> np.ndarray:
        """ Return the cached bathymetry, reading it from the simulator on
        first use. """
        if self._cached_bathymetry is None:
            self._cached_bathymetry = self._glsim.read_bathymetry()
        return self._cached_bathymetry

    def get_bathymetry(self) -> np.ndarray:
        """ Get the bathymetry over the zone of interest.

        :return: The cached bathymetry indexed by the zone of interest.
        """
        return self._bathymetry_cache()[self._zone_of_interest]

    def set_bathymetry(self, b: np.ndarray):
        """ Set the bathymetry unknown over the zone of interest.

        :param b: The bathymetry values to set. The array size must be equal to the size of the
           zone of interest.
        :raises AssertionError: If `b` doesn't have the shape of the zone of
            interest.
        """
        self._ensure_array_size(b)
        # Go through the cache accessor: the cache may not have been read yet
        # if `get_bathymetry` was never called.
        self._bathymetry_cache()[self._zone_of_interest] = b
        self._bathymetry_was_set = True

    def _infiltration_zones_cache(self) -> np.ndarray:
        """ Return the cached infiltration zones, reading them from the
        simulator on first use. """
        if self._cached_infiltration_zones is None:
            # The map read from the simulator is shifted by one before being
            # cached (presumably to go from 0-based to 1-based zone numbers;
            # to be confirmed against the simulator).
            self._cached_infiltration_zones = self._glsim.read_infiltration_map() + 1
        return self._cached_infiltration_zones

    def get_infiltration_zones(self) -> np.ndarray:
        """ Get the infiltration zones over the zone of interest.

        :return: The cached infiltration zones (simulator's map plus one)
            indexed by the zone of interest.
        """
        return self._infiltration_zones_cache()[self._zone_of_interest]

    def set_infiltration_zones(self, b: np.ndarray):
        """ Set the infiltration zones over the zone of interest.

        :param b: The values to set. The array size must be equal to the size
            of the zone of interest.
        :raises AssertionError: If `b` doesn't have the shape of the zone of
            interest.
        """
        self._ensure_array_size(b)
        # Same as for the bathymetry: make sure the cache exists before
        # writing into it.
        self._infiltration_zones_cache()[self._zone_of_interest] = b
        self._infiltration_zones_was_set = True

    def get_active_infiltration_quantities(self):
        """ Get the current infiltration chronology's row: that is, the
        currently infiltrated quantities.

        :return: The current infiltration chronology's row. If none is
           active (because current time is before the chronology beginning)
           then `None` is returned.
        """
        row_at_t = self._infiltration_chronology.get_row_at_time(self.current_sim_time)
        if not row_at_t:
            return None
        # The row is a (start time, quantities) pair; only the quantities are
        # of interest here.
        _, values = row_at_t
        return values

    def set_active_infiltration_quantities(self, q) -> None:
        """ Set the current infiltration chronology's row: that is, set the
        infiltrated quantities.

        :param q: The infiltrated quantities to store in the active row.
        :raises AssertionError: If no chronology row is active at the current
            simulation time.
        """
        t = self._infiltration_chronology.get_active_entry_start_time(self.current_sim_time)
        assert t is not None, "You're trying to set the active infiltration row, but there's none active"
        self._infiltration_chronology.set(t, q)
        self._infiltration_chronology_was_set = True

    def insert_infiltration_quantities(self, t: float, q: List[float]) -> None:
        """ Insert or replace an infiltration chronology's row.

        :param t: Beginning time for the infiltration of the row.
        :param q: Infiltrated quantities, as many as the number of infiltrated zones.
        """
        self._infiltration_chronology.set(t, q)
        self._infiltration_chronology_was_set = True

    def _h_qx_qy_cache(self):
        """ Return the cached h/qx/qy array, reading it from the simulator on
        first use. """
        if self._cached_h_qx_qy is None:
            self._cached_h_qx_qy = self._glsim._read_full_quantity_result(self._simulation_current_quantity)
        return self._cached_h_qx_qy

    def _get_unknown(self, index: int):
        """ Return component `index` (0: h, 1: qx, 2: qy) of the cached
        h/qx/qy array over the zone of interest. """
        return self._h_qx_qy_cache()[self._zone_of_interest[0], self._zone_of_interest[1], index]

    def _set_unknown(self, index: int, values: np.ndarray):
        """ Overwrite component `index` (0: h, 1: qx, 2: qy) of the cached
        h/qx/qy array over the zone of interest. """
        self._ensure_array_size(values)
        self._h_qx_qy_cache()[self._zone_of_interest[0], self._zone_of_interest[1], index] = values
        # The flag is raised only once the write has succeeded.
        self._h_qx_qy_was_set = True

    def get_h(self):
        """ Get the h unknown over the zone of interest. """
        return self._get_unknown(0)

    def set_h(self, h : np.ndarray):
        """ Set the h unknown over the zone of interest.

        :param h: The h values to set. The array size must be equal to the size of the
           zone of interest.
        """
        self._set_unknown(0, h)

    def get_qx(self):
        """ Get the Qx unknown over the zone of interest. """
        return self._get_unknown(1)

    def set_qx(self, qx : np.ndarray):
        """ Set the Qx unknown over the zone of interest.

        :param qx: The Qx values to set. The array size must be equal to the size of the
           zone of interest.
        """
        self._set_unknown(1, qx)

    def get_qy(self):
        """ Get the Qy unknown over the zone of interest. """
        return self._get_unknown(2)

    def set_qy(self, qy : np.ndarray):
        """ Set the Qy unknown over the zone of interest.

        :param qy: The Qy values to set. The array size must be equal to the size of the
           zone of interest.
        """
        self._set_unknown(2, qy)

    # Disabled API, kept for reference:
    #
    # def get_infiltration_chronology(self) -> InfiltrationChronology:
    #     if self._infiltration_chronology:
    #         return self._infiltration_chronology
    #     else:
    #         raise RuntimeError("You can't query the infiltrations if none was set in the simulation")
    #
    # def set_infiltration_chronology(self, chrono: InfiltrationChronology):
    #     """ Set a new infiltration chronology in the simulator. WARNING! The
    #     infiltration chronology structure (number of zones, number of
    #     chronology entries) must be left untouched. So you can't add zones
    #     nor entries in the chronology.
    #     """
    #     if self._infiltration_chronology is not None:
    #         assert chrono.nb_zones == self._glsim.nb_infiltration_zones, "You have changed the number of infiltration zones. It's not supported."
    #         assert chrono.nb_entries == self._glsim.nb_infiltration_lines, "You have changed the number of entries in the infiltration chronology. It's not supported, you can only modify existing entries."
    #         self._infiltration_chronology_was_set = True
    #         self._infiltration_chronology = chrono
    #     else:
    #         raise RuntimeError("You can't set the infiltrations if none was set in the simulation")
class SimulationInjector(ABC):
    """ Base class for injectors.

    Injectors allow small python scripts to be run at several points in time
    during the simulation. The goal of these scripts should be to update the
    simulation in real time, potentially affecting the simulation outcome.

    Injectors are called between simulation steps (that is after the end of a
    step and before the beginning of the next step).
    """

    def active_zone(self) -> tuple[slice, slice]:
        """ The zone that this injector wants to update expressed as a tuple of two slices.
        The first slice gives the columns span, the second gives the rows span.

        This zone must be constant across the simulation. In particular, it
        must be evaluable before the simulation has started.

        We limit the region where updates are necessary to avoid
        downloading/uploading potentially big textures to the GPU.

        :raises NotImplementedError: Always, in this base class; subclasses
            are expected to override this method.
        """
        raise NotImplementedError

    def time_to_new_injection(self, current_step: int, current_time: float) -> SimulationDuration:
        """ How long the simulator must wait before calling this injector
        (presumably its `do_updates` method; confirm against the runner).

        This method is independent because it will be called once
        at the beginning of the simulation to get the time of the first
        update.

        :param current_step: The current step in the simulation.
        :param current_time: The current time in the simulation, expressed in seconds.
        :return: A `SimulationDuration`. `None` means the injector should not
            be called anymore. This base implementation always returns `None`.
        """
        return None

    def do_updates(self, sim_proxy:SimulationProxy) -> None:
        """ Apply this injector's updates to the simulation through the proxy.

        This method is called once the waiting time previously announced by
        the injector has elapsed. The base implementation does nothing.

        NOTE(review): an earlier version of this documentation said that this
        method returns the `SimulationDuration` to wait before the next call
        (`None` to stop being called). The signature is annotated `-> None`
        and this base implementation returns nothing; confirm which contract
        the simulation runner actually relies on.

        :param sim_proxy: The proxy giving access to the running simulation.
        """