"""
Author: HECE - University of Liege, Stéphane Champailler, Pierre Archambeau
Date: 2024
Copyright (c) 2024 University of Liege. All rights reserved.
This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""
from typing import Callable
from collections import deque
from datetime import timedelta
from enum import Enum
import numpy as np
from .glsimulation import SimulationDurationType, SimulationDuration
from .utils import EveryNSeconds
class TimeSamplingStrategy(Enum):
    """Strategies deciding when a timer triggers during a simulation.

    When a simulation is run, interrogating the GPU is costly. Therefore we
    interrogate it not at each step but on a regular basis. This enumeration
    describes the various strategies of interrogation.

    NOTE(review): the member definitions were missing from the reviewed
    source. They are restored here from the names the rest of this module
    uses; the numeric values are arbitrary, so confirm them if they are
    persisted or compared anywhere else.

    NOTE(review): the original docstring ended with the unfinished sentence
    "When reporting, the sync time is a multiple of" -- complete it or drop it.
    """

    # Fixed period that the user cannot change over time.
    ONE_SHOT = 1
    # Interrogate at fixed intervals.
    PERIODIC = 2
    # Interrogate at changing intervals (given by a query function).
    DYNAMIC = 3
    # Trigger on wall clock seconds instead of simulation time.
    WALL_CLOCK_DYNAMIC = 4
[docs]
class TimeSampler:
""" Example:
- read GPU every 100 iterations
- report every 111 iterations
- update bathymetry every 3 seconds +/-.
To solve this we make three timelines:
- read at 100, 200, 300,...
- read at 111, 222, 333,...
- read at 3s, 6.25, 8,75,...
For the first two time lines, knowing the current iteration allows to
know when the next stop is. For the third one, only a planner knows.
"""
def __init__(self, strategy: TimeSamplingStrategy, name:str=None):
assert isinstance(strategy, TimeSamplingStrategy)
self._strategy = strategy
self._period = None
self._query_function = None
self.name = name
@classmethod
[docs]
def make_one_shot_timer(klass, period: SimulationDuration, name=None) -> "TimeSampler":
""" A one-shot timer which have a fixed period. Fixed means you can't
change it over time.
:param period: If `int` then the period is expressed as a number of iterations.
If `timedelta` then the period is expressed as a number of seconds.
"""
# Note: The manager can update periods, but not the user.
ts = TimeSampler(TimeSamplingStrategy.ONE_SHOT, name=name)
if isinstance(period, SimulationDuration):
ts._period = period
elif isinstance(period, timedelta):
ts._period = SimulationDuration.from_seconds(period.total_seconds())
else:
raise ValueError(f"Unsupported type for duration, you gave: {type(period)}. I need `timedelta` or `SimulationDuration`")
return ts
@classmethod
[docs]
def make_dynamic_timer(klass, query_function: Callable, duration_type: SimulationDurationType, name=None) -> "TimeSampler":
""" A timer which has a varying period. To know the time to next timer
trigger time, the caller must provide a query function that will tell us
how long to wait before the next trigger in function of the current time/iteration.
:param duration_type: The type of duration used by this timer. This must be constant
over time (so you can't go from steps to time or vice versa).
"""
ts = TimeSampler(TimeSamplingStrategy.DYNAMIC, name=name)
ts._query_function = query_function
ts._duration_type = duration_type
return ts
@classmethod
[docs]
def make_periodic_wall_clock_timer(klass, period, triggers_on_zero=False, name: str = None):
""" A timer which triggers every `period` seconds (seconds are wall
clock seconds, not simulation seconds).
:param period: Either a `timedelta` or a `SimulationDuration`.
"""
ts = TimeSampler(TimeSamplingStrategy.WALL_CLOCK_DYNAMIC, name=name)
if isinstance(period, SimulationDuration):
ts._period = period
elif isinstance(period, timedelta):
ts._period = SimulationDuration.from_seconds(period.total_seconds())
else:
raise ValueError(f"Unsupported type for duration, you gave: {type(period)}. I need `timedelta` or `SimulationDuration`")
ts._timer = EveryNSeconds(ts._period.duration, trigger_start = triggers_on_zero, name=name)
return ts
@classmethod
[docs]
def make_periodic_timer(klass, period: SimulationDuration, name=None) -> "TimeSampler":
""" A recurring timer which have a fixed period.
:param period: If `int` then the period is expressed as a number of iterations.
If `timedelta` then the period is expressed as a number of seconds.
"""
ts = TimeSampler(TimeSamplingStrategy.PERIODIC, name=name)
if isinstance(period, SimulationDuration):
ts._period = period
elif isinstance(period, timedelta):
ts._period = SimulationDuration.from_seconds(period.total_seconds())
else:
raise ValueError(f"Unsupported type for duration, you gave: {type(period)}. I need `timedelta` or `SimulationDuration`")
return ts
@property
[docs]
def strategy(self) -> TimeSamplingStrategy:
return self._strategy
[docs]
def get_next_duration(self, current_iteration: int, current_time: timedelta) -> SimulationDuration:
"""
:return: If `int` then it is a number of iterations. If `timedelta` then it is a duration.
"""
match self._strategy:
case TimeSamplingStrategy.ONE_SHOT | TimeSamplingStrategy.PERIODIC:
r = self._period
case TimeSamplingStrategy.DYNAMIC:
r = self._query_function(current_iteration, current_time)
if isinstance(r, timedelta):
r = SimulationDuration.from_seconds(r.total_seconds())
assert r.type == self._duration_type, f"The type of period for this timer is {self._duration_type}, you gave {r.type}"
case _:
raise Exception(f"Unsupported time strategy {self._strategy}")
assert isinstance(r, SimulationDuration)
return r
def __str__(self):
return f"{self._strategy} {self.name} {self._period}"
[docs]
class TimerManager:
def __init__(self, sampling_period: int):
"""
:param sampling_period: The frequency at which time is sampled (expressed in simulation iterations).
This drives the accuracy of all timers based on duration expressed
in time rather than in iterations.
"""
assert isinstance(sampling_period, int)
assert sampling_period > 0
self._sampling_period = sampling_period
self.reset()
[docs]
def reset(self):
""" Reset the manager but keeps the initial sampling period.
"""
self._strategies: list[TimeSampler] = list()
self._strategies_duration_type: dict[TimeSampler, SimulationDurationType] = dict()
self._strategies_next_stop_iteration: list[int] = list()
self._strategies_next_stop_iteration_planned: list[int] = list()
self._simulation_time_steps = None
# The time (in seconds, not in iterations) when the next stop should occur
self._strategies_next_stop_time: list[float] = list()
# How many records to hold (for time/duration relationship)
self._nb_records = 25
self._recorded_times = deque(maxlen=self._nb_records)
self._recorded_iterations = deque(maxlen=self._nb_records)
self._recorded_times.append(0)
self._recorded_iterations.append(0)
# When will the next timer trigger.
self.next_timer_trigger_iteration = None
""" The iteration when the next trigger will occur. This is obvioulsy
in the future.
"""
ticker_strategy = TimeSampler.make_periodic_timer(SimulationDuration.from_steps(self._sampling_period), name="BaseTicker")
self.add_strategy(ticker_strategy)
def __str__(self):
return f"Next iter stop {self._strategies_next_stop_iteration}; Next time stop {self._strategies_next_stop_time}; Strategies: {[str(s) for s in self._strategies]}"
[docs]
def ticker(self) -> TimeSampler:
return self._strategies[0]
[docs]
def _remove_strategy(self, strategy: TimeSampler):
ndx = self._strategies.index(strategy)
self._strategies.remove(strategy)
del self._strategies_next_stop_iteration[ndx]
del self._strategies_next_stop_time[ndx]
del self._strategies_next_stop_iteration_planned[ndx]
[docs]
def add_strategy(self, time_sampler: TimeSampler):
assert isinstance(time_sampler, TimeSampler)
assert time_sampler not in self._strategies, "You already added this one :-)"
self._strategies.append(time_sampler)
self._strategies_next_stop_iteration.append(None)
self._strategies_next_stop_time.append(None)
self._strategies_next_stop_iteration_planned.append(None)
if time_sampler.strategy == TimeSamplingStrategy.DYNAMIC:
# For the dynamic strategy I can't do anything else than calling
# get_next_duration to guess they type of the period returned.
self._strategies_duration_type[time_sampler] = time_sampler._duration_type
self._plan_next_stop_for_timer(len(self._strategies) - 1, 0,0)
self._compute_next_timer_trigger()
[docs]
def set_simulation_time_steps(self, steps):
""" Set the simulation times steps database that can be used to estimate
the next iteration upon which a time sampler must be triggered.
:param steps: A collection of simulation step durations (in seconds).
"""
self._simulation_time_steps = np.array(steps)
[docs]
def _record(self, current_iteration, current_time):
if current_iteration > 0:
self._recorded_iterations.append(current_iteration)
self._recorded_times.append(current_time)
[docs]
def _estimate_iterations_for_duration(self, seconds):
""" Try to convert a time into a number of iterations using
past knowledge.
We use two databases to do that. The first one is built
in this manager based on the iteration/times provided to
it when querying/updating its state. It's not very accurate.
The second one is a list of time steps which is provided by
the caller whenever it has up to date data. It's meant to be
more accurate.
"""
assert self._can_estimate_time(), "This only makes sesnse when there are recordings."
assert seconds >= 0
if self._simulation_time_steps is not None:
# Use the time step durations database provided by the user.
N = len(self._simulation_time_steps)
if N <= 10:
iteration_time = np.mean(self._simulation_time_steps)
else:
p = int(N*0.7)
iteration_time = (np.mean(self._simulation_time_steps) + np.mean(self._simulation_time_steps[p:]))/2
return int(round(seconds/iteration_time))
else:
# Use our own step durations database provided by the user.
N = len(self._recorded_times)
times = np.array(self._recorded_times)
iterations = np.array(self._recorded_iterations)
times = times[1:]- times[0:-1]
iterations = iterations[1:]- iterations[0:-1]
ratios = iterations / times
if N < 10:
mean = np.mean(ratios)
else:
p = int(N*0.7)
mean = (np.mean(ratios) + np.mean(ratios[p:]))/2
# w = np.arange(len(ratios))
# w = w / np.sum(w)
# mean = np.sum(ratios * w)
nb_iter = int(round(mean * seconds))
return nb_iter
[docs]
def update_one_shot_timer(self, timer:TimeSampler, new_period: SimulationDuration, current_iteration, current_time):
assert timer in self._strategies, "I don't know that timer"
assert timer.strategy == TimeSamplingStrategy.ONE_SHOT, "This only works with one shot timer"
assert not new_period.is_zero()
timer._period = new_period
self._plan_next_stop_for_timer(self._strategies.index(timer), current_iteration, current_time)
self.update(current_iteration, current_time)
[docs]
def _plan_next_stop_for_timer(self, ndx:int, current_iteration: int, current_time):
time_sampler = self._strategies[ndx]
if time_sampler.strategy != TimeSamplingStrategy.WALL_CLOCK_DYNAMIC:
# FIXED, DYNAMIC, PERIODIC time strategy
next_duration = time_sampler.get_next_duration(current_iteration, current_time)
match next_duration.type:
case SimulationDurationType.STEPS:
#print(f"time sampler {time_sampler.name} {time_sampler.strategy} {current_iteration} {next_duration.type} {next_duration.duration}" )
if time_sampler.strategy == TimeSamplingStrategy.PERIODIC:
self._strategies_next_stop_iteration[ndx] = current_iteration + next_duration.duration
case SimulationDurationType.SECONDS:
# We still only have a *time* planned, not an *iteration*. In this case
# we just look after the fact that we may pass that time (before being
# able to properly estimate the number of iterations to do).
if self._strategies_next_stop_iteration[ndx] is None \
and self._strategies_next_stop_time[ndx] is not None \
and current_time > self._strategies_next_stop_time[ndx]:
# Past the time => stop now!
self._strategies_next_stop_iteration[ndx] = current_iteration
self._strategies_next_stop_time[ndx] = current_time + next_duration.duration
self._strategies_next_stop_iteration_planned[ndx] = (current_iteration, current_time, 0, next_duration.duration)
elif self._can_estimate_time():
# We can estimate time. If we can't, then there's nothing to do but
# wait until we can. That's because without time estimation we can
# not make new prediction regarding the stop iteration.
# We build a correction ratio that will take into account
# the error we did on the previous prediction. This will be
# used to correct the next prediction.
self._strategies_next_stop_time[ndx] = current_time + next_duration.duration
iterations_to_go = self._estimate_iterations_for_duration(next_duration.duration)
self._strategies_next_stop_iteration[ndx] = current_iteration + iterations_to_go
self._strategies_next_stop_iteration_planned[ndx] = (current_iteration, current_time, iterations_to_go, next_duration.duration)
case _:
raise Exception(f"Unsupported duration type : {type(next_duration)}")
[docs]
def _can_estimate_time(self):
""" Do we have enough data to estimate iteration number out of a time ?
"""
return len(self._recorded_iterations) >= 2
[docs]
def _compute_next_timer_trigger(self):
""" when will the next timer trigger ?
"""
new_min = None
#acting_sampler = None
for ndx, sampler in enumerate(self._strategies):
if self._strategies_next_stop_iteration[ndx] is not None:
if new_min is None:
new_min = self._strategies_next_stop_iteration[ndx]
#acting_sampler = sampler
elif self._strategies_next_stop_iteration[ndx] < new_min:
new_min = self._strategies_next_stop_iteration[ndx]
#acting_sampler = sampler
# if new_min != self.next_timer_trigger_iteration:
# print(f"_compute_next_timer_trigger: {new_min} {acting_sampler.name}")
self.next_timer_trigger_iteration = new_min
[docs]
def _reevaluate_time_samplers(self, current_iteration, current_time):
"""
Reevaluate the iteration that is planned for the trigger of each "time"
(instead of "iterations") timers. We do that systematically, that is as
often as possible.
"""
# Of course we don that only if we have the ability to compute
# iterations from time.
if self._can_estimate_time():
for ndx, strategy in enumerate(self._strategies):
if strategy.strategy != TimeSamplingStrategy.WALL_CLOCK_DYNAMIC:
if (strategy.strategy == TimeSamplingStrategy.DYNAMIC and strategy._duration_type == SimulationDurationType.SECONDS) \
or strategy._period.type == SimulationDurationType.SECONDS:
if self._strategies_next_stop_time[ndx] is None:
# No time was planned yet. Since we know we can estimate iteration
# based on time now, we can start counting time from current time.
duration = strategy.get_next_duration(current_iteration, current_time)
assert isinstance(duration, SimulationDuration), "I only accept SimulationDuration"
assert duration.type == SimulationDurationType.SECONDS, "Time sampler for simulation time must have a duration in seconds"
self._strategies_next_stop_time[ndx] = current_time + duration.duration
# The actual update
if current_time < self._strategies_next_stop_time[ndx]:
self._strategies_next_stop_iteration[ndx] = current_iteration + self._estimate_iterations_for_duration(self._strategies_next_stop_time[ndx] - current_time)
[docs]
def update_clock_wall_timers(self):
""" Update the wall clock timers.
The wall clock timers being based on the real passage of time, they must
be checked more often than the timers inside a simulation to be
accurate. Since querying timers inside simulation is very expensive, we
moved the clock wall timer update here, outside the more general timers
update mechanism (in the update() method). This way, they can be updated
as fast as needed without updating the more expensive simulation timers.
:return: The set of wall clock timers that have triggered.
"""
triggered = set()
for ndx, strategy in enumerate(self._strategies):
if strategy.strategy == TimeSamplingStrategy.WALL_CLOCK_DYNAMIC:
if strategy._timer.has_shot():
#print(f"Checked {strategy.name}, triggered!")
triggered.add(strategy)
return triggered
[docs]
def update(self, current_iteration: int, current_time) -> set[TimeSampler]:
""" Update all timers.
:return: The set of timers that have triggered.
"""
assert isinstance(current_iteration, int)
self._record(current_iteration, current_time)
# This will re-evaluate the next timer trigger as well.
self._reevaluate_time_samplers(current_iteration, current_time)
triggered = set()
# Since we have the ticker timer, this is guaranteed to run
# at least once.
if self.next_timer_trigger_iteration is None or current_iteration >= self.next_timer_trigger_iteration:
for ndx, stop_iteration in enumerate(self._strategies_next_stop_iteration):
sampler = self._strategies[ndx]
# We now we have to update if we have reached the stop iteration
# (that's the general case) or if the simulation time has passed
# the initial period of a time-based sampler (this occurs when
# we have not yet been able to estimate the stop iteration for
# that timer).
if (stop_iteration is not None and current_iteration >= stop_iteration) \
or (sampler.strategy == TimeSamplingStrategy.DYNAMIC and stop_iteration is None and current_time >= self._strategies_next_stop_time[ndx]):
self._plan_next_stop_for_timer(ndx, current_iteration, current_time)
triggered.add(sampler)
# This is done here and not on a per-update basis (on each sampler)
# because the minimum must be found globally.
self._compute_next_timer_trigger()
return triggered