Source code for wolfgpu.simplesimu.infiltrations

from enum import Enum

import numpy as np
from typing import List, Tuple


[docs] class InfiltrationInterpolation(Enum): """ Describes the way the infiltration are computed across an infiltration time interval.""" # These are the values used by Wolf
[docs] NONE = 0 # Constant value over time interval
""" The infiltration is constant over the infiltration time interval. """
[docs] LINEAR = 1 # Linear interpolation over the interval
""" The infiltration is interpolated over the infiltration time interval. The initial value is the one associated with interval, the final value is the one associated with the next interval. """
[docs] class InfiltrationChronology: """ This is optimized for access where inserts and reads are not frequently mixed togehter. """ def __init__(self):
[docs] self._chronology: dict[float, list[float]] = dict()
[docs] self._nb_zones = None
[docs] self._timeline = None
# self._dirty indicates that he times in the chronology do not # correspond to the _timeline array anymore. Therefore, _timeline must # be rebuilt asap.
[docs] self._dirty = False
[docs] def _sorted_timeline(self): if self._dirty or self._timeline is None: self._dirty = False self._timeline = np.array(sorted(self._chronology.keys())) return self._timeline
def __str__(self): s = "" for ndx, chrono_item in enumerate(self._chronology.items()): time, infiltrations_for_zone = chrono_item s += f"{ndx} At {time:.3f} s. {','.join([f'{i:.2f}' for i in infiltrations_for_zone])}\n" return s @classmethod
[docs] def from_array(klass, npa): ic = InfiltrationChronology() for row in npa: ic.set(row[0], row[1:].tolist()) return ic
@classmethod
[docs] def from_list(klass, chronology:list[float,list[float]]): ic = InfiltrationChronology() for row in chronology: ic.set(row[0], row[1]) return ic
@property
[docs] def nb_zones(self) -> int: return self._nb_zones
@property
[docs] def nb_entries(self) -> int: return len(self._chronology)
def __iter__(self): self._times_iterator = iter(self._sorted_timeline()) return self def __next__(self): k = next(self._times_iterator) return k, self._chronology[k] def __len__(self): return len(self._chronology)
[docs] def to_array(self): array = [] for t in sorted(self._chronology.keys()): row = [t] + self._chronology[t] array.append(row) return np.array(array, dtype=np.float32)
[docs] def get_row_at_time(self, time: float) -> Tuple[float, List[float]]: """ Get infiltration chronology row active at time `time`. :param time: a positive number of seconds. """ assert time >= 0, "Time must be a positive number of seconds" chronology_entry_time = self._get_closest_row_time_by_time(time) if chronology_entry_time is not None: return (chronology_entry_time, self._chronology[chronology_entry_time]) else: return None
[docs] def get_active_entry_start_time(self, time: float) -> float: return self._get_closest_row_time_by_time(time)
[docs] def set(self, time: float, quantities:list[float]): """ Sets or replaces an entry in the chronology. :param time: the moment at which the water must be injected. It is expressed in seconds. If there's already something at that moment, it will be discarded. :param quantities: A list giving the quantities of water to inject at the given `time`. Each quantity in the list corresponds to a infiltration zone of the infiltration map. """ assert type(quantities) is list, f"I want a list for the quantities, not a {type(quantities)}" if self._nb_zones is None: assert len(quantities) >= 1, "There's no point in setting an empty list of quantities" self._nb_zones = len(quantities) elif len(quantities) != self._nb_zones: raise ValueError(f"The number of infiltration zones is different than previous ones. I was expecting {self._nb_zones} but you gave {len(quantities)}") self._dirty = time not in self._chronology self._chronology[time] = quantities
[docs] def _get_closest_row_time_by_time(self, time: float): """ Can return None if no row is active. This happens when `time` is before all the chronology entries. """ ndx = np.searchsorted(self._sorted_timeline(), time, side="right") if ndx >= 1: return self._sorted_timeline()[ndx - 1] else: return None