"""
Author: HECE - University of Liege, Stéphane Champailler, Pierre Archambeau
Date: 2024
Copyright (c) 2024 University of Liege. All rights reserved.
This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""
import logging
import os
import sys
import time
from datetime import datetime
from datetime import timedelta
from math import floor, sqrt, isnan, ceil
from pathlib import Path
from typing import Union, Tuple, List
from enum import Enum
import warnings
import os
os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = "hide"
import pygame
import numpy as np
from scipy.ndimage import labeled_comprehension
import tqdm
from OpenGL.GL import GL_FRAMEBUFFER, GL_NONE, GL_RGB32F, GL_RGBA32F, glGetIntegerv, GL_COLOR_ATTACHMENT0, GL_COLOR_ATTACHMENT2, \
GL_COLOR_BUFFER_BIT, GL_FRAMEBUFFER, GL_NONE, GL_RGB32F, GL_RGBA32F, GL_VIEWPORT, \
glBindFramebuffer, glClear, glGetIntegerv, glUseProgram, GL_MAJOR_VERSION, \
GL_TEXTURE_RECTANGLE, GL_RGBA32F, GL_R32F, GL_R32I
from .gl_utils import load_program, load_shader_from_file, read_texture, total_textures_size, gl_clear_all_caches, estimate_best_window_size, upload_np_array_to_gpu, update_texture
from .glsimulation import FINISH, SHADER_PATH, STRIPES_ON_NAP, GLSimulation, log_mem_used, time_it, TIME_SAMPLING_RESOLUTION, GLSimulationGlobalState, TilePackingMode
from .results_store import ResultsStore, SimulationEventType, ResultType
from .simple_simulation import SimulationDurationType, SimpleSimulation, SimulationDuration, InfiltrationChronology
from .loaders.simple_sim_loader import load_simple_sim_to_gpu
from .utils import EveryNSeconds, seconds_to_duration_str
from .sampled_timer import TimeSamplingStrategy, TimerManager, TimeSampler
from .injector import SimulationInjector, SimulationProxy
[docs]
class GlWindowManagerInterface(Enum):
@classmethod
[docs]
def for_glfw( klass, glfw_window):
""" If one wants to call the swap buffer function in glfw,
one needs to have a reference to the window.
"""
wmi = GlWindowManagerInterface.GLFW
wmi.window = glfw_window
return wmi
if MEM_TRACKING:
from pympler import tracker
[docs]
MOSTLY_DRY_MESHES_THRESHOLD = 1e-5
""" Number of steps recorded in the database of simulation times
used by the injection triggering to estimate the next trigger.
"""
[docs]
INJECTION_TRIGGER_TIME_ESTIMATOR_DEPTH = 100
[docs]
def count_infiltration_cells(a, nb_zones):
"""
:param a: infiltration zones array. Zone indices are 1 baseD.
:return: a numpy array (1D) with all the counts, one per zone.
"""
# nb_cells_per_zone = np.zeros( nb_zones)
# # The "1" thing is because we're one based...
# unique_values, counts = np.unique(a[a >= 1], return_counts = True)
# nb_cells_per_zone[unique_values - 1] = counts
nb_cells_per_zone = labeled_comprehension(a, a, np.arange(1, nb_zones+1), np.count_nonzero, int, 0)
return nb_cells_per_zone
from abc import ABC, abstractmethod
from wolfgpu.glsimulation import SimulationDuration
[docs]
class SimulatoryQuery:
""" Get data over the active zone, not the whoel domain!!!!!
"""
# This class exists to ensure the user does not fiddl with the
# SimulationRunner or the GLSimulation objects. Moreover, it makes sure
# every data query is bound to the modifiable area.
def __init__(self, simulator: GLSimulation, modifiable_area: tuple):
self._sim = simulator
self._area_i, self._area_j, self._area_width, self._area_height = modifiable_area
self._np_index = (slice(self._area_i, self._area_i+self._area_width), slice(self._area_j, self._area_j+self._area_height))
[docs]
def read_bathymetry(self) -> np.array:
return self._sim.read_bathymetry()[self._np_index]
[docs]
def read_infiltration_zones(self) -> np.array:
active = self._sim.active_infiltration_line(t=10)
return self._sim._infiltration_zones_values_array_cache[active, :]
[docs]
def set_current_infiltration_quantities(self, q: np.array):
active = self._sim.active_infiltration_line(t=10)
self._sim._infiltration_zones_values_array_cache[active, :] = q
from OpenGL.GL import GL_RED, GL_FLOAT, glTextureSubImage2D
glTextureSubImage2D(
self._sim.infiltration_zones_values_array,
0, # level
self._area_i,
self._area_j,
self._area_width,
self._area_height,
GL_RED,
GL_FLOAT,
q)
[docs]
def set_bathymetry(self, inf: np.array):
pass
[docs]
class InjectorWrapper:
def __init__(self, injector: SimulationInjector, sim: SimpleSimulation, infiltration_chronolgy: InfiltrationChronology):
self._injector = injector
self._injector_time_sampler = TimeSampler.make_one_shot_timer(
self._injector.time_to_first_injection( current_iteration=0, current_time=0),
name="DataInjection")
# Compute some data over the infiltrations over the active zone of this
# injector.
if sim.has_infiltration():
self._infiltration_zones_injection_zone = sim.infiltration_zones[injector.active_zone()]
nb_zones = infiltration_chronolgy.nb_zones
self._non_updated_infiltration_zones_count_active_zone = count_infiltration_cells(
self._infiltration_zones_injection_zone, nb_zones)
@property
[docs]
def injector(self):
return self._injector
@property
[docs]
def time_sampler(self):
return self._injector_time_sampler
[docs]
class OpenGLContext:
def __init__(self, sim: SimpleSimulation, gl_wmi: GlWindowManagerInterface = GlWindowManagerInterface.PYGAME):
# For the sake of backward compatibility we reuse any currently active OpenGL context.
self.gl_context_active = False
self._gl_wmi = gl_wmi
if gl_wmi == GlWindowManagerInterface.PYGAME:
import pygame
try:
pygame.display.Info()
self.gl_context_active = True
except:
pass
if not self.gl_context_active:
pygame.init()
nfo = pygame.display.Info()
window_width, window_height = estimate_best_window_size(nfo.current_w, nfo.current_h, sim.param_nx, sim.param_ny)
pygame.display.set_mode((window_width, window_height), pygame.OPENGL|pygame.DOUBLEBUF, vsync = 0)
self.page_flipper = pygame.display.flip
elif gl_wmi == GlWindowManagerInterface.GLFW:
import glfw
try:
# Hides GLFW complaining about being not initialized
warnings.simplefilter("ignore")
glfw.get_video_mode( glfw.get_primary_monitor())
self.gl_context_active = True
except:
# Most likely GLFW is not initialize
pass
finally:
warnings.resetwarnings()
if not self.gl_context_active:
import glfw # late binding to stay light on the dependencies
glfw.init()
vm = glfw.get_video_mode( glfw.get_primary_monitor())
window_width, window_height = estimate_best_window_size(vm.size.width, vm.size.height, sim.param_nx, sim.param_ny)
window = glfw.create_window(window_width, window_height, f"WolfGPU - glfw {self.gl_context_active}", None, None)
gl_wmi.window = window
glfw.make_context_current(window)
def glfw_page_flip():
glfw.poll_events()
glfw.swap_buffers(gl_wmi.window)
self.page_flipper = glfw_page_flip
[docs]
def destroy_context(self):
gl_clear_all_caches()
if not self.gl_context_active:
if self._gl_wmi == GlWindowManagerInterface.PYGAME:
pygame.quit()
elif self._gl_wmi == GlWindowManagerInterface.GLFW:
import glfw
glfw.destroy_window(self._gl_wmi.window)
glfw.terminate()
[docs]
class SimulationRunner:
[docs]
record: bool # True if record simulation progress
def __init__(self,
glsim: GLSimulation, record_path: Union[Path, str]=Path("."),
early_out_delta_max: float=1e-6,
record: bool=True,
page_flip_func=pygame.display.flip,
enable_alpha_recording: bool=False,
data_injector: SimulationInjector=None): # FIXME this is for backward compatibility
"""
:param record: Whether we need recording or not. This is used mainly
during the unit tests to avoid recording too much stuff.
:param record_path: Where to put the result store.
"""
assert early_out_delta_max is None or early_out_delta_max > 0, \
f"The early out threshold must be strictly positive to be useful. You gave {early_out_delta_max}."
self._glsim = glsim
self._early_out_delta_max=early_out_delta_max
self.current_quantity = 0
self._enable_alpha_recording = enable_alpha_recording
if isinstance(record_path, str):
self.record_path = Path(record_path)
else:
self.record_path = record_path
# This is used mainly during the unit tests to avoid recording too much stuff.
self._record = record
# self._results_store is None if we don't want to record.
self._results_store = None
self.quantities = np.array([]) # init like that to silence PyLance's warning
self.simulation_finished = False
# self.h_convergence = None
# self._last_h = None
self.last_recorded_time = 0
self.last_step_duration = None
# Full timer will be initialized at full_run UNLESS it was
# previously initialized by a restart from record.
self._full_timer = None
# delta represents the last difference measured on h,qx,qy from
# one frame to the other. It reminds how decision about early
# simulation stop, was made.
self._last_early_out_delta = None
self._zoom = 1
self._mean = 0
self.records = []
self._previous_results = None
self._still_simulation_count = 0
# When working with a synthetic scenario, that's the place
# where you can find the bathymetry update file...
# .npy is importnat because if you use np.save(...) it will
# prepend that suffix automatically :-)
if self.record_path:
self._bathy_file = self.record_path / "simul.top_update.npy"
if self._bathy_file.exists():
# Note the last modification time so that we can detect a
# new modification later on.
self._bathy_file_mtime = os.stat(self._bathy_file).st_mtime
else:
self._bathy_file_mtime = None
else:
self._bathy_file = None
# Upon initialisation, the GL Viewport must be set to the
# needs of the user. We'll use it for the progress plotting.
self._gl_window_viewport = glGetIntegerv(GL_VIEWPORT)
self._gl_page_flip_func = page_flip_func
self._timer_strategies = TimerManager(TIME_SAMPLING_RESOLUTION)
self._last_global_state_snapshot_timestep = None
self._reporting_time_sampler = None # defined here so one can call "run one step" easily
self._setup_injectors(data_injector, glsim)
[docs]
def _setup_injectors(self, data_injector, glsim: GLSimulation):
if isinstance(data_injector, InjectorWrapper):
self._data_injectors: List[InjectorWrapper] = [ data_injector]
elif isinstance(data_injector, list):
self._data_injectors: List[InjectorWrapper] = data_injector
elif data_injector is None:
self._data_injectors: List[InjectorWrapper] = []
else:
raise Exception(f"Don't know what to do with {data_injector}")
# Now we detect if we don't have (space) overlaping injectors.
if len(self._data_injectors) >= 2:
t = np.zeros((glsim.width, glsim.height), np.bool8)
for injector_wrapper in self._data_injectors:
inj = injector_wrapper.injector
if np.sum(t[inj.active_zone()]) == 0:
t[inj.active_zone()] = 1
else:
raise ValueError(f"One of the injection ({inj.active_zone()}) zones steps on another one")
self._injector_time_sampler = None # defined here so one can call "run one step" easily
@property
[docs]
def early_out_threshold(self) -> float:
return self._early_out_delta_max
@classmethod
[docs]
def continue_a_simulation(
klass,
sim:SimpleSimulation,
result_store_path: Union[Path, str],
refresh_view: float=0.1,
early_out_threshold = None,
injector:SimulationInjector = None ) -> ResultsStore:
opengl_context = OpenGLContext(sim, GlWindowManagerInterface.GLFW)
gpu_simulator:GLSimulation = load_simple_sim_to_gpu(sim)
simulation_runner = SimulationRunner(
gpu_simulator,
record=True,
record_path=result_store_path, early_out_delta_max=early_out_threshold,
page_flip_func=opengl_context.page_flipper,
data_injector=injector)
# For debugging
klass._simulation_runner = simulation_runner
simulation_runner.restart_from_record(None) #full_run(refresh_view=refresh_view)
simulation_runner.full_run(refresh_view=refresh_view)
# To avoid issues when having several simulations runs. But be aware
# that because of this, one cannot query the `sim` passed that point
# (since OpenGL stuff will be wiped out).
gpu_simulator.drop_gl_resources()
opengl_context.destroy_context()
return simulation_runner.results_store()
@classmethod
[docs]
def _prepare_quick_run(klass, sim:SimpleSimulation, record_path: Union[Path, str],
refresh_view: float=0.1,
early_out_threshold = None,
gl_wmi: GlWindowManagerInterface = GlWindowManagerInterface.PYGAME,
injector:SimulationInjector = None,
tiling:TilePackingMode = TilePackingMode.REGULAR) -> "SimulationRunner":
""" Run a full simulation quickly. By quickly we mean a lot of the setup of the simulator
(such as OpenGL context) is done automatically. In particular a progress window is
displayed.
:param sim: the simulation you want to run
:param record_path: a directory where to record the results of the simulation. We
accept `str` as well as `Path`.
:param refresh_view: how often the progress window is displyaed (every N seconds).
:return: At the end of the simulation a ´ResultStore´ is returned. It is open in
read only mode, so that it can be used to query your results.
"""
assert isinstance(sim, SimpleSimulation)
assert isinstance(gl_wmi, GlWindowManagerInterface)
assert isinstance(record_path, str) or isinstance(record_path, Path) or record_path is None
if isinstance(record_path, str):
record_path = Path(record_path)
# For the sake of backward compatibility we reuse any currently active OpenGL context.
opengl_context = OpenGLContext(sim, gl_wmi)
gpu_simulator:GLSimulation = load_simple_sim_to_gpu(sim)
infiltration_chronology = InfiltrationChronology.from_list( sim.infiltrations_chronology)
if isinstance(injector, SimulationInjector):
injector = [InjectorWrapper(injector, sim, infiltration_chronology)]
elif isinstance( injector, list):
injector = [InjectorWrapper(inj, sim, infiltration_chronology) for inj in injector]
elif injector is not None:
raise Exception(f"Don't know what to do with {injector}")
simulation_runner = SimulationRunner(gpu_simulator,record=record_path is not None,
record_path=record_path, early_out_delta_max=early_out_threshold,
page_flip_func=opengl_context.page_flipper,
data_injector=injector)
# FIXME pîggybacking simulation_runner with glcontext is so-so.
simulation_runner._opengl_context = opengl_context
if injector and sim.has_infiltration():
# We have an injector at work and we have some infiltrations zones.
# The difficult part here is that if the injector updates the
# infiltration zones, then the quantity of infiltrated water per
# cell may change (and thus needs to be recomputed). To make that
# computation faster, we keep a copy of the infiltration zones which
# belong to the active zone of the injector. We also keep a count of
# the number of cells in each zones in the non-modified (by the
# injector) infiltration zones. This way we may update these numbres
# more easily.
# The un-converted chronology (where the quantities where not divided by the
# area of infitlration zones)
simulation_runner._infiltration_chronology_user = infiltration_chronology
simulation_runner._infiltration_zones_count = count_infiltration_cells(
sim.infiltration_zones, infiltration_chronology.nb_zones)
else:
simulation_runner._infiltration_chronology_user = None
simulation_runner._infiltration_zones_injection_zone = None
simulation_runner._infiltration_zones_count = None
simulation_runner._infiltration_zones_count_active_zone = None
# For debugging
klass._simulation_runner = simulation_runner
return simulation_runner
@classmethod
[docs]
def quick_run(klass, sim:SimpleSimulation, record_path: Union[Path, str],
refresh_view: float=0.1,
early_out_threshold = None,
gl_wmi: GlWindowManagerInterface = GlWindowManagerInterface.PYGAME,
injector:Union[SimulationInjector, List[SimulationInjector]] = None,
tiling:TilePackingMode = TilePackingMode.REGULAR ) -> ResultsStore:
""" Run a full simulation quickly. By quickly we mean a lot of the setup of the simulator
(such as OpenGL context) is done automatically. In particular a progress window is
displayed.
:param sim: the simulation you want to run
:param record_path: a directory where to record the results of the simulation. We
accept `str` as well as `Path`.
:param refresh_view: how often the progress window is displyaed (every N seconds).
:param injector: A `SimulationInjector` or a list of them.
:return: At the end of the simulation a ´ResultStore´ is returned. It is open in
read only mode, so that it can be used to query your results.
"""
assert isinstance(sim, SimpleSimulation)
assert isinstance(gl_wmi, GlWindowManagerInterface)
assert isinstance(record_path, str) or isinstance(record_path, Path) or record_path is None
simulation_runner = SimulationRunner._prepare_quick_run(
sim, record_path,
refresh_view,
early_out_threshold,
gl_wmi,
injector,
tiling )
simulation_runner.full_run(refresh_view=refresh_view)
return SimulationRunner._finish_quickrun(simulation_runner)
@classmethod
[docs]
def _finish_quickrun(klass, simulation_runner: "SimulationRunner") -> ResultsStore:
# To avoid issues when having several simulations runs. But be aware
# that because of this, one cannot query the `sim` passed that point
# (since OpenGL stuff will be wiped out).
simulation_runner._glsim.drop_gl_resources()
simulation_runner._opengl_context.destroy_context()
if simulation_runner.record_path is not None:
return ResultsStore(simulation_runner.results_path(), "r")
else:
return None
[docs]
def scanned_file(self):
return self._bathy_file
@property
[docs]
def step_num(self):
""" The step that is going to be computed. Starts at zero.
"""
return self._glsim._current_step
[docs]
def restart_from_record(self, start_rec=None):
"""
Restart a simulation from a previous record. This is useful when you
want to continue a simulation that has been interrupted.
When restarting from a record, it means that the record in question will
be deleted and simulation will go on from there.
:param start_rec: the index of the record you want to start from. If
None, the simulation will restart from the last record.
"""
# start_rec == None: restart from end of the simulation.
if not self.record_file().exists():
raise Exception("While trying to restart from previous records, I didn't "
f"find {self.record_file()}. Maybe you are trying to "
"restart a simulation that has never been started before ?")
# Load the result store with current data
self._results_store = ResultsStore( self.record_file(), mode="a")
if start_rec is not None:
self._results_store.truncate_at_report(start_rec)
t,dt,n_iter_dry_up_euler, n_iter_dry_up_rk, h,qx,qy = self._results_store.get_last_result()
# Reload the state on the step we want to start from
self.last_recorded_time = self._results_store.get_last_named_result(ResultType.T)
# We can't know the last Δt for sure because most often we record every
# N steps. Therefore the last Δt doesn't make much sense.
self.last_step_duration = None
# The first time step numbe is 0. It represents the initial conditions.
# When one asks a 100 step simulation, then we'll have the step numbers
# 0,1,...,100: 0 (I.c.) + 100 steps == 101 records.
s = self._results_store.get_last_named_result(ResultType.STEP_NUM)
self._full_timer = time.time() - self._results_store.get_last_named_result(ResultType.CLOCK_T)
self._glsim.reset_globals(self.last_recorded_time, self._glsim.time_step, s, h, qx, qy)
# Yeah, tricky. That's because what we have is the start time of the
# last record (which may be a group of simulation steps, of which we
# can't know the overall duration if it is an optimized time step) so we
# have to put ourselves back there...
self._results_store.truncate_at_report(self._results_store.nb_results - 1)
self.simulation_finished = False
logging.info(f"Ready to restart from step {self._glsim._current_step}.")
[docs]
def results_path(self) -> Path:
assert self._results_store is not None, "Simulation has not produced any results"
return self._results_store.path
[docs]
def results_store(self) -> ResultsStore:
""" Gives a result store back. The result store will be opened in read mode.
"""
return ResultsStore(self.results_path(), "r")
[docs]
def record_file(self):
# Path of the result store (a directory)
return self.record_path
[docs]
def do_record(self, texture_ndx, global_state, force=False):
""" Record current state of the simulation.
Be careful, this implies reading data from the GPU which is *slow* !
"""
assert self.step_num >= 0, "must be zero based"
assert global_state is not None
alarm_value = global_state.time_step
emergency_stop = alarm_value is not None and (isnan(alarm_value) or alarm_value < 0)
if force: # We can force the reporting (used in debug)
should_we_record = True
elif self._record: # Has the user enabled the reporting ?
should_we_record = True
elif emergency_stop: # Something went worng in the computation
should_we_record = True
else:
should_we_record = False
if should_we_record:
if emergency_stop:
logging.error("Emergency stop! Recording additional matrices in current directory.")
np.save("q0", self._glsim.read_quantity_result(0))
np.save("q1", self._glsim.read_quantity_result(1))
np.save("alpha0", self._glsim.read_alpha(0))
np.save("alpha1", self._glsim.read_alpha(1))
with time_it("record step"):
self.last_recorded_time, self.last_step_duration, nb_dryup_iterations, nb_active_tiles_cumulated = \
global_state.simulation_time, global_state.simulation_step, global_state.nb_dryup_iterations, global_state.total_active_tiles
# print(f"t={self.simulation_time:.4f} acive line={active_inf_line} zones={inf_zones}")
# print(self._glsim.read_active_cells())
# print(read_texture(self._glsim._infiltration_fb, GL_COLOR_ATTACHMENT0, 64, 64, GL_R32I)[55:,55:])
#assert self.last_step_duration >= 0
#print(f"\nnb_dryup_iterations: {nb_dryup_iterations}")
self.quantities = self._glsim.read_tile_packed_quantity_result(texture_ndx)
h, hu, hv = self.quantities[:,:,0], self.quantities[:,:,1], self.quantities[:,:,2]
#np.save(Path(self.record_path, f"{self._glsim.basefilename}{self.step_num:07}.GPU_RH"), h)
# if False:
# plt.imshow(h,origin='lower')
# plt.show()
if MEM_TRACKING:
self.records.append([
self.last_recorded_time, np.max(h), np.mean(h),
np.max(hu), np.max(hv)])
# Our step num has not been incremented yet, but glsim's done already.
assert self.step_num == self._glsim._current_step, \
"The steps numbers must be in sync because they're used" \
" while restarting a sim from an unfinished one " \
f"{self.step_num} == {self._glsim._current_step}"
# A mostly dry mesh is a mesh where water height is not zero but
# very small (< MOSTLY_DRY_MESHES_THRESHOLD m).
nb_mostly_dry_meshes = np.count_nonzero((h < MOSTLY_DRY_MESHES_THRESHOLD) & (h > 0))
self._results_store.append_result(
self.step_num,
self.last_recorded_time, self.last_step_duration,
nb_dryup_iterations, nb_active_tiles_cumulated,
h, hu, hv, None, None, clock_time=time.time() - self._full_timer,
delta_early_out=self._last_early_out_delta or 0,
nb_mostly_dry_meshes=nb_mostly_dry_meshes)
if self._enable_alpha_recording:
# Stores the last alpha maps
# FIXME I save the two of them because the correct one to take
# depends on the number of dry up cancellation iterations that were
# done and I don't know that yet (I should put it in the
# globals).
alpha_correction = self._glsim.read_tile_packed_alpha(0)
self._results_store.append_additional_result("alpha_values0", alpha_correction[:,:,0])
self._results_store.append_additional_result("alpha_masks0", alpha_correction[:,:,1])
alpha_correction = self._glsim.read_tile_packed_alpha(1)
self._results_store.append_additional_result("alpha_values1", alpha_correction[:,:,0])
self._results_store.append_additional_result("alpha_masks1", alpha_correction[:,:,1])
active_cells_packed = self._glsim.read_active_cells_map()
self._results_store.append_additional_result(
"active_cells_packed", active_cells_packed
)
# alpha_correction = read_texture(
# self._glsim.access_fb, GL_COLOR_ATTACHMENT0, self._glsim.width, self._glsim.height, GL_RGB32F)
# self._results_store.additional_result("alpha", alpha_correction)
# if self._glsim._debugging:
# dbg1 = read_texture(self._glsim.access_fb,
# GL_COLOR_ATTACHMENT2, self._glsim.width, self._glsim.height, GL_RGBA32F)
# self._results_store.additional_result("debug1", dbg1)
if emergency_stop:
msg = f"NaN's were detected, simulation has been saved. Last written flip/flop={texture_ndx}"
logging.error(msg)
self._results_store.close()
raise Exception(msg)
return (h, hu, hv)
else:
return None
[docs]
def texture_written_by_last_step(self) -> int:
""" After running `run_one_step` this gives the number of the texture
that was *written* to (zero or one; rememeber the GPU code often uses
flip-flop textures, that's what you query here).
"""
return self.current_quantity
[docs]
def texture_read_by_last_step(self) -> int:
return 1 - self.current_quantity
[docs]
def _check_simulation_is_progressing(self, h, hu, hv):
# We end the simulation if things don't move anymore
if self._previous_results is None:
self._previous_results = (h, hu, hv)
else:
prev_h, prev_hu, prev_hv = self._previous_results
delta_h = np.abs(h - prev_h)
delta_hu = np.abs(hu - prev_hu)
delta_hv = np.abs(hv - prev_hv)
sum_delta = np.sum(delta_h + delta_hu + delta_hv)
self._previous_results = (h, hu, hv)
self._last_early_out_delta = sum_delta
if self._early_out_delta_max is not None:
#print(sum_delta)
if sum_delta <= self._early_out_delta_max:
self._still_simulation_count += 1
if self._still_simulation_count >= 3:
logging.warning(f"Simulation is still, stopping at t={self.last_recorded_time}s after {self.step_num} iterations.")
self.simulation_finished = True
self.cancelled_early = True
else:
logging.info(f"Simulation is slowing down: |Δh|+|ΔQx|+|ΔQy| = {sum_delta}")
else:
# Reset count
self._still_simulation_count = 0
[docs]
def _refresh_display_parameters(self):
# Recompute the min/max of the array
# min/max is used to improve the display readability
with time_it("Refresh zoom, read_tile_packed_quantity_result"):
#print("read qty")
p = self._glsim.read_tile_packed_quantity_result(0) # 1-self.current_quantity)
h = p[:,:,0] # read water height
msk = h > 0
if msk.any():
valid_heights = h[msk] # Avoid mean on empty arrays
self._mean = (np.mean(valid_heights) + self._mean)/2
std = np.std(valid_heights)
else:
self._mean = 0
std = 0
# p = self._glsim.read_tile_packed_quantity_result(self.current_quantity)
# h = p[:,:,0] # read water height
# h_max = max(np.max(h), h_max)
# How this works:
# I average the old zoom value and the new one.
# I want color to be distributed around the mean (0.5), from 0 to 1.
# So the max deviation is 0.5.
# I want to see 2 std => 2*std
# I want everythign to be xformed back into 0..1 => I divide by 2*std.
# If std happens to be zero, then we set it to 0.1
self._zoom = (0.5/(2*max(0.1,std)) + self._zoom)/2
logging.debug(f"Rezooming at mean={self._mean:3g}, zoom={self._zoom:3g}")
# if self.h_convergence and self._last_h is not None:
# if abs(np.max(h - self._last_h)) < self.h_convergence:
# logging.info("Simulation has converged")
# self.simulation_finished = True
# self._last_h = h
[docs]
def _read_simulation_global_state(self) -> GLSimulationGlobalState:
""" Read the current global state of the simulation.
The raison d'être of this method is to allow to to cache the result
of the query if the global state because it is expensive.
"""
# This caching mechanism is important because reading the global state
# may be expensive (although I didn't actually check the cost).
if self._last_global_state_snapshot_timestep != self._glsim._current_step:
self._snapshot_global_state = self._glsim.read_global_state()
return self._snapshot_global_state
[docs]
def _handle_data_injector(self, proxy: SimulationProxy, data_injector_wrapper: InjectorWrapper, sim_finished_step: int, sim_finished_step_time: float, source_quantity: int,
infiltration_chronology) -> SimulationProxy :
data_injector: SimulationInjector = data_injector_wrapper.injector
next_stop = data_injector.do_updates(proxy)
if next_stop:
self._timer_strategies.update_one_shot_timer(data_injector_wrapper.time_sampler, next_stop, sim_finished_step, sim_finished_step_time)
# FIXME _next_stop is not used anymore
self._glsim._next_stop = self._timer_strategies.next_timer_trigger_iteration
else:
# None means don't call the update anymore.
pass
return proxy
[docs]
def _run_all_injectors(self, source_quantity:int, triggered_timers: set[TimeSampler], global_state:GLSimulationGlobalState) -> SimulationProxy:
proxy = None
for data_injector in self._data_injectors:
if data_injector.time_sampler in triggered_timers:
if proxy is None:
# Proxy is created only if at least on injector needs it
proxy = SimulationProxy(
self._glsim,
global_state,
None,
source_quantity,
self._infiltration_chronology_user)
proxy._set_zone_of_interest(data_injector._injector.active_zone())
self._handle_data_injector(
proxy, data_injector,
# At the end of the simulation step (in glsimulation), the
# current step is increased by one. So if we refer to the
# prefious sim time, we need to refere to the previous step
# too, hence the -1.
global_state.simulation_step,
# The previous sim time is the time at teh beginning of the step.
# FIXME We could argue the injectors are ran between two steps...
global_state.simulation_time,
source_quantity,
self._infiltration_chronology_user)
return proxy
[docs]
def _apply_changes_in_proxy(self, proxy: SimulationProxy, source_quantity: int):
# Some injector triggered, hence a proxy was set up.
if proxy is not None:
# Now let's check if some injector actually used the proxy
if proxy._bathymetry_was_set:
upload_np_array_to_gpu(
GL_TEXTURE_RECTANGLE,
GL_R32F,
self._glsim._tile_packer.shuffle_and_pack_array( proxy._cached_bathymetry),
texture_id=self._glsim.bathymetry_tex)
if proxy._h_qx_qy_was_set:
upload_np_array_to_gpu(
GL_TEXTURE_RECTANGLE,
GL_RGBA32F,
self._glsim._tile_packer.shuffle_and_pack_array( proxy._cached_h_qx_qy),
texture_id=self._glsim.quantity_tex[source_quantity])
if self._infiltration_chronology_user is not None:
all_updated_counts = np.zeros( (self._infiltration_chronology_user.nb_zones,))
all_preupdate_counts = np.zeros( (self._infiltration_chronology_user.nb_zones,))
if proxy._infiltration_zones_was_set:
for data_injector in self._data_injectors:
# Count over zone of interest
all_updated_counts += count_infiltration_cells(
proxy._cached_infiltration_zones[data_injector.injector.active_zone()],
self._infiltration_chronology_user.nb_zones)
all_preupdate_counts += data_injector._non_updated_infiltration_zones_count_active_zone
from wolfgpu.glsimulation import MESH_WITHOUT_INFILTRATION
packed = self._glsim._tile_packer.shuffle_and_pack_array(
np.ascontiguousarray(proxy._cached_infiltration_zones - 1),
neutral_values=MESH_WITHOUT_INFILTRATION)
upload_np_array_to_gpu(
GL_TEXTURE_RECTANGLE,
GL_R32I,
packed,
texture_id=self._glsim.infiltration_zones_texture)
if proxy._infiltration_chronology_was_set or proxy._infiltration_zones_was_set:
self._glsim.set_buffers( infiltration_timings=
self._glsim._infiltrations_timings_as_gpu_array(
self._infiltration_zones_count - all_preupdate_counts + all_updated_counts,
self._infiltration_chronology_user))
[docs]
def run_one_step(self, dont_close_results:bool= False):
""" Run only one step of the simulation.
Globally, this method does the following:
- Run the GLSimulation for one step with the correct `source_quantity`
- Record the results if needed
- Inverse the `source_quantity` index
- Perform some estimation of the next stop time
:param dont_close_results: If True, the results store will not be closed
"""
if self.simulation_finished:
raise Exception("Simulation is finished, you can't run it anymore")
# step_num is currently self._glsim._current_step
if self._glsim._current_step == 0:
# Recording initial conditions first
self.do_record(texture_ndx=0, global_state=self._read_simulation_global_state())
with time_it("Run one step"):
source_quantity = self.current_quantity
dest_quantity = 1 - source_quantity
self._glsim.simulation_step(source_quantity)
sim_finished_step = self._glsim._current_step
triggered_timers = self._timer_strategies.update_clock_wall_timers()
# print(f"{sim_finished_step} >= ? {self._timer_strategies.next_timer_trigger_iteration} {self._last_global_state_snapshot_timestep}")
if sim_finished_step < 10 or self._timer_strategies.next_timer_trigger_iteration is None or sim_finished_step >= self._timer_strategies.next_timer_trigger_iteration:
gs = self._read_simulation_global_state()
if (self._glsim.simulation_duration.type == SimulationDurationType.STEPS and gs.simulation_step >= self._glsim.simulation_duration.duration) \
or (self._glsim.simulation_duration.type == SimulationDurationType.SECONDS and gs.simulation_time > self._glsim.simulation_duration.duration):
#print(f"Finished {gs.simulation_step} steps / {gs.simulation_time} >= {self._glsim.simulation_duration}")
self.simulation_finished = True
# Circular buffer
b = gs.simulation_step % INJECTION_TRIGGER_TIME_ESTIMATOR_DEPTH
if gs.simulation_step >= INJECTION_TRIGGER_TIME_ESTIMATOR_DEPTH:
steps = gs.steps_times[b:] + gs.steps_times[0:b]
else:
steps = gs.steps_times[0:b]
# Give the past memorized time steps to the timer manager
# so that it can better estimate next stop programmed
# on a time basis.
self._timer_strategies.set_simulation_time_steps(steps)
# Updating all the timers
triggered_timers |= self._timer_strategies.update(gs.simulation_step, gs.simulation_time)
self._glsim._next_stop = self._timer_strategies.next_timer_trigger_iteration
# print(f"synced at step {sim_finished_step}, time {sim_finished_step_time}")
# print(f"Current step {sim_finished_step}. Next stop: {self._timer_strategies.next_timer_trigger_iteration}")
# print(self._timer_strategies)
# print(f"Triggered : {triggered_timers}")
if self._reporting_time_sampler in triggered_timers:
# self._glsim._current_step is increased at the end of a simulation step
#print(f"Record step {gs.simulation_step} time {gs.simulation_time}")
recorded_res = self.do_record(dest_quantity, global_state=gs)
if recorded_res:
h, hu, hv = recorded_res
# ATTENTION! checking if the simulation is prorgressing requires
# reading the unknowns vector. Since that is very expensive,
# we decide that, for the moment, we only check the stop condition
# for "free", that is when we do have some reporting. So, disabling
# the reporting implies disabling the progress checking
# FIXME I have been caught by that so I guess a regular user
# will be surprised too...
self._check_simulation_is_progressing(h, hu, hv)
proxy = self._run_all_injectors(dest_quantity, triggered_timers, gs)
if proxy is not None:
# Some injector triggered, hence a proxy was set up.
self._apply_changes_in_proxy(proxy, dest_quantity)
# FIXME REAAALLLLY dirty. I do that because I need glsim to generate the "constans.frs" include
# before I can load these shaders...
if not hasattr(self, "_vertex_shader2"):
self._vertex_shader2 = load_shader_from_file(Path(SHADER_PATH,"simplevertexshader.vs"))
self._active_tiles_shader2 = load_shader_from_file(Path(SHADER_PATH,"active_tiles_quad.gs"))
self._texture_viewer_shader2 = load_shader_from_file(Path(SHADER_PATH,"textureViewerShader.frs"))
self._texture_viewer_program2 = load_program(self._vertex_shader2, self._texture_viewer_shader2, self._active_tiles_shader2)
self._vertex_shader = load_shader_from_file(Path(SHADER_PATH,"simplevertexshader.vs"))
self._active_tiles_shader = load_shader_from_file(Path(SHADER_PATH,"active_tiles_quad.gs"))
self._texture_viewer_shader = load_shader_from_file(Path(SHADER_PATH,"textureViewerShader.frs"))
self._texture_viewer_program = load_program(self._vertex_shader, self._texture_viewer_shader)
with time_it("Run one step - admin"):
# --------------------------------------------------------------------
# Flip/flop the (source) current_quantity index
if self.current_quantity == 0:
self.current_quantity = 1
else:
self.current_quantity = 0
if self.simulation_finished and not dont_close_results and self._results_store is not None:
self._results_store.close()
return triggered_timers
[docs]
def plot_progress2(self):
# FIXME Accessing _glsim's private stuff is bad abstraction...
domain_size = self._glsim.width, self._glsim.height
uniforms = {
"zoom": float(self._zoom),
"mean": float(self._mean),
# FragCoord (screen) to texture coord
"textureCoordinateTransform": (
float(domain_size[0]/self._gl_window_viewport[2]),
float(domain_size[1]/self._gl_window_viewport[3]))
}
textures= {
"quantityTexture": self._glsim.quantity_tex[1-self.current_quantity],
"tileIndirection" : self._glsim._tile_indirection_tex,
}
#print(self._glsim.nbx, self._glsim.nby)
glUseProgram(self._texture_viewer_program2)
glBindFramebuffer(GL_FRAMEBUFFER, GL_NONE) # draw on screen
glClear(GL_COLOR_BUFFER_BIT)
self._glsim._draw_active_tiles(self._texture_viewer_program2, uniforms, textures, self._gl_window_viewport)
# def plot_progress(self):
# if self._glsim._optim_geom_shaders:
# return self.plot_progress2()
# # glUseProgram(self._texture_viewer_program)
# # #self._zoom = 0.5
# # set_uniform(self._texture_viewer_program, "zoom", float(self._zoom))
# # set_uniform(self._texture_viewer_program, "mean", float(self._mean))
# # # For reaon unbeknown to me, this doesn't give what I want
# # # as window width/height...
# # # window_width, window_height = pygame.display.get_surface().get_size()
# # r = (self._glsim.nbx/self._glsim._window_width,
# # self._glsim.nby/self._glsim._window_height)
# # set_uniform(self._texture_viewer_program, "textureCoordinateTransform", r)
# # glBindFramebuffer(GL_FRAMEBUFFER, GL_NONE) # draw on screen
# # glClear(GL_COLOR_BUFFER_BIT)
# # # FIXME Encapsulation is no good here
# # set_texture(self._texture_viewer_program, "sTexture",
# # self._glsim.quantity_tex[1-self.current_quantity], 0)
# # x,y,w,h = 0,0,self._glsim._window_width,self._glsim._window_height
# # self._glsim._draw_active_tiles(self._texture_viewer_program,x,y,w,h)
# # # self._glsim._draw_active_tiles(self._texture_viewer_program,
# # # 0, self._glsim.height//4,
# # # self._glsim.width, self._glsim.height//4)
# def has_handle(self, fpath):
# for proc in psutil.process_iter():
# try:
# for item in proc.open_files():
# if fpath == item.path:
# return True
# except Exception as ex:
# print(ex)
# pass
# return False
[docs]
def file_in_use(self, f):
try:
os.rename(f, f)
return False
except OSError:
return True
[docs]
def _try_reload_bathymetry(self):
try:
# Using a try catch seems overkill, but we had something
# like that happening in 2023. Error was:
# [WinError 5] Accès refusé ...
bath_exists = self._bathy_file is not None and self._bathy_file.exists()
except Exception as ex:
logging.error(f"Can't check if {self._bathy_file} exists: {ex}")
bath_exists = False
if bath_exists:
try:
new_time = os.stat(str(self._bathy_file)).st_mtime
if new_time != self._bathy_file_mtime:
print() # Nicer CLI
logging.info(f"Loading bathymetry file for update: {self._bathy_file}")
self._bathy_file_mtime = new_time
if not self.file_in_use(str(self._bathy_file)):
# from wolfhece.wolf_array import WolfArray_Sim2D
# a = WolfArray_Sim2D(fname=str(self._bathy_file))
# a.preload = True
# a.read_all()
# logging.info(f"Bathymetry file shape is {a.array.shape} {a.nbx}x{a.nby}")
# self._glsim.set_buffers(bathymetry_array=np.ascontiguousarray(np.transpose(a.array)))
try:
# We transpose because we expect the user will pass us an array
# following the usual Wolf convention.
a = np.load(self._bathy_file).transpose()
except Exception as ex:
logging.error(f"Can't load bathymetry file {self._bathy_file}. Numpy can't read it, it says: \"{ex}\"")
return
if a.shape != (self._glsim.nby, self._glsim.nbx):
if self._glsim.nby == a.shape[1] and self._glsim.nbx == a.shape[0]:
suggestion = " Maybe you need to transpose ?"
else:
suggestion = ""
logging.error(f"The shape of the bathymetry update file is not correct. You gave x={a.shape[0]}, y={a.shape[1]} columns. I expect {self._glsim.nbx} and {self._glsim.nby}.{suggestion}")
return
self._glsim.set_buffers(bathymetry_array=a)
logging.info(f"Loaded bathymetry file {self._bathy_file}, shape is {a.shape}")
if self._results_store is not None:
gs = self._glsim.read_global_state()
self._results_store.add_event(SimulationEventType.BathymetryUpdate, simulation_time= gs.simulation_time, simulation_step=self.step_num, data=self._bathy_file)
# z = np.zeros((128,128), dtype=np.float32)
# z[:,0:10] = 100
# self._glsim.set_buffers(bathymetry_array=z)
else:
logging.error(f"The bathymetry file {self._bathy_file} is opened by another process")
# else:
# logging.warn(f"The bathymetry file {self.record_path} didn't change since the last time I've seen it")
except Exception as ex:
logging.error(f"Something went wrong: {ex}")
[docs]
def init_result_store(self):
if self._record:
if self._results_store is None:
self._results_store = ResultsStore( self.record_file(), mode="w", tile_packer=self._glsim.tile_packer())
[docs]
def full_run(self, refresh_view:int = 1, dont_close_results:bool= False, refresh_colormap:int = 10):
""" Perform a full simulation run.
:param refresh_view: Refresh the view every `refresh_view` seconds
:param dont_close_results: Leave the result store open
:param refresh_colormap: Refresh the colormap every `refresh_colormap * refresh_view` seconds
"""
self._setup_full_run(refresh_view, refresh_update_colormap= refresh_colormap)
NB_ITER_STATS=100
iteration_times = np.zeros( (NB_ITER_STATS,))
if MEM_TRACKING:
mem_tracker = tracker.SummaryTracker()
# {percentage:3.0f}%
with tqdm.tqdm(total=100, bar_format="{desc}|{bar}|{postfix}", ncols=80) as taqadum:
running = True
# Inidcates if the user or the early-out condition was met before
# the planned end of the simulation.
self.cancelled_early = False
# result store might be already opened if the user wants
# to continue an existing simulation.
self.init_result_store()
while running and not self.simulation_finished:
triggered_timers = self.run_one_step(dont_close_results)
# if triggered_timers:
# print([str(tt) for tt in triggered_timers])
with time_it("-- Page flip"):
refresh = self._refresh_view_time_sampler in triggered_timers
if (self.step_num == 1 or refresh):
# Two plot progress else double buffering shows flicker
if self._display_parameters_time_sampler in triggered_timers:
self._refresh_display_parameters()
self.plot_progress2()
# Swapping buffers avoids flickering
# It is only useful if we are in a windowed mode
self._gl_page_flip_func()
if True:
if MEM_TRACKING and self.step_num % 40 == 0:
mem_tracker.print_diff()
log_mem_used()
self._admin_tasks(iteration_times, triggered_timers, taqadum)
running = self._must_continue_event()
self._teardown_full_run(iteration_times)
[docs]
def _admin_tasks(self, iteration_times:np.ndarray, triggered_timers:set, taqadum:tqdm.tqdm):
""" Perform administrative tasks such as updating the progress bar,
compute indicators or reload bathymetry. """
NB_ITER_STATS = iteration_times.shape[0]
with time_it("-- admin tasks"):
iteration_times[self.step_num % NB_ITER_STATS] = time.time()
if self.step_num > NB_ITER_STATS:
# step_num+1 MOD ... is the *first* in the list !!!
stats_total_duration = iteration_times[self.step_num % NB_ITER_STATS] - iteration_times[(self.step_num +1) % NB_ITER_STATS]
average_duration = stats_total_duration / NB_ITER_STATS
active_meshes_per_second = self._glsim.nb_active_meshes/average_duration
pt = f"{(active_meshes_per_second/1_000_000)/average_duration:.1f}"
self.meshes_per_second = pt
else:
pt = "?? "
#print("yo-4")
# Round so that we actually see the time step increments
#print(f"debug: {self._glsim.last_step_duration}")
# self.last_step_duration > 0, the step duration is recorded at the beginning
# of the iteration => it may have never been initialized (at begin of sim, or at beginiing of restart)
gs = self._read_simulation_global_state()
if gs.simulation_step is not None and gs.simulation_step > 0:
# total_time_simulation_str = str(
# round(self._glsim.last_gpu_time,
# ceil(abs(log10(self._glsim.current_simulation_time_step))))) + "s"
total_time_simulation_str = seconds_to_duration_str(gs.simulation_time)
last_duration_str = f"{gs.time_step:.4g}s"
elif gs.simulation_time is not None:
total_time_simulation_str = seconds_to_duration_str(gs.simulation_time)
last_duration_str = "--"
else:
total_time_simulation_str = "--"
last_duration_str = "--"
if self._glsim.simulation_duration.type == SimulationDurationType.SECONDS:
#print(100 * self.last_recorded_time / self._glsim.simulation_duration.duration)
percent_done = 100* max(self.last_recorded_time or 0, gs.simulation_time or 0) / self._glsim.simulation_duration.duration
else:
percent_done = 100*self.step_num / self._glsim.simulation_duration.duration
percent_done = min(round(percent_done),100)
stats_total_duration = iteration_times[self.step_num % NB_ITER_STATS] - iteration_times[(self.step_num +1) % NB_ITER_STATS]
average_duration = stats_total_duration / NB_ITER_STATS
iter_per_sec = 1 / average_duration
# logging_iterations += 1
if self._logger_time_sampler in triggered_timers or self.simulation_finished:
if self._record:
if self._results_store.nb_results > 1:
rec_flag = f"[{self._results_store.nb_results} records] "
elif self._results_store.nb_results == 1:
rec_flag = f"[{self._results_store.nb_results} record] "
else:
rec_flag = "[No record]"
else:
rec_flag = ""
fact_rk = 2 if self._glsim.ponderation_runge_kutta is not None and self._glsim.ponderation_runge_kutta != 1 else 1
taqadum.set_postfix_str(f"{iter_per_sec:.2f} it./sec.")
taqadum.set_description(f"{rec_flag} t={total_time_simulation_str} Δt={last_duration_str} {gs.nb_active_tiles * fact_rk * 16*16/1_000_000:.2f}M cells/sec. {gs.nb_dryup_iterations} dryups iter.")
taqadum.update(percent_done - taqadum.n)
# logging_iterations = 0
self._try_reload_bathymetry()
[docs]
def _must_continue_event(self) -> bool:
""" Check if the simulation was cancelled early by the user or by an
early-out condition.
:return bool: True if the simulation should continue, False if it should stop.
"""
# FIXME Dirty. We're bypassing a glfw context here...
if self._gl_page_flip_func == pygame.display.flip:
for event in pygame.event.get():
if event.type == pygame.QUIT:
running = False
self.cancelled_early = True
elif event.type == pygame.KEYDOWN:
if event.key in (pygame.K_ESCAPE, pygame.K_q):
running = False
self.cancelled_early = True
return not self.cancelled_early
[docs]
def _setup_full_run(self, refresh_view:int=1, refresh_update_colormap:int = 10):
""" Prepare the simulation for a full run.
:param refresh_view: Refresh the view every `refresh_view` seconds.
:param refresh_update_colormap: Refresh the colormap every `refresh_view * refresh_update_colormap` seconds.
"""
logging.debug(f"FINISH={FINISH} STRIPES={STRIPES_ON_NAP}")
if self._glsim.nb_active_meshes <= 64*64:
# If sim too small, then one-shot textures get in the way.
warning = "(!!! very approximate !!!)"
else:
warning = "(approximate)"
logging.info(f"Total memory for textures: {total_textures_size()/1000_000:.1f}Mb, bytes per mesh {warning}:{int(total_textures_size()/self._glsim.nb_active_meshes)}")
# Make sure the reporting frequency is meaningful
if self._glsim.report_frequency.type == self._glsim.simulation_duration.type:
assert self._glsim.report_frequency.duration <= self._glsim.simulation_duration.duration, \
f"Report frequency {self._glsim.report_frequency} is above simulation's duration {self._glsim.simulation_duration}"
# debug
if self._full_timer is None:
# Init unless already initialized (most likely because we restarted a simulation)
self._full_timer = time.time()
self._reporting_time_sampler = TimeSampler.make_periodic_timer(self._glsim.report_frequency, name="WriteReport")
self._refresh_view_time_sampler = TimeSampler.make_periodic_wall_clock_timer(period=SimulationDuration.from_seconds(refresh_view), triggers_on_zero=True, name="RefreshViewGl")
self._display_parameters_time_sampler = TimeSampler.make_periodic_wall_clock_timer(period=SimulationDuration.from_seconds(refresh_view*refresh_update_colormap), triggers_on_zero=True, name="RefreshViewParam")
self._logger_time_sampler = TimeSampler.make_periodic_wall_clock_timer(period=SimulationDuration.from_seconds(0.5), triggers_on_zero=True, name="ConsoleLogger")
self._timer_strategies.reset()
self._timer_strategies.add_strategy( self._reporting_time_sampler)
self._timer_strategies.add_strategy( self._refresh_view_time_sampler)
self._timer_strategies.add_strategy( self._display_parameters_time_sampler)
self._timer_strategies.add_strategy( self._logger_time_sampler)
for data_injector in self._data_injectors:
self._timer_strategies.add_strategy(data_injector.time_sampler)
[docs]
def _teardown_full_run(self, iteration_times):
NB_ITER_STATS = iteration_times.shape[0]
if self.cancelled_early:
# When you stop early or by hand in the middle of two record steps
# which are 1 hours apart, you don't want to loose too much
# information (say half an hour). So we record the last moment.
self.do_record(texture_ndx=1 - self.current_quantity, force=True, global_state=self._glsim.read_global_state())
# Yes, I had the equality once. So, you see, these things, they happen.
new_time = time.time()
while new_time == self._full_timer:
new_time = time.time()
self.total_duration = time.time() - self._full_timer
self.iter_per_second = self.step_num / self.total_duration
if self._record:
logging.info(f"WolfGPU records stored in {self.record_file().resolve()}")
logging.debug(f"Last delta = {self._last_early_out_delta}.")
if self.step_num >= NB_ITER_STATS:
stats_total_duration = iteration_times[self.step_num % NB_ITER_STATS] - iteration_times[(self.step_num +1) % NB_ITER_STATS]
average_duration = stats_total_duration / NB_ITER_STATS
if self._record:
recs = f"{self._results_store.nb_results} records."
else:
recs = "No records."
logging.info(f"{self.iter_per_second:g} iter/sec overall. {1/average_duration:.2f} it./s over the last {NB_ITER_STATS} iterations. {self.step_num} steps. {recs}")