Source code for wolfgpu.SimulationRunner

"""
Author: HECE - University of Liege, Stéphane Champailler, Pierre Archambeau
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

import logging
import os
import sys
import time
from datetime import datetime
from datetime import timedelta
from math import floor, sqrt, isnan, ceil
from pathlib import Path
from typing import Union, Tuple, List
from enum import Enum
import warnings
import os

os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = "hide"
import pygame
import numpy as np
from scipy.ndimage import labeled_comprehension

import tqdm
from OpenGL.GL import GL_FRAMEBUFFER, GL_NONE, GL_RGB32F, GL_RGBA32F, glGetIntegerv, GL_COLOR_ATTACHMENT0, GL_COLOR_ATTACHMENT2, \
    GL_COLOR_BUFFER_BIT, GL_FRAMEBUFFER, GL_NONE, GL_RGB32F, GL_RGBA32F, GL_VIEWPORT, \
    glBindFramebuffer, glClear, glGetIntegerv, glUseProgram, GL_MAJOR_VERSION, \
    GL_TEXTURE_RECTANGLE, GL_RGBA32F, GL_R32F, GL_R32I

from .gl_utils import load_program, load_shader_from_file, read_texture, total_textures_size, gl_clear_all_caches, estimate_best_window_size, upload_np_array_to_gpu, update_texture

from .glsimulation import FINISH, SHADER_PATH, STRIPES_ON_NAP, GLSimulation, log_mem_used, time_it, TIME_SAMPLING_RESOLUTION, GLSimulationGlobalState, TilePackingMode
from .results_store import ResultsStore, SimulationEventType, ResultType
from .simple_simulation import SimulationDurationType, SimpleSimulation, SimulationDuration, InfiltrationChronology
from .loaders.simple_sim_loader import load_simple_sim_to_gpu
from .utils import EveryNSeconds, seconds_to_duration_str
from .sampled_timer import TimeSamplingStrategy, TimerManager, TimeSampler
from .injector import SimulationInjector, SimulationProxy

[docs] class GlWindowManagerInterface(Enum):
[docs] PYGAME = "pygame"
[docs] GLFW = "glfw"
@classmethod
[docs] def for_glfw( klass, glfw_window): """ If one wants to call the swap buffer function in glfw, one needs to have a reference to the window. """ wmi = GlWindowManagerInterface.GLFW wmi.window = glfw_window return wmi
[docs] MEM_TRACKING=False
if MEM_TRACKING: from pympler import tracker
[docs] MOSTLY_DRY_MESHES_THRESHOLD = 1e-5
""" Number of steps recorded in the database of simulation times used by the injection triggering to estimate the next trigger. """
[docs] INJECTION_TRIGGER_TIME_ESTIMATOR_DEPTH = 100
[docs] def count_infiltration_cells(a, nb_zones): """ :param a: infiltration zones array. Zone indices are 1 baseD. :return: a numpy array (1D) with all the counts, one per zone. """ # nb_cells_per_zone = np.zeros( nb_zones) # # The "1" thing is because we're one based... # unique_values, counts = np.unique(a[a >= 1], return_counts = True) # nb_cells_per_zone[unique_values - 1] = counts nb_cells_per_zone = labeled_comprehension(a, a, np.arange(1, nb_zones+1), np.count_nonzero, int, 0) return nb_cells_per_zone
from abc import ABC, abstractmethod from wolfgpu.glsimulation import SimulationDuration
[docs] class SimulatoryQuery: """ Get data over the active zone, not the whoel domain!!!!! """ # This class exists to ensure the user does not fiddl with the # SimulationRunner or the GLSimulation objects. Moreover, it makes sure # every data query is bound to the modifiable area. def __init__(self, simulator: GLSimulation, modifiable_area: tuple): self._sim = simulator self._area_i, self._area_j, self._area_width, self._area_height = modifiable_area self._np_index = (slice(self._area_i, self._area_i+self._area_width), slice(self._area_j, self._area_j+self._area_height))
[docs] def read_bathymetry(self) -> np.array: return self._sim.read_bathymetry()[self._np_index]
[docs] def read_infiltration_zones(self) -> np.array: active = self._sim.active_infiltration_line(t=10) return self._sim._infiltration_zones_values_array_cache[active, :]
[docs] def set_current_infiltration_quantities(self, q: np.array): active = self._sim.active_infiltration_line(t=10) self._sim._infiltration_zones_values_array_cache[active, :] = q from OpenGL.GL import GL_RED, GL_FLOAT, glTextureSubImage2D glTextureSubImage2D( self._sim.infiltration_zones_values_array, 0, # level self._area_i, self._area_j, self._area_width, self._area_height, GL_RED, GL_FLOAT, q)
[docs] def set_bathymetry(self, inf: np.array): pass
[docs] class InjectorWrapper: def __init__(self, injector: SimulationInjector, sim: SimpleSimulation, infiltration_chronolgy: InfiltrationChronology): self._injector = injector self._injector_time_sampler = TimeSampler.make_one_shot_timer( self._injector.time_to_first_injection( current_iteration=0, current_time=0), name="DataInjection") # Compute some data over the infiltrations over the active zone of this # injector. if sim.has_infiltration(): self._infiltration_zones_injection_zone = sim.infiltration_zones[injector.active_zone()] nb_zones = infiltration_chronolgy.nb_zones self._non_updated_infiltration_zones_count_active_zone = count_infiltration_cells( self._infiltration_zones_injection_zone, nb_zones) @property
[docs] def injector(self): return self._injector
@property
[docs] def time_sampler(self): return self._injector_time_sampler
[docs] class OpenGLContext: def __init__(self, sim: SimpleSimulation, gl_wmi: GlWindowManagerInterface = GlWindowManagerInterface.PYGAME): # For the sake of backward compatibility we reuse any currently active OpenGL context. self.gl_context_active = False self._gl_wmi = gl_wmi if gl_wmi == GlWindowManagerInterface.PYGAME: import pygame try: pygame.display.Info() self.gl_context_active = True except: pass if not self.gl_context_active: pygame.init() nfo = pygame.display.Info() window_width, window_height = estimate_best_window_size(nfo.current_w, nfo.current_h, sim.param_nx, sim.param_ny) pygame.display.set_mode((window_width, window_height), pygame.OPENGL|pygame.DOUBLEBUF, vsync = 0) self.page_flipper = pygame.display.flip elif gl_wmi == GlWindowManagerInterface.GLFW: import glfw try: # Hides GLFW complaining about being not initialized warnings.simplefilter("ignore") glfw.get_video_mode( glfw.get_primary_monitor()) self.gl_context_active = True except: # Most likely GLFW is not initialize pass finally: warnings.resetwarnings() if not self.gl_context_active: import glfw # late binding to stay light on the dependencies glfw.init() vm = glfw.get_video_mode( glfw.get_primary_monitor()) window_width, window_height = estimate_best_window_size(vm.size.width, vm.size.height, sim.param_nx, sim.param_ny) window = glfw.create_window(window_width, window_height, f"WolfGPU - glfw {self.gl_context_active}", None, None) gl_wmi.window = window glfw.make_context_current(window) def glfw_page_flip(): glfw.poll_events() glfw.swap_buffers(gl_wmi.window) self.page_flipper = glfw_page_flip
[docs] def destroy_context(self): gl_clear_all_caches() if not self.gl_context_active: if self._gl_wmi == GlWindowManagerInterface.PYGAME: pygame.quit() elif self._gl_wmi == GlWindowManagerInterface.GLFW: import glfw glfw.destroy_window(self._gl_wmi.window) glfw.terminate()
[docs] class SimulationRunner:
[docs] record: bool # True if record simulation progress
[docs] _glsim: GLSimulation
def __init__(self, glsim: GLSimulation, record_path: Union[Path, str]=Path("."), early_out_delta_max: float=1e-6, record: bool=True, page_flip_func=pygame.display.flip, enable_alpha_recording: bool=False, data_injector: SimulationInjector=None): # FIXME this is for backward compatibility """ :param record: Whether we need recording or not. This is used mainly during the unit tests to avoid recording too much stuff. :param record_path: Where to put the result store. """ assert early_out_delta_max is None or early_out_delta_max > 0, \ f"The early out threshold must be strictly positive to be useful. You gave {early_out_delta_max}." self._glsim = glsim self._early_out_delta_max=early_out_delta_max self.current_quantity = 0 self._enable_alpha_recording = enable_alpha_recording if isinstance(record_path, str): self.record_path = Path(record_path) else: self.record_path = record_path # This is used mainly during the unit tests to avoid recording too much stuff. self._record = record # self._results_store is None if we don't want to record. self._results_store = None self.quantities = np.array([]) # init like that to silence PyLance's warning self.simulation_finished = False # self.h_convergence = None # self._last_h = None self.last_recorded_time = 0 self.last_step_duration = None # Full timer will be initialized at full_run UNLESS it was # previously initialized by a restart from record. self._full_timer = None # delta represents the last difference measured on h,qx,qy from # one frame to the other. It reminds how decision about early # simulation stop, was made. self._last_early_out_delta = None self._zoom = 1 self._mean = 0 self.records = [] self._previous_results = None self._still_simulation_count = 0 # When working with a synthetic scenario, that's the place # where you can find the bathymetry update file... # .npy is importnat because if you use np.save(...) it will # prepend that suffix automatically :-) if self.record_path: self._bathy_file = self.record_path / "simul.top_update.npy" if self._bathy_file.exists(): # Note the last modification time so that we can detect a # new modification later on. self._bathy_file_mtime = os.stat(self._bathy_file).st_mtime else: self._bathy_file_mtime = None else: self._bathy_file = None # Upon initialisation, the GL Viewport must be set to the # needs of the user. We'll use it for the progress plotting. self._gl_window_viewport = glGetIntegerv(GL_VIEWPORT) self._gl_page_flip_func = page_flip_func self._timer_strategies = TimerManager(TIME_SAMPLING_RESOLUTION) self._last_global_state_snapshot_timestep = None self._reporting_time_sampler = None # defined here so one can call "run one step" easily self._setup_injectors(data_injector, glsim)
[docs] def _setup_injectors(self, data_injector, glsim: GLSimulation): if isinstance(data_injector, InjectorWrapper): self._data_injectors: List[InjectorWrapper] = [ data_injector] elif isinstance(data_injector, list): self._data_injectors: List[InjectorWrapper] = data_injector elif data_injector is None: self._data_injectors: List[InjectorWrapper] = [] else: raise Exception(f"Don't know what to do with {data_injector}") # Now we detect if we don't have (space) overlaping injectors. if len(self._data_injectors) >= 2: t = np.zeros((glsim.width, glsim.height), np.bool8) for injector_wrapper in self._data_injectors: inj = injector_wrapper.injector if np.sum(t[inj.active_zone()]) == 0: t[inj.active_zone()] = 1 else: raise ValueError(f"One of the injection ({inj.active_zone()}) zones steps on another one") self._injector_time_sampler = None # defined here so one can call "run one step" easily
@property
[docs] def early_out_threshold(self) -> float: return self._early_out_delta_max
@classmethod
[docs] def continue_a_simulation( klass, sim:SimpleSimulation, result_store_path: Union[Path, str], refresh_view: float=0.1, early_out_threshold = None, injector:SimulationInjector = None ) -> ResultsStore: opengl_context = OpenGLContext(sim, GlWindowManagerInterface.GLFW) gpu_simulator:GLSimulation = load_simple_sim_to_gpu(sim) simulation_runner = SimulationRunner( gpu_simulator, record=True, record_path=result_store_path, early_out_delta_max=early_out_threshold, page_flip_func=opengl_context.page_flipper, data_injector=injector) # For debugging klass._simulation_runner = simulation_runner simulation_runner.restart_from_record(None) #full_run(refresh_view=refresh_view) simulation_runner.full_run(refresh_view=refresh_view) # To avoid issues when having several simulations runs. But be aware # that because of this, one cannot query the `sim` passed that point # (since OpenGL stuff will be wiped out). gpu_simulator.drop_gl_resources() opengl_context.destroy_context() return simulation_runner.results_store()
@classmethod
[docs] def _prepare_quick_run(klass, sim:SimpleSimulation, record_path: Union[Path, str], refresh_view: float=0.1, early_out_threshold = None, gl_wmi: GlWindowManagerInterface = GlWindowManagerInterface.PYGAME, injector:SimulationInjector = None, tiling:TilePackingMode = TilePackingMode.REGULAR) -> "SimulationRunner": """ Run a full simulation quickly. By quickly we mean a lot of the setup of the simulator (such as OpenGL context) is done automatically. In particular a progress window is displayed. :param sim: the simulation you want to run :param record_path: a directory where to record the results of the simulation. We accept `str` as well as `Path`. :param refresh_view: how often the progress window is displyaed (every N seconds). :return: At the end of the simulation a ´ResultStore´ is returned. It is open in read only mode, so that it can be used to query your results. """ assert isinstance(sim, SimpleSimulation) assert isinstance(gl_wmi, GlWindowManagerInterface) assert isinstance(record_path, str) or isinstance(record_path, Path) or record_path is None if isinstance(record_path, str): record_path = Path(record_path) # For the sake of backward compatibility we reuse any currently active OpenGL context. opengl_context = OpenGLContext(sim, gl_wmi) gpu_simulator:GLSimulation = load_simple_sim_to_gpu(sim) infiltration_chronology = InfiltrationChronology.from_list( sim.infiltrations_chronology) if isinstance(injector, SimulationInjector): injector = [InjectorWrapper(injector, sim, infiltration_chronology)] elif isinstance( injector, list): injector = [InjectorWrapper(inj, sim, infiltration_chronology) for inj in injector] elif injector is not None: raise Exception(f"Don't know what to do with {injector}") simulation_runner = SimulationRunner(gpu_simulator,record=record_path is not None, record_path=record_path, early_out_delta_max=early_out_threshold, page_flip_func=opengl_context.page_flipper, data_injector=injector) # FIXME pîggybacking simulation_runner with glcontext is so-so. simulation_runner._opengl_context = opengl_context if injector and sim.has_infiltration(): # We have an injector at work and we have some infiltrations zones. # The difficult part here is that if the injector updates the # infiltration zones, then the quantity of infiltrated water per # cell may change (and thus needs to be recomputed). To make that # computation faster, we keep a copy of the infiltration zones which # belong to the active zone of the injector. We also keep a count of # the number of cells in each zones in the non-modified (by the # injector) infiltration zones. This way we may update these numbres # more easily. # The un-converted chronology (where the quantities where not divided by the # area of infitlration zones) simulation_runner._infiltration_chronology_user = infiltration_chronology simulation_runner._infiltration_zones_count = count_infiltration_cells( sim.infiltration_zones, infiltration_chronology.nb_zones) else: simulation_runner._infiltration_chronology_user = None simulation_runner._infiltration_zones_injection_zone = None simulation_runner._infiltration_zones_count = None simulation_runner._infiltration_zones_count_active_zone = None # For debugging klass._simulation_runner = simulation_runner return simulation_runner
@classmethod
[docs] def quick_run(klass, sim:SimpleSimulation, record_path: Union[Path, str], refresh_view: float=0.1, early_out_threshold = None, gl_wmi: GlWindowManagerInterface = GlWindowManagerInterface.PYGAME, injector:Union[SimulationInjector, List[SimulationInjector]] = None, tiling:TilePackingMode = TilePackingMode.REGULAR ) -> ResultsStore: """ Run a full simulation quickly. By quickly we mean a lot of the setup of the simulator (such as OpenGL context) is done automatically. In particular a progress window is displayed. :param sim: the simulation you want to run :param record_path: a directory where to record the results of the simulation. We accept `str` as well as `Path`. :param refresh_view: how often the progress window is displyaed (every N seconds). :param injector: A `SimulationInjector` or a list of them. :return: At the end of the simulation a ´ResultStore´ is returned. It is open in read only mode, so that it can be used to query your results. """ assert isinstance(sim, SimpleSimulation) assert isinstance(gl_wmi, GlWindowManagerInterface) assert isinstance(record_path, str) or isinstance(record_path, Path) or record_path is None simulation_runner = SimulationRunner._prepare_quick_run( sim, record_path, refresh_view, early_out_threshold, gl_wmi, injector, tiling ) simulation_runner.full_run(refresh_view=refresh_view) return SimulationRunner._finish_quickrun(simulation_runner)
@classmethod
[docs] def _finish_quickrun(klass, simulation_runner: "SimulationRunner") -> ResultsStore: # To avoid issues when having several simulations runs. But be aware # that because of this, one cannot query the `sim` passed that point # (since OpenGL stuff will be wiped out). simulation_runner._glsim.drop_gl_resources() simulation_runner._opengl_context.destroy_context() if simulation_runner.record_path is not None: return ResultsStore(simulation_runner.results_path(), "r") else: return None
[docs] def scanned_file(self): return self._bathy_file
@property
[docs] def step_num(self): """ The step that is going to be computed. Starts at zero. """ return self._glsim._current_step
[docs] def restart_from_record(self, start_rec=None): """ Restart a simulation from a previous record. This is useful when you want to continue a simulation that has been interrupted. When restarting from a record, it means that the record in question will be deleted and simulation will go on from there. :param start_rec: the index of the record you want to start from. If None, the simulation will restart from the last record. """ # start_rec == None: restart from end of the simulation. if not self.record_file().exists(): raise Exception("While trying to restart from previous records, I didn't " f"find {self.record_file()}. Maybe you are trying to " "restart a simulation that has never been started before ?") # Load the result store with current data self._results_store = ResultsStore( self.record_file(), mode="a") if start_rec is not None: self._results_store.truncate_at_report(start_rec) t,dt,n_iter_dry_up_euler, n_iter_dry_up_rk, h,qx,qy = self._results_store.get_last_result() # Reload the state on the step we want to start from self.last_recorded_time = self._results_store.get_last_named_result(ResultType.T) # We can't know the last Δt for sure because most often we record every # N steps. Therefore the last Δt doesn't make much sense. self.last_step_duration = None # The first time step numbe is 0. It represents the initial conditions. # When one asks a 100 step simulation, then we'll have the step numbers # 0,1,...,100: 0 (I.c.) + 100 steps == 101 records. s = self._results_store.get_last_named_result(ResultType.STEP_NUM) self._full_timer = time.time() - self._results_store.get_last_named_result(ResultType.CLOCK_T) self._glsim.reset_globals(self.last_recorded_time, self._glsim.time_step, s, h, qx, qy) # Yeah, tricky. That's because what we have is the start time of the # last record (which may be a group of simulation steps, of which we # can't know the overall duration if it is an optimized time step) so we # have to put ourselves back there... self._results_store.truncate_at_report(self._results_store.nb_results - 1) self.simulation_finished = False logging.info(f"Ready to restart from step {self._glsim._current_step}.")
[docs] def results_path(self) -> Path: assert self._results_store is not None, "Simulation has not produced any results" return self._results_store.path
[docs] def results_store(self) -> ResultsStore: """ Gives a result store back. The result store will be opened in read mode. """ return ResultsStore(self.results_path(), "r")
[docs] def record_file(self): # Path of the result store (a directory) return self.record_path
[docs] def do_record(self, texture_ndx, global_state, force=False): """ Record current state of the simulation. Be careful, this implies reading data from the GPU which is *slow* ! """ assert self.step_num >= 0, "must be zero based" assert global_state is not None alarm_value = global_state.time_step emergency_stop = alarm_value is not None and (isnan(alarm_value) or alarm_value < 0) if force: # We can force the reporting (used in debug) should_we_record = True elif self._record: # Has the user enabled the reporting ? should_we_record = True elif emergency_stop: # Something went worng in the computation should_we_record = True else: should_we_record = False if should_we_record: if emergency_stop: logging.error("Emergency stop! Recording additional matrices in current directory.") np.save("q0", self._glsim.read_quantity_result(0)) np.save("q1", self._glsim.read_quantity_result(1)) np.save("alpha0", self._glsim.read_alpha(0)) np.save("alpha1", self._glsim.read_alpha(1)) with time_it("record step"): self.last_recorded_time, self.last_step_duration, nb_dryup_iterations, nb_active_tiles_cumulated = \ global_state.simulation_time, global_state.simulation_step, global_state.nb_dryup_iterations, global_state.total_active_tiles # print(f"t={self.simulation_time:.4f} acive line={active_inf_line} zones={inf_zones}") # print(self._glsim.read_active_cells()) # print(read_texture(self._glsim._infiltration_fb, GL_COLOR_ATTACHMENT0, 64, 64, GL_R32I)[55:,55:]) #assert self.last_step_duration >= 0 #print(f"\nnb_dryup_iterations: {nb_dryup_iterations}") self.quantities = self._glsim.read_tile_packed_quantity_result(texture_ndx) h, hu, hv = self.quantities[:,:,0], self.quantities[:,:,1], self.quantities[:,:,2] #np.save(Path(self.record_path, f"{self._glsim.basefilename}{self.step_num:07}.GPU_RH"), h) # if False: # plt.imshow(h,origin='lower') # plt.show() if MEM_TRACKING: self.records.append([ self.last_recorded_time, np.max(h), np.mean(h), np.max(hu), np.max(hv)]) # Our step num has not been incremented yet, but glsim's done already. assert self.step_num == self._glsim._current_step, \ "The steps numbers must be in sync because they're used" \ " while restarting a sim from an unfinished one " \ f"{self.step_num} == {self._glsim._current_step}" # A mostly dry mesh is a mesh where water height is not zero but # very small (< MOSTLY_DRY_MESHES_THRESHOLD m). nb_mostly_dry_meshes = np.count_nonzero((h < MOSTLY_DRY_MESHES_THRESHOLD) & (h > 0)) self._results_store.append_result( self.step_num, self.last_recorded_time, self.last_step_duration, nb_dryup_iterations, nb_active_tiles_cumulated, h, hu, hv, None, None, clock_time=time.time() - self._full_timer, delta_early_out=self._last_early_out_delta or 0, nb_mostly_dry_meshes=nb_mostly_dry_meshes) if self._enable_alpha_recording: # Stores the last alpha maps # FIXME I save the two of them because the correct one to take # depends on the number of dry up cancellation iterations that were # done and I don't know that yet (I should put it in the # globals). alpha_correction = self._glsim.read_tile_packed_alpha(0) self._results_store.append_additional_result("alpha_values0", alpha_correction[:,:,0]) self._results_store.append_additional_result("alpha_masks0", alpha_correction[:,:,1]) alpha_correction = self._glsim.read_tile_packed_alpha(1) self._results_store.append_additional_result("alpha_values1", alpha_correction[:,:,0]) self._results_store.append_additional_result("alpha_masks1", alpha_correction[:,:,1]) active_cells_packed = self._glsim.read_active_cells_map() self._results_store.append_additional_result( "active_cells_packed", active_cells_packed ) # alpha_correction = read_texture( # self._glsim.access_fb, GL_COLOR_ATTACHMENT0, self._glsim.width, self._glsim.height, GL_RGB32F) # self._results_store.additional_result("alpha", alpha_correction) # if self._glsim._debugging: # dbg1 = read_texture(self._glsim.access_fb, # GL_COLOR_ATTACHMENT2, self._glsim.width, self._glsim.height, GL_RGBA32F) # self._results_store.additional_result("debug1", dbg1) if emergency_stop: msg = f"NaN's were detected, simulation has been saved. Last written flip/flop={texture_ndx}" logging.error(msg) self._results_store.close() raise Exception(msg) return (h, hu, hv) else: return None
[docs] def texture_written_by_last_step(self) -> int: """ After running `run_one_step` this gives the number of the texture that was *written* to (zero or one; rememeber the GPU code often uses flip-flop textures, that's what you query here). """ return self.current_quantity
[docs] def texture_read_by_last_step(self) -> int: return 1 - self.current_quantity
[docs] def run_extra_step(self): """ When a simulation run is complete, this allows to run an additional step. This is useful in debugging. FIXME The way the reporting is updated is not clear right now. """ assert self.simulation_finished self.simulation_finished = False self.run_one_step()
[docs] def _check_simulation_is_progressing(self, h, hu, hv): # We end the simulation if things don't move anymore if self._previous_results is None: self._previous_results = (h, hu, hv) else: prev_h, prev_hu, prev_hv = self._previous_results delta_h = np.abs(h - prev_h) delta_hu = np.abs(hu - prev_hu) delta_hv = np.abs(hv - prev_hv) sum_delta = np.sum(delta_h + delta_hu + delta_hv) self._previous_results = (h, hu, hv) self._last_early_out_delta = sum_delta if self._early_out_delta_max is not None: #print(sum_delta) if sum_delta <= self._early_out_delta_max: self._still_simulation_count += 1 if self._still_simulation_count >= 3: logging.warning(f"Simulation is still, stopping at t={self.last_recorded_time}s after {self.step_num} iterations.") self.simulation_finished = True self.cancelled_early = True else: logging.info(f"Simulation is slowing down: |Δh|+|ΔQx|+|ΔQy| = {sum_delta}") else: # Reset count self._still_simulation_count = 0
[docs] def _refresh_display_parameters(self): # Recompute the min/max of the array # min/max is used to improve the display readability with time_it("Refresh zoom, read_tile_packed_quantity_result"): #print("read qty") p = self._glsim.read_tile_packed_quantity_result(0) # 1-self.current_quantity) h = p[:,:,0] # read water height msk = h > 0 if msk.any(): valid_heights = h[msk] # Avoid mean on empty arrays self._mean = (np.mean(valid_heights) + self._mean)/2 std = np.std(valid_heights) else: self._mean = 0 std = 0 # p = self._glsim.read_tile_packed_quantity_result(self.current_quantity) # h = p[:,:,0] # read water height # h_max = max(np.max(h), h_max) # How this works: # I average the old zoom value and the new one. # I want color to be distributed around the mean (0.5), from 0 to 1. # So the max deviation is 0.5. # I want to see 2 std => 2*std # I want everythign to be xformed back into 0..1 => I divide by 2*std. # If std happens to be zero, then we set it to 0.1 self._zoom = (0.5/(2*max(0.1,std)) + self._zoom)/2 logging.debug(f"Rezooming at mean={self._mean:3g}, zoom={self._zoom:3g}")
# if self.h_convergence and self._last_h is not None: # if abs(np.max(h - self._last_h)) < self.h_convergence: # logging.info("Simulation has converged") # self.simulation_finished = True # self._last_h = h
[docs] def _read_simulation_global_state(self) -> GLSimulationGlobalState: """ Read the current global state of the simulation. The raison d'être of this method is to allow to to cache the result of the query if the global state because it is expensive. """ # This caching mechanism is important because reading the global state # may be expensive (although I didn't actually check the cost). if self._last_global_state_snapshot_timestep != self._glsim._current_step: self._snapshot_global_state = self._glsim.read_global_state() return self._snapshot_global_state
[docs] def _handle_data_injector(self, proxy: SimulationProxy, data_injector_wrapper: InjectorWrapper, sim_finished_step: int, sim_finished_step_time: float, source_quantity: int, infiltration_chronology) -> SimulationProxy : data_injector: SimulationInjector = data_injector_wrapper.injector next_stop = data_injector.do_updates(proxy) if next_stop: self._timer_strategies.update_one_shot_timer(data_injector_wrapper.time_sampler, next_stop, sim_finished_step, sim_finished_step_time) # FIXME _next_stop is not used anymore self._glsim._next_stop = self._timer_strategies.next_timer_trigger_iteration else: # None means don't call the update anymore. pass return proxy
[docs] def _run_all_injectors(self, source_quantity:int, triggered_timers: set[TimeSampler], global_state:GLSimulationGlobalState) -> SimulationProxy: proxy = None for data_injector in self._data_injectors: if data_injector.time_sampler in triggered_timers: if proxy is None: # Proxy is created only if at least on injector needs it proxy = SimulationProxy( self._glsim, global_state, None, source_quantity, self._infiltration_chronology_user) proxy._set_zone_of_interest(data_injector._injector.active_zone()) self._handle_data_injector( proxy, data_injector, # At the end of the simulation step (in glsimulation), the # current step is increased by one. So if we refer to the # prefious sim time, we need to refere to the previous step # too, hence the -1. global_state.simulation_step, # The previous sim time is the time at teh beginning of the step. # FIXME We could argue the injectors are ran between two steps... global_state.simulation_time, source_quantity, self._infiltration_chronology_user) return proxy
[docs] def _apply_changes_in_proxy(self, proxy: SimulationProxy, source_quantity: int): # Some injector triggered, hence a proxy was set up. if proxy is not None: # Now let's check if some injector actually used the proxy if proxy._bathymetry_was_set: upload_np_array_to_gpu( GL_TEXTURE_RECTANGLE, GL_R32F, self._glsim._tile_packer.shuffle_and_pack_array( proxy._cached_bathymetry), texture_id=self._glsim.bathymetry_tex) if proxy._h_qx_qy_was_set: upload_np_array_to_gpu( GL_TEXTURE_RECTANGLE, GL_RGBA32F, self._glsim._tile_packer.shuffle_and_pack_array( proxy._cached_h_qx_qy), texture_id=self._glsim.quantity_tex[source_quantity]) if self._infiltration_chronology_user is not None: all_updated_counts = np.zeros( (self._infiltration_chronology_user.nb_zones,)) all_preupdate_counts = np.zeros( (self._infiltration_chronology_user.nb_zones,)) if proxy._infiltration_zones_was_set: for data_injector in self._data_injectors: # Count over zone of interest all_updated_counts += count_infiltration_cells( proxy._cached_infiltration_zones[data_injector.injector.active_zone()], self._infiltration_chronology_user.nb_zones) all_preupdate_counts += data_injector._non_updated_infiltration_zones_count_active_zone from wolfgpu.glsimulation import MESH_WITHOUT_INFILTRATION packed = self._glsim._tile_packer.shuffle_and_pack_array( np.ascontiguousarray(proxy._cached_infiltration_zones - 1), neutral_values=MESH_WITHOUT_INFILTRATION) upload_np_array_to_gpu( GL_TEXTURE_RECTANGLE, GL_R32I, packed, texture_id=self._glsim.infiltration_zones_texture) if proxy._infiltration_chronology_was_set or proxy._infiltration_zones_was_set: self._glsim.set_buffers( infiltration_timings= self._glsim._infiltrations_timings_as_gpu_array( self._infiltration_zones_count - all_preupdate_counts + all_updated_counts, self._infiltration_chronology_user))
[docs] def run_one_step(self, dont_close_results:bool= False): """ Run only one step of the simulation. Globally, this method does the following: - Run the GLSimulation for one step with the correct `source_quantity` - Record the results if needed - Inverse the `source_quantity` index - Perform some estimation of the next stop time :param dont_close_results: If True, the results store will not be closed """ if self.simulation_finished: raise Exception("Simulation is finished, you can't run it anymore") # step_num is currently self._glsim._current_step if self._glsim._current_step == 0: # Recording initial conditions first self.do_record(texture_ndx=0, global_state=self._read_simulation_global_state()) with time_it("Run one step"): source_quantity = self.current_quantity dest_quantity = 1 - source_quantity self._glsim.simulation_step(source_quantity) sim_finished_step = self._glsim._current_step triggered_timers = self._timer_strategies.update_clock_wall_timers() # print(f"{sim_finished_step} >= ? {self._timer_strategies.next_timer_trigger_iteration} {self._last_global_state_snapshot_timestep}") if sim_finished_step < 10 or self._timer_strategies.next_timer_trigger_iteration is None or sim_finished_step >= self._timer_strategies.next_timer_trigger_iteration: gs = self._read_simulation_global_state() if (self._glsim.simulation_duration.type == SimulationDurationType.STEPS and gs.simulation_step >= self._glsim.simulation_duration.duration) \ or (self._glsim.simulation_duration.type == SimulationDurationType.SECONDS and gs.simulation_time > self._glsim.simulation_duration.duration): #print(f"Finished {gs.simulation_step} steps / {gs.simulation_time} >= {self._glsim.simulation_duration}") self.simulation_finished = True # Circular buffer b = gs.simulation_step % INJECTION_TRIGGER_TIME_ESTIMATOR_DEPTH if gs.simulation_step >= INJECTION_TRIGGER_TIME_ESTIMATOR_DEPTH: steps = gs.steps_times[b:] + gs.steps_times[0:b] else: steps = gs.steps_times[0:b] # Give the past memorized time steps to the timer manager # so that it can better estimate next stop programmed # on a time basis. self._timer_strategies.set_simulation_time_steps(steps) # Updating all the timers triggered_timers |= self._timer_strategies.update(gs.simulation_step, gs.simulation_time) self._glsim._next_stop = self._timer_strategies.next_timer_trigger_iteration # print(f"synced at step {sim_finished_step}, time {sim_finished_step_time}") # print(f"Current step {sim_finished_step}. Next stop: {self._timer_strategies.next_timer_trigger_iteration}") # print(self._timer_strategies) # print(f"Triggered : {triggered_timers}") if self._reporting_time_sampler in triggered_timers: # self._glsim._current_step is increased at the end of a simulation step #print(f"Record step {gs.simulation_step} time {gs.simulation_time}") recorded_res = self.do_record(dest_quantity, global_state=gs) if recorded_res: h, hu, hv = recorded_res # ATTENTION! checking if the simulation is prorgressing requires # reading the unknowns vector. Since that is very expensive, # we decide that, for the moment, we only check the stop condition # for "free", that is when we do have some reporting. So, disabling # the reporting implies disabling the progress checking # FIXME I have been caught by that so I guess a regular user # will be surprised too... self._check_simulation_is_progressing(h, hu, hv) proxy = self._run_all_injectors(dest_quantity, triggered_timers, gs) if proxy is not None: # Some injector triggered, hence a proxy was set up. self._apply_changes_in_proxy(proxy, dest_quantity) # FIXME REAAALLLLY dirty. I do that because I need glsim to generate the "constans.frs" include # before I can load these shaders... if not hasattr(self, "_vertex_shader2"): self._vertex_shader2 = load_shader_from_file(Path(SHADER_PATH,"simplevertexshader.vs")) self._active_tiles_shader2 = load_shader_from_file(Path(SHADER_PATH,"active_tiles_quad.gs")) self._texture_viewer_shader2 = load_shader_from_file(Path(SHADER_PATH,"textureViewerShader.frs")) self._texture_viewer_program2 = load_program(self._vertex_shader2, self._texture_viewer_shader2, self._active_tiles_shader2) self._vertex_shader = load_shader_from_file(Path(SHADER_PATH,"simplevertexshader.vs")) self._active_tiles_shader = load_shader_from_file(Path(SHADER_PATH,"active_tiles_quad.gs")) self._texture_viewer_shader = load_shader_from_file(Path(SHADER_PATH,"textureViewerShader.frs")) self._texture_viewer_program = load_program(self._vertex_shader, self._texture_viewer_shader) with time_it("Run one step - admin"): # -------------------------------------------------------------------- # Flip/flop the (source) current_quantity index if self.current_quantity == 0: self.current_quantity = 1 else: self.current_quantity = 0 if self.simulation_finished and not dont_close_results and self._results_store is not None: self._results_store.close() return triggered_timers
[docs] def plot_progress2(self): # FIXME Accessing _glsim's private stuff is bad abstraction... domain_size = self._glsim.width, self._glsim.height uniforms = { "zoom": float(self._zoom), "mean": float(self._mean), # FragCoord (screen) to texture coord "textureCoordinateTransform": ( float(domain_size[0]/self._gl_window_viewport[2]), float(domain_size[1]/self._gl_window_viewport[3])) } textures= { "quantityTexture": self._glsim.quantity_tex[1-self.current_quantity], "tileIndirection" : self._glsim._tile_indirection_tex, } #print(self._glsim.nbx, self._glsim.nby) glUseProgram(self._texture_viewer_program2) glBindFramebuffer(GL_FRAMEBUFFER, GL_NONE) # draw on screen glClear(GL_COLOR_BUFFER_BIT) self._glsim._draw_active_tiles(self._texture_viewer_program2, uniforms, textures, self._gl_window_viewport)
# def plot_progress(self): # if self._glsim._optim_geom_shaders: # return self.plot_progress2() # # glUseProgram(self._texture_viewer_program) # # #self._zoom = 0.5 # # set_uniform(self._texture_viewer_program, "zoom", float(self._zoom)) # # set_uniform(self._texture_viewer_program, "mean", float(self._mean)) # # # For reaon unbeknown to me, this doesn't give what I want # # # as window width/height... # # # window_width, window_height = pygame.display.get_surface().get_size() # # r = (self._glsim.nbx/self._glsim._window_width, # # self._glsim.nby/self._glsim._window_height) # # set_uniform(self._texture_viewer_program, "textureCoordinateTransform", r) # # glBindFramebuffer(GL_FRAMEBUFFER, GL_NONE) # draw on screen # # glClear(GL_COLOR_BUFFER_BIT) # # # FIXME Encapsulation is no good here # # set_texture(self._texture_viewer_program, "sTexture", # # self._glsim.quantity_tex[1-self.current_quantity], 0) # # x,y,w,h = 0,0,self._glsim._window_width,self._glsim._window_height # # self._glsim._draw_active_tiles(self._texture_viewer_program,x,y,w,h) # # # self._glsim._draw_active_tiles(self._texture_viewer_program, # # # 0, self._glsim.height//4, # # # self._glsim.width, self._glsim.height//4) # def has_handle(self, fpath): # for proc in psutil.process_iter(): # try: # for item in proc.open_files(): # if fpath == item.path: # return True # except Exception as ex: # print(ex) # pass # return False
[docs] def file_in_use(self, f): try: os.rename(f, f) return False except OSError: return True
[docs] def _try_reload_bathymetry(self): try: # Using a try catch seems overkill, but we had something # like that happening in 2023. Error was: # [WinError 5] Accès refusé ... bath_exists = self._bathy_file is not None and self._bathy_file.exists() except Exception as ex: logging.error(f"Can't check if {self._bathy_file} exists: {ex}") bath_exists = False if bath_exists: try: new_time = os.stat(str(self._bathy_file)).st_mtime if new_time != self._bathy_file_mtime: print() # Nicer CLI logging.info(f"Loading bathymetry file for update: {self._bathy_file}") self._bathy_file_mtime = new_time if not self.file_in_use(str(self._bathy_file)): # from wolfhece.wolf_array import WolfArray_Sim2D # a = WolfArray_Sim2D(fname=str(self._bathy_file)) # a.preload = True # a.read_all() # logging.info(f"Bathymetry file shape is {a.array.shape} {a.nbx}x{a.nby}") # self._glsim.set_buffers(bathymetry_array=np.ascontiguousarray(np.transpose(a.array))) try: # We transpose because we expect the user will pass us an array # following the usual Wolf convention. a = np.load(self._bathy_file).transpose() except Exception as ex: logging.error(f"Can't load bathymetry file {self._bathy_file}. Numpy can't read it, it says: \"{ex}\"") return if a.shape != (self._glsim.nby, self._glsim.nbx): if self._glsim.nby == a.shape[1] and self._glsim.nbx == a.shape[0]: suggestion = " Maybe you need to transpose ?" else: suggestion = "" logging.error(f"The shape of the bathymetry update file is not correct. You gave x={a.shape[0]}, y={a.shape[1]} columns. I expect {self._glsim.nbx} and {self._glsim.nby}.{suggestion}") return self._glsim.set_buffers(bathymetry_array=a) logging.info(f"Loaded bathymetry file {self._bathy_file}, shape is {a.shape}") if self._results_store is not None: gs = self._glsim.read_global_state() self._results_store.add_event(SimulationEventType.BathymetryUpdate, simulation_time= gs.simulation_time, simulation_step=self.step_num, data=self._bathy_file) # z = np.zeros((128,128), dtype=np.float32) # z[:,0:10] = 100 # self._glsim.set_buffers(bathymetry_array=z) else: logging.error(f"The bathymetry file {self._bathy_file} is opened by another process") # else: # logging.warn(f"The bathymetry file {self.record_path} didn't change since the last time I've seen it") except Exception as ex: logging.error(f"Something went wrong: {ex}")
[docs] def init_result_store(self): if self._record: if self._results_store is None: self._results_store = ResultsStore( self.record_file(), mode="w", tile_packer=self._glsim.tile_packer())
[docs] def full_run(self, refresh_view:int = 1, dont_close_results:bool= False, refresh_colormap:int = 10): """ Perform a full simulation run. :param refresh_view: Refresh the view every `refresh_view` seconds :param dont_close_results: Leave the result store open :param refresh_colormap: Refresh the colormap every `refresh_colormap * refresh_view` seconds """ self._setup_full_run(refresh_view, refresh_update_colormap= refresh_colormap) NB_ITER_STATS=100 iteration_times = np.zeros( (NB_ITER_STATS,)) if MEM_TRACKING: mem_tracker = tracker.SummaryTracker() # {percentage:3.0f}% with tqdm.tqdm(total=100, bar_format="{desc}|{bar}|{postfix}", ncols=80) as taqadum: running = True # Inidcates if the user or the early-out condition was met before # the planned end of the simulation. self.cancelled_early = False # result store might be already opened if the user wants # to continue an existing simulation. self.init_result_store() while running and not self.simulation_finished: triggered_timers = self.run_one_step(dont_close_results) # if triggered_timers: # print([str(tt) for tt in triggered_timers]) with time_it("-- Page flip"): refresh = self._refresh_view_time_sampler in triggered_timers if (self.step_num == 1 or refresh): # Two plot progress else double buffering shows flicker if self._display_parameters_time_sampler in triggered_timers: self._refresh_display_parameters() self.plot_progress2() # Swapping buffers avoids flickering # It is only useful if we are in a windowed mode self._gl_page_flip_func() if True: if MEM_TRACKING and self.step_num % 40 == 0: mem_tracker.print_diff() log_mem_used() self._admin_tasks(iteration_times, triggered_timers, taqadum) running = self._must_continue_event() self._teardown_full_run(iteration_times)
[docs] def _admin_tasks(self, iteration_times:np.ndarray, triggered_timers:set, taqadum:tqdm.tqdm): """ Perform administrative tasks such as updating the progress bar, compute indicators or reload bathymetry. """ NB_ITER_STATS = iteration_times.shape[0] with time_it("-- admin tasks"): iteration_times[self.step_num % NB_ITER_STATS] = time.time() if self.step_num > NB_ITER_STATS: # step_num+1 MOD ... is the *first* in the list !!! stats_total_duration = iteration_times[self.step_num % NB_ITER_STATS] - iteration_times[(self.step_num +1) % NB_ITER_STATS] average_duration = stats_total_duration / NB_ITER_STATS active_meshes_per_second = self._glsim.nb_active_meshes/average_duration pt = f"{(active_meshes_per_second/1_000_000)/average_duration:.1f}" self.meshes_per_second = pt else: pt = "?? " #print("yo-4") # Round so that we actually see the time step increments #print(f"debug: {self._glsim.last_step_duration}") # self.last_step_duration > 0, the step duration is recorded at the beginning # of the iteration => it may have never been initialized (at begin of sim, or at beginiing of restart) gs = self._read_simulation_global_state() if gs.simulation_step is not None and gs.simulation_step > 0: # total_time_simulation_str = str( # round(self._glsim.last_gpu_time, # ceil(abs(log10(self._glsim.current_simulation_time_step))))) + "s" total_time_simulation_str = seconds_to_duration_str(gs.simulation_time) last_duration_str = f"{gs.time_step:.4g}s" elif gs.simulation_time is not None: total_time_simulation_str = seconds_to_duration_str(gs.simulation_time) last_duration_str = "--" else: total_time_simulation_str = "--" last_duration_str = "--" if self._glsim.simulation_duration.type == SimulationDurationType.SECONDS: #print(100 * self.last_recorded_time / self._glsim.simulation_duration.duration) percent_done = 100* max(self.last_recorded_time or 0, gs.simulation_time or 0) / self._glsim.simulation_duration.duration else: percent_done = 100*self.step_num / self._glsim.simulation_duration.duration percent_done = min(round(percent_done),100) stats_total_duration = iteration_times[self.step_num % NB_ITER_STATS] - iteration_times[(self.step_num +1) % NB_ITER_STATS] average_duration = stats_total_duration / NB_ITER_STATS iter_per_sec = 1 / average_duration # logging_iterations += 1 if self._logger_time_sampler in triggered_timers or self.simulation_finished: if self._record: if self._results_store.nb_results > 1: rec_flag = f"[{self._results_store.nb_results} records] " elif self._results_store.nb_results == 1: rec_flag = f"[{self._results_store.nb_results} record] " else: rec_flag = "[No record]" else: rec_flag = "" fact_rk = 2 if self._glsim.ponderation_runge_kutta is not None and self._glsim.ponderation_runge_kutta != 1 else 1 taqadum.set_postfix_str(f"{iter_per_sec:.2f} it./sec.") taqadum.set_description(f"{rec_flag} t={total_time_simulation_str} Δt={last_duration_str} {gs.nb_active_tiles * fact_rk * 16*16/1_000_000:.2f}M cells/sec. {gs.nb_dryup_iterations} dryups iter.") taqadum.update(percent_done - taqadum.n) # logging_iterations = 0 self._try_reload_bathymetry()
[docs] def _must_continue_event(self) -> bool: """ Check if the simulation was cancelled early by the user or by an early-out condition. :return bool: True if the simulation should continue, False if it should stop. """ # FIXME Dirty. We're bypassing a glfw context here... if self._gl_page_flip_func == pygame.display.flip: for event in pygame.event.get(): if event.type == pygame.QUIT: running = False self.cancelled_early = True elif event.type == pygame.KEYDOWN: if event.key in (pygame.K_ESCAPE, pygame.K_q): running = False self.cancelled_early = True return not self.cancelled_early
[docs] def _setup_full_run(self, refresh_view:int=1, refresh_update_colormap:int = 10): """ Prepare the simulation for a full run. :param refresh_view: Refresh the view every `refresh_view` seconds. :param refresh_update_colormap: Refresh the colormap every `refresh_view * refresh_update_colormap` seconds. """ logging.debug(f"FINISH={FINISH} STRIPES={STRIPES_ON_NAP}") if self._glsim.nb_active_meshes <= 64*64: # If sim too small, then one-shot textures get in the way. warning = "(!!! very approximate !!!)" else: warning = "(approximate)" logging.info(f"Total memory for textures: {total_textures_size()/1000_000:.1f}Mb, bytes per mesh {warning}:{int(total_textures_size()/self._glsim.nb_active_meshes)}") # Make sure the reporting frequency is meaningful if self._glsim.report_frequency.type == self._glsim.simulation_duration.type: assert self._glsim.report_frequency.duration <= self._glsim.simulation_duration.duration, \ f"Report frequency {self._glsim.report_frequency} is above simulation's duration {self._glsim.simulation_duration}" # debug if self._full_timer is None: # Init unless already initialized (most likely because we restarted a simulation) self._full_timer = time.time() self._reporting_time_sampler = TimeSampler.make_periodic_timer(self._glsim.report_frequency, name="WriteReport") self._refresh_view_time_sampler = TimeSampler.make_periodic_wall_clock_timer(period=SimulationDuration.from_seconds(refresh_view), triggers_on_zero=True, name="RefreshViewGl") self._display_parameters_time_sampler = TimeSampler.make_periodic_wall_clock_timer(period=SimulationDuration.from_seconds(refresh_view*refresh_update_colormap), triggers_on_zero=True, name="RefreshViewParam") self._logger_time_sampler = TimeSampler.make_periodic_wall_clock_timer(period=SimulationDuration.from_seconds(0.5), triggers_on_zero=True, name="ConsoleLogger") self._timer_strategies.reset() self._timer_strategies.add_strategy( self._reporting_time_sampler) self._timer_strategies.add_strategy( self._refresh_view_time_sampler) self._timer_strategies.add_strategy( self._display_parameters_time_sampler) self._timer_strategies.add_strategy( self._logger_time_sampler) for data_injector in self._data_injectors: self._timer_strategies.add_strategy(data_injector.time_sampler)
[docs] def _teardown_full_run(self, iteration_times): NB_ITER_STATS = iteration_times.shape[0] if self.cancelled_early: # When you stop early or by hand in the middle of two record steps # which are 1 hours apart, you don't want to loose too much # information (say half an hour). So we record the last moment. self.do_record(texture_ndx=1 - self.current_quantity, force=True, global_state=self._glsim.read_global_state()) # Yes, I had the equality once. So, you see, these things, they happen. new_time = time.time() while new_time == self._full_timer: new_time = time.time() self.total_duration = time.time() - self._full_timer self.iter_per_second = self.step_num / self.total_duration if self._record: logging.info(f"WolfGPU records stored in {self.record_file().resolve()}") logging.debug(f"Last delta = {self._last_early_out_delta}.") if self.step_num >= NB_ITER_STATS: stats_total_duration = iteration_times[self.step_num % NB_ITER_STATS] - iteration_times[(self.step_num +1) % NB_ITER_STATS] average_duration = stats_total_duration / NB_ITER_STATS if self._record: recs = f"{self._results_store.nb_results} records." else: recs = "No records." logging.info(f"{self.iter_per_second:g} iter/sec overall. {1/average_duration:.2f} it./s over the last {NB_ITER_STATS} iterations. {self.step_num} steps. {recs}")