Source code for wolfgpu.sampled_timer

"""
Author: HECE - University of Liege, Stéphane Champailler, Pierre Archambeau
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

from typing import Callable
from collections import deque
from datetime import timedelta
from enum import Enum

import numpy as np
from .glsimulation import SimulationDurationType, SimulationDuration
from .utils import EveryNSeconds

[docs] class TimeSamplingStrategy(Enum): """When a simulation is run, interrogating the GPU is costly. Therefore we interrogate it not at each step but on a regular basis. This enumeration describes the various strategies of interrogation. When reporting, the sync time is a multiple of """
[docs] ONE_SHOT = 1
""" Interrogate at fixed intervals """
[docs] DYNAMIC = 2
""" Interrogate at a changing intervals """
[docs] WALL_CLOCK_DYNAMIC = 3
[docs] PERIODIC = 4
[docs] class TimeSampler: """ Example: - read GPU every 100 iterations - report every 111 iterations - update bathymetry every 3 seconds +/-. To solve this we make three timelines: - read at 100, 200, 300,... - read at 111, 222, 333,... - read at 3s, 6.25, 8,75,... For the first two time lines, knowing the current iteration allows to know when the next stop is. For the third one, only a planner knows. """ def __init__(self, strategy: TimeSamplingStrategy, name:str=None): assert isinstance(strategy, TimeSamplingStrategy) self._strategy = strategy self._period = None self._query_function = None self.name = name @classmethod
[docs] def make_one_shot_timer(klass, period: SimulationDuration, name=None) -> "TimeSampler": """ A one-shot timer which have a fixed period. Fixed means you can't change it over time. :param period: If `int` then the period is expressed as a number of iterations. If `timedelta` then the period is expressed as a number of seconds. """ # Note: The manager can update periods, but not the user. ts = TimeSampler(TimeSamplingStrategy.ONE_SHOT, name=name) if isinstance(period, SimulationDuration): ts._period = period elif isinstance(period, timedelta): ts._period = SimulationDuration.from_seconds(period.total_seconds()) else: raise ValueError(f"Unsupported type for duration, you gave: {type(period)}. I need `timedelta` or `SimulationDuration`") return ts
@classmethod
[docs] def make_dynamic_timer(klass, query_function: Callable, duration_type: SimulationDurationType, name=None) -> "TimeSampler": """ A timer which has a varying period. To know the time to next timer trigger time, the caller must provide a query function that will tell us how long to wait before the next trigger in function of the current time/iteration. :param duration_type: The type of duration used by this timer. This must be constant over time (so you can't go from steps to time or vice versa). """ ts = TimeSampler(TimeSamplingStrategy.DYNAMIC, name=name) ts._query_function = query_function ts._duration_type = duration_type return ts
@classmethod
[docs] def make_periodic_wall_clock_timer(klass, period, triggers_on_zero=False, name: str = None): """ A timer which triggers every `period` seconds (seconds are wall clock seconds, not simulation seconds). :param period: Either a `timedelta` or a `SimulationDuration`. """ ts = TimeSampler(TimeSamplingStrategy.WALL_CLOCK_DYNAMIC, name=name) if isinstance(period, SimulationDuration): ts._period = period elif isinstance(period, timedelta): ts._period = SimulationDuration.from_seconds(period.total_seconds()) else: raise ValueError(f"Unsupported type for duration, you gave: {type(period)}. I need `timedelta` or `SimulationDuration`") ts._timer = EveryNSeconds(ts._period.duration, trigger_start = triggers_on_zero, name=name) return ts
@classmethod
[docs] def make_periodic_timer(klass, period: SimulationDuration, name=None) -> "TimeSampler": """ A recurring timer which have a fixed period. :param period: If `int` then the period is expressed as a number of iterations. If `timedelta` then the period is expressed as a number of seconds. """ ts = TimeSampler(TimeSamplingStrategy.PERIODIC, name=name) if isinstance(period, SimulationDuration): ts._period = period elif isinstance(period, timedelta): ts._period = SimulationDuration.from_seconds(period.total_seconds()) else: raise ValueError(f"Unsupported type for duration, you gave: {type(period)}. I need `timedelta` or `SimulationDuration`") return ts
@property
[docs] def strategy(self) -> TimeSamplingStrategy: return self._strategy
[docs] def get_next_duration(self, current_iteration: int, current_time: timedelta) -> SimulationDuration: """ :return: If `int` then it is a number of iterations. If `timedelta` then it is a duration. """ match self._strategy: case TimeSamplingStrategy.ONE_SHOT | TimeSamplingStrategy.PERIODIC: r = self._period case TimeSamplingStrategy.DYNAMIC: r = self._query_function(current_iteration, current_time) if isinstance(r, timedelta): r = SimulationDuration.from_seconds(r.total_seconds()) assert r.type == self._duration_type, f"The type of period for this timer is {self._duration_type}, you gave {r.type}" case _: raise Exception(f"Unsupported time strategy {self._strategy}") assert isinstance(r, SimulationDuration) return r
def __str__(self): return f"{self._strategy} {self.name} {self._period}"
[docs] class TimerManager: def __init__(self, sampling_period: int): """ :param sampling_period: The frequency at which time is sampled (expressed in simulation iterations). This drives the accuracy of all timers based on duration expressed in time rather than in iterations. """ assert isinstance(sampling_period, int) assert sampling_period > 0 self._sampling_period = sampling_period self.reset()
[docs] def reset(self): """ Reset the manager but keeps the initial sampling period. """ self._strategies: list[TimeSampler] = list() self._strategies_duration_type: dict[TimeSampler, SimulationDurationType] = dict() self._strategies_next_stop_iteration: list[int] = list() self._strategies_next_stop_iteration_planned: list[int] = list() self._simulation_time_steps = None # The time (in seconds, not in iterations) when the next stop should occur self._strategies_next_stop_time: list[float] = list() # How many records to hold (for time/duration relationship) self._nb_records = 25 self._recorded_times = deque(maxlen=self._nb_records) self._recorded_iterations = deque(maxlen=self._nb_records) self._recorded_times.append(0) self._recorded_iterations.append(0) # When will the next timer trigger. self.next_timer_trigger_iteration = None """ The iteration when the next trigger will occur. This is obvioulsy in the future. """ ticker_strategy = TimeSampler.make_periodic_timer(SimulationDuration.from_steps(self._sampling_period), name="BaseTicker") self.add_strategy(ticker_strategy)
def __str__(self): return f"Next iter stop {self._strategies_next_stop_iteration}; Next time stop {self._strategies_next_stop_time}; Strategies: {[str(s) for s in self._strategies]}"
[docs] def ticker(self) -> TimeSampler: return self._strategies[0]
[docs] def _remove_strategy(self, strategy: TimeSampler): ndx = self._strategies.index(strategy) self._strategies.remove(strategy) del self._strategies_next_stop_iteration[ndx] del self._strategies_next_stop_time[ndx] del self._strategies_next_stop_iteration_planned[ndx]
[docs] def add_strategy(self, time_sampler: TimeSampler): assert isinstance(time_sampler, TimeSampler) assert time_sampler not in self._strategies, "You already added this one :-)" self._strategies.append(time_sampler) self._strategies_next_stop_iteration.append(None) self._strategies_next_stop_time.append(None) self._strategies_next_stop_iteration_planned.append(None) if time_sampler.strategy == TimeSamplingStrategy.DYNAMIC: # For the dynamic strategy I can't do anything else than calling # get_next_duration to guess they type of the period returned. self._strategies_duration_type[time_sampler] = time_sampler._duration_type self._plan_next_stop_for_timer(len(self._strategies) - 1, 0,0) self._compute_next_timer_trigger()
[docs] def set_simulation_time_steps(self, steps): """ Set the simulation times steps database that can be used to estimate the next iteration upon which a time sampler must be triggered. :param steps: A collection of simulation step durations (in seconds). """ self._simulation_time_steps = np.array(steps)
[docs] def _record(self, current_iteration, current_time): if current_iteration > 0: self._recorded_iterations.append(current_iteration) self._recorded_times.append(current_time)
[docs] def _estimate_iterations_for_duration(self, seconds): """ Try to convert a time into a number of iterations using past knowledge. We use two databases to do that. The first one is built in this manager based on the iteration/times provided to it when querying/updating its state. It's not very accurate. The second one is a list of time steps which is provided by the caller whenever it has up to date data. It's meant to be more accurate. """ assert self._can_estimate_time(), "This only makes sesnse when there are recordings." assert seconds >= 0 if self._simulation_time_steps is not None: # Use the time step durations database provided by the user. N = len(self._simulation_time_steps) if N <= 10: iteration_time = np.mean(self._simulation_time_steps) else: p = int(N*0.7) iteration_time = (np.mean(self._simulation_time_steps) + np.mean(self._simulation_time_steps[p:]))/2 return int(round(seconds/iteration_time)) else: # Use our own step durations database provided by the user. N = len(self._recorded_times) times = np.array(self._recorded_times) iterations = np.array(self._recorded_iterations) times = times[1:]- times[0:-1] iterations = iterations[1:]- iterations[0:-1] ratios = iterations / times if N < 10: mean = np.mean(ratios) else: p = int(N*0.7) mean = (np.mean(ratios) + np.mean(ratios[p:]))/2 # w = np.arange(len(ratios)) # w = w / np.sum(w) # mean = np.sum(ratios * w) nb_iter = int(round(mean * seconds)) return nb_iter
[docs] def update_one_shot_timer(self, timer:TimeSampler, new_period: SimulationDuration, current_iteration, current_time): assert timer in self._strategies, "I don't know that timer" assert timer.strategy == TimeSamplingStrategy.ONE_SHOT, "This only works with one shot timer" assert not new_period.is_zero() timer._period = new_period self._plan_next_stop_for_timer(self._strategies.index(timer), current_iteration, current_time) self.update(current_iteration, current_time)
[docs] def _plan_next_stop_for_timer(self, ndx:int, current_iteration: int, current_time): time_sampler = self._strategies[ndx] if time_sampler.strategy != TimeSamplingStrategy.WALL_CLOCK_DYNAMIC: # FIXED, DYNAMIC, PERIODIC time strategy next_duration = time_sampler.get_next_duration(current_iteration, current_time) match next_duration.type: case SimulationDurationType.STEPS: #print(f"time sampler {time_sampler.name} {time_sampler.strategy} {current_iteration} {next_duration.type} {next_duration.duration}" ) if time_sampler.strategy == TimeSamplingStrategy.PERIODIC: self._strategies_next_stop_iteration[ndx] = current_iteration + next_duration.duration case SimulationDurationType.SECONDS: # We still only have a *time* planned, not an *iteration*. In this case # we just look after the fact that we may pass that time (before being # able to properly estimate the number of iterations to do). if self._strategies_next_stop_iteration[ndx] is None \ and self._strategies_next_stop_time[ndx] is not None \ and current_time > self._strategies_next_stop_time[ndx]: # Past the time => stop now! self._strategies_next_stop_iteration[ndx] = current_iteration self._strategies_next_stop_time[ndx] = current_time + next_duration.duration self._strategies_next_stop_iteration_planned[ndx] = (current_iteration, current_time, 0, next_duration.duration) elif self._can_estimate_time(): # We can estimate time. If we can't, then there's nothing to do but # wait until we can. That's because without time estimation we can # not make new prediction regarding the stop iteration. # We build a correction ratio that will take into account # the error we did on the previous prediction. This will be # used to correct the next prediction. self._strategies_next_stop_time[ndx] = current_time + next_duration.duration iterations_to_go = self._estimate_iterations_for_duration(next_duration.duration) self._strategies_next_stop_iteration[ndx] = current_iteration + iterations_to_go self._strategies_next_stop_iteration_planned[ndx] = (current_iteration, current_time, iterations_to_go, next_duration.duration) case _: raise Exception(f"Unsupported duration type : {type(next_duration)}")
[docs] def _can_estimate_time(self): """ Do we have enough data to estimate iteration number out of a time ? """ return len(self._recorded_iterations) >= 2
[docs] def _compute_next_timer_trigger(self): """ when will the next timer trigger ? """ new_min = None #acting_sampler = None for ndx, sampler in enumerate(self._strategies): if self._strategies_next_stop_iteration[ndx] is not None: if new_min is None: new_min = self._strategies_next_stop_iteration[ndx] #acting_sampler = sampler elif self._strategies_next_stop_iteration[ndx] < new_min: new_min = self._strategies_next_stop_iteration[ndx] #acting_sampler = sampler # if new_min != self.next_timer_trigger_iteration: # print(f"_compute_next_timer_trigger: {new_min} {acting_sampler.name}") self.next_timer_trigger_iteration = new_min
[docs] def _reevaluate_time_samplers(self, current_iteration, current_time): """ Reevaluate the iteration that is planned for the trigger of each "time" (instead of "iterations") timers. We do that systematically, that is as often as possible. """ # Of course we don that only if we have the ability to compute # iterations from time. if self._can_estimate_time(): for ndx, strategy in enumerate(self._strategies): if strategy.strategy != TimeSamplingStrategy.WALL_CLOCK_DYNAMIC: if (strategy.strategy == TimeSamplingStrategy.DYNAMIC and strategy._duration_type == SimulationDurationType.SECONDS) \ or strategy._period.type == SimulationDurationType.SECONDS: if self._strategies_next_stop_time[ndx] is None: # No time was planned yet. Since we know we can estimate iteration # based on time now, we can start counting time from current time. duration = strategy.get_next_duration(current_iteration, current_time) assert isinstance(duration, SimulationDuration), "I only accept SimulationDuration" assert duration.type == SimulationDurationType.SECONDS, "Time sampler for simulation time must have a duration in seconds" self._strategies_next_stop_time[ndx] = current_time + duration.duration # The actual update if current_time < self._strategies_next_stop_time[ndx]: self._strategies_next_stop_iteration[ndx] = current_iteration + self._estimate_iterations_for_duration(self._strategies_next_stop_time[ndx] - current_time)
[docs] def update_clock_wall_timers(self): """ Update the wall clock timers. The wall clock timers being based on the real passage of time, they must be checked more often than the timers inside a simulation to be accurate. Since querying timers inside simulation is very expensive, we moved the clock wall timer update here, outside the more general timers update mechanism (in the update() method). This way, they can be updated as fast as needed without updating the more expensive simulation timers. :return: The set of wall clock timers that have triggered. """ triggered = set() for ndx, strategy in enumerate(self._strategies): if strategy.strategy == TimeSamplingStrategy.WALL_CLOCK_DYNAMIC: if strategy._timer.has_shot(): #print(f"Checked {strategy.name}, triggered!") triggered.add(strategy) return triggered
[docs] def update(self, current_iteration: int, current_time) -> set[TimeSampler]: """ Update all timers. :return: The set of timers that have triggered. """ assert isinstance(current_iteration, int) self._record(current_iteration, current_time) # This will re-evaluate the next timer trigger as well. self._reevaluate_time_samplers(current_iteration, current_time) triggered = set() # Since we have the ticker timer, this is guaranteed to run # at least once. if self.next_timer_trigger_iteration is None or current_iteration >= self.next_timer_trigger_iteration: for ndx, stop_iteration in enumerate(self._strategies_next_stop_iteration): sampler = self._strategies[ndx] # We now we have to update if we have reached the stop iteration # (that's the general case) or if the simulation time has passed # the initial period of a time-based sampler (this occurs when # we have not yet been able to estimate the stop iteration for # that timer). if (stop_iteration is not None and current_iteration >= stop_iteration) \ or (sampler.strategy == TimeSamplingStrategy.DYNAMIC and stop_iteration is None and current_time >= self._strategies_next_stop_time[ndx]): self._plan_next_stop_for_timer(ndx, current_iteration, current_time) triggered.add(sampler) # This is done here and not on a per-update basis (on each sampler) # because the minimum must be found globally. self._compute_next_timer_trigger() return triggered