Source code for wolfgpu.wolf_utils

"""
Author: HECE - University of Liege, Stéphane Champailler, Pierre Archambeau
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

import logging
import os
import re
import shutil
import numpy as np
import numpy.ma as ma
from typing import Union
from pathlib import Path

from importlib.metadata import version
[docs] wolf_hece_version = tuple(map(int, version("wolfhece").split('.')))
if wolf_hece_version <= (1,8,11): from wolfhece.mesh2d.wolf2dprev import BoundaryConditionsTypes elif wolf_hece_version >= (2,0,5): from wolfhece.mesh2d.cst_2D_boundary_conditions import BCType_2D as BoundaryConditionsTypes else: # 1.8.13, 2.0.1 are broken for us :-( raise Exception(f"I don't recognize this version of wolfhece : {wolf_hece_version}") from wolfhece.mesh2d.wolf2dprev import prev_parameters_simul, prev_sim2D # from wolfhece.PyGui import Wolf2DModel # Deprecated because Wolf2DModel is used only for GUI from wolfhece.wolf_array import (WolfArray, WolfArrayMB,WolfArrayMNAP) from .test_scenarios import create_swimming_pool
[docs] def swimpool_constant(c, width: int = 100, height: int = 50) -> np.ndarray: """ Create a constant swimming pool for testing purposes. """ a = np.ones((width, height)) * c a[:, [0, 1, -1, -2]] = 0 a[[0, -1], :] = 0 return a
[docs] def write_wolf_array(wa: WolfArray, generic_name: Path): # wa is WolfArray or one of its descendent. logging.info(f"writing {generic_name}") # if statements' order is important because of inheritance. if isinstance(wa, WolfArrayMNAP): # suffix is not in the original filename, setting it ourselves. wa.filename = str(generic_name.with_suffix(".MNAP")) wa.write_all() elif isinstance(wa, WolfArrayMB): wa: WolfArrayMB wa.filename = str(generic_name.with_suffix(Path(wa.filename).suffix)) if len(wa.myblocks) >= 1: wa.write_all() elif isinstance(wa, WolfArray): wa.filename = str(generic_name.with_suffix(Path(wa.filename).suffix)) wa.write_all() else: raise Exception(f"Unsupported type of array {type(wa)}")
[docs] def write_trl(params: prev_parameters_simul, generic_name:Path): with open(generic_name.with_suffix(".trl"), "w") as f: f.write("Translation de la simulation pour obtenir les axes locaux\n") f.write(f"{params._fine_mesh_translx:g}\n") f.write(f"{params._fine_mesh_transly:g}\n")
[docs] def write_simulation(wolf2d: prev_sim2D, wolf_cli_path: Path): # shutil.rmtree(destination_path) logging.info(f"Writing Wolf sim to {wolf2d.mydir}") # Python cannot delete exe files after having shutil.copy-ed them. if Path(wolf2d.mydir).exists(): [ f.unlink() for f in Path(wolf2d.mydir).iterdir() if f.is_file() and f.suffix != ".exe" ] else: os.makedirs(wolf2d.mydir, exist_ok=True) wcp = Path(wolf2d.mydir) / "wolfcli.exe" if wolf_cli_path is not None: if wolf_cli_path.exists() and not wcp.exists(): shutil.copy(wolf_cli_path, wcp) with open(Path(wolf2d.mydir) / "run.bat", "w") as fout: fout.writelines(["wolfcli.exe run_wolf2d_prev genfile=simul"]) # shutil.copy(Path(source_path) / "simul.lst", Path(destination_path) / (generic_name+".lst")) wolf2d.save()
# # FIXME The empty generic file # # I don't create it because the "MNAP" files gets loaded # # if the generic file exists... (look in the MNAP loading # # code of PyGui.py). So, if I create a single block sim (no MNAP file) # # and I add a generic (empty) file, then it tries to load # # the MNAP file... And fails. # # with open(Path(wolf2d.mydir) / wolf2d.filenamegen, "w") as fout: # # pass # # Legacy parameters indicating that we work with # # several topo/qx/... matrices (was used to cut big # # simulation domains in smaller pieces) # with open(Path(wolf2d.mydir) / (wolf2d.filenamegen + ".lst"), "w") as fout: # # Maybe we need as many zeros as they are blocks # # PA notes this is a bug in current Fortran code. # fout.write("0\n") # generic_path = Path(wolf2d.mydir) / Path(wolf2d.filenamegen) # # napbin = wolf2d.read_fine_array(".napbin") # # napbin.suxsuy_contour(filename=str(generic_path)) # # # NAPBIN is special, it is np.int16 # # write_wolf_array(napbin, generic_path) # # write_wolf_array(wolf2d.top, generic_path) # # write_wolf_array(wolf2d.hbin, generic_path) # # write_wolf_array(wolf2d.frot, generic_path) # # write_wolf_array(wolf2d.qxbin, generic_path) # # write_wolf_array(wolf2d.qybin, generic_path) # # if wolf2d.myparam.ntyperead == 3: # # # Multiblock writing 'cos WolfCli will expect that. # # write_wolf_array(wolf2d.mymnap, generic_path) # # write_wolf_array(wolf2d.hbinb, generic_path) # # write_wolf_array(wolf2d.qxbinb, generic_path) # # write_wolf_array(wolf2d.qybinb, generic_path) # # write_wolf_array(wolf2d.topini, generic_path) # # FIXME Although one specifies mono block, this seems # # necessary... # wolf2d.bloc_description.filename = generic_path.with_suffix(Path(wolf2d.bloc_description.filename).suffix) # wolf2d.bloc_description.write_file() # write_trl(wolf2d.parameters, generic_path) # wolf2d.parameters.write_file(fn=str(generic_path)) # # # # FIXME Why do I need that ? # # with open(Path(wolf2d.mydir) / (wolf2d.filenamegen + ".par"), "a") as fout: # # fout.write('1\n""\n') # # This file is used as a beacon to find the generic # # name of the simulation. Normally it is filled with # # parameters but those are not used by the wolfcli and WolfPython # # applications. They are used by the VBA though. # # with open(Path(wolf2d.mydir) / wolf2d.filenamegen,"w") as fout: # # fout.write(f"""1,.1,1,0,0 # # 0,0,0,0,{wolf2d.myparam.impfbxgen},0,0,0 # # 1,1,.5,.25,1,2 # # 0,20,0,0 # # .5,1 # # {wolf2d.myparam.nxfin},{wolf2d.myparam.nyfin},{wolf2d.myparam.clf.origx},{wolf2d.myparam.clf.origy} # # 0,0 # # 0,0,0 # # 1 # # 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 # # """)
[docs] def force_load(wolf2d: prev_sim2D): wolf2d.force_load()
# def force_load(wolf2d: prev_sim2D): # """ # In the Wolf2DModel, the data loading is lazy. # So here we force it to happen. # """ # # FIXME in the constructor of WolfArray, # # self.mask_data(self.nullvalue) is called. # # but when read_all is called, that mask function # # is not called. Really strange. # wolf2d.napbin.preload = True # wolf2d.napbin.read_all() # # In wolf: <> 0 is active cell, == 0 is masked cell # # In numpy: True=invalid/masked # # assert not ma.is_masked( wolf2d.napbin.array), "I expect that Wolf doesn't apply masks when it loads its data" # wolf2d.napbin.array = ma.MaskedArray(wolf2d.napbin.array, wolf2d.napbin.array == 0) # c = wolf2d.napbin.count() # reset number of non masked elements # wolf2d.top.preload = True # wolf2d.top.read_all() # wolf2d.top.array.mask = wolf2d.napbin.array.mask # wolf2d.top.array.harden_mask() # wolf2d.hbin.preload = True # wolf2d.hbin.read_all() # wolf2d.hbin.array.mask = wolf2d.napbin.array.mask # wolf2d.hbin.array.harden_mask() # wolf2d.frot.preload = True # wolf2d.frot.read_all() # wolf2d.frot.array.mask = wolf2d.napbin.array.mask # wolf2d.frot.array.harden_mask() # wolf2d.qxbin.preload = True # wolf2d.qxbin.read_all() # wolf2d.qxbin.array.mask = wolf2d.napbin.array.mask # wolf2d.qxbin.array.harden_mask() # wolf2d.qybin.preload = True # wolf2d.qybin.read_all() # wolf2d.qybin.array.mask = wolf2d.napbin.array.mask # wolf2d.qybin.array.harden_mask() # # The hasattr test are there to be abloe to load an "unfinished model" such # # as those produced when building a test scenario in python from scratch. # if hasattr(wolf2d, "qxbinb"): # wolf2d.qxbinb.preload = True # wolf2d.qxbinb.read_all() # if len(wolf2d.qxbinb.myblocks) >= 1: # # FIXME This test to prevent a possible bug in the implementation # # of mask_data which relies on nb_blocks value # # which is incorrecrtly set to one eventhough # # myblocks length is 0. # wolf2d.qxbinb.mask_data(wolf2d.qxbin.nullvalue) # if hasattr(wolf2d, "hbinb"): # wolf2d.hbinb.preload = True # wolf2d.hbinb.read_all() # def create_still_swimming_pool_left_to_right(destination_dir: Path, # width: int = 100, # height: int = 50) -> prev_sim2D: # """ # IC: Water surface is flat. # Water is moving left to right at some speed. # There's no friction. # BC: water gets in at the same speed as the IC. h is set at the # same height than IC where the water leaves the simulation. # Such a swimming pool must remain still. # """ # assert destination_dir.is_dir() # WATER_HEIGHT = np.float32(10.0) # IC_SPEED = 2 # m: prev_sim2D = create_swimming_pool(destination_dir, # water_height=WATER_HEIGHT, # width=width, height=height) # m.parameters.reset_all_boundary_conditions() # # Don't forget, there's dead border and then there's some walls. # wet_vrange = range(2, m.nby - 2) # Numpy coordinates # wet_hrange = slice(1, m.nbx - 1) # m.qxbin.array[wet_hrange, wet_vrange] = np.float32(IC_SPEED) # m.frot.array[wet_hrange, wet_vrange] = 0 # m.save_arrays_modifications() # for y in wet_vrange: # # Left of sxwimming pool: water comes in # # m.myparam.add_weak_bc_x( # # 2, y+1, # grid coordinates # # ntype=BoundaryConditionsTypes.QX, # # value=IC_SPEED, orient='x') # m.parameters.add_weak_bc_x(2,y + 1, # grid coordinates # ntype=BoundaryConditionsTypes.QX, # value=IC_SPEED) # m.parameters.add_weak_bc_x(2,y + 1, # grid coordinates # ntype=BoundaryConditionsTypes.QY, # value=0,) # # Right of swimming pool: height is set # m.parameters.add_weak_bc_x(m.parameters.nbx,y + 1, # ntype=BoundaryConditionsTypes.HMOD, # value=WATER_HEIGHT,) # return m
[docs] def create_still_swimming_pool_left_to_right(destination_dir: Path, width: int = 100, height: int = 50, WATER_HEIGHT = np.float32(10.0)) -> prev_sim2D: """ Water flows left to right IC: Water surface is flat. Water is moving left to right at some speed. There's no friction. BC: water gets in at the same speed as the IC. h is set at the same height than IC where the water leaves the simulation. Such a swimming pool must remain still. """ assert destination_dir.is_dir() # WATER_HEIGHT = np.float32(10.0) IC_SPEED = 2 m: prev_sim2D = create_swimming_pool(destination_dir, water_height=WATER_HEIGHT, width=width, height=height) m.parameters.reset_all_boundary_conditions() # Don't forget, there's dead border and then there's some walls. wet_vrange = range(2, m.nby - 2) # Numpy coordinates wet_hrange = slice(1, m.nbx - 1) m.qxbin.array[wet_hrange, wet_vrange] = np.float32(IC_SPEED) m.frot.array[wet_hrange, wet_vrange] = np.float32(0.) m.save_arrays_modifications() for y in wet_vrange: m.parameters.add_weak_bc_x(2, y + 1, # grid coordinates ntype=BoundaryConditionsTypes.QX, value=IC_SPEED) m.parameters.add_weak_bc_x(2, y + 1, ntype=BoundaryConditionsTypes.QY, value=0 # grid coordinates ) # Right of swimming pool: height is set m.parameters.add_weak_bc_x(m.parameters.nbx, y + 1, ntype=BoundaryConditionsTypes.HMOD, value=WATER_HEIGHT) return m
[docs] def create_still_swimming_pool_right_to_left(destination_dir: Path, width: int = 100, height: int = 50, WATER_HEIGHT = np.float32(10.0)) -> prev_sim2D: """ Water flows right to left IC: Water level is flat and water is moving at some speed. BC: water gets in at the same speed as the IC. h is set at the same height then IC where the water leaves the simulation. There's no friction. """ assert destination_dir.is_dir() IC_SPEED = -2 m: prev_sim2D = create_swimming_pool(destination_dir, water_height=WATER_HEIGHT, width=width, height=height) m.set_gpu_test(reset_bc=True, optimize_dt=True) # Don't forget, there's a dead border and then there's some walls. wet_vrange = range(2, m.nby-2) # Numpy coordinates wet_hrange = slice(1, m.nbx-1) m.qxbin.array[wet_hrange,wet_vrange] = np.float32(IC_SPEED) m.frot.array[wet_hrange,wet_vrange] = 0 m.save_arrays_modifications() for y in wet_vrange: # Right of swimming pool: water comes in m.parameters.add_weak_bc_x(m.nbx, y+1, # grid coordinates ntype=BoundaryConditionsTypes.QX, value=IC_SPEED) m.parameters.add_weak_bc_x(m.nbx, y+1, # grid coordinates ntype=BoundaryConditionsTypes.QY, value=0) # Left of swimming pool: height is set m.parameters.add_weak_bc_x(2, y+1, ntype=BoundaryConditionsTypes.HMOD, value=WATER_HEIGHT) return m
[docs] def run_wolf_cli(destination_dir: Path, generic_filename="simul"): # destination_dir: wher WolfCLI is located # Return a handler on which one can .wait() import subprocess import shlex wcp = destination_dir / "wolfcli.exe" if os.path.exists(str(wcp)): cmd = str(wcp) + f" run_wolf2d_prev genfile={generic_filename}" logging.info( f"Running wolf cli in {destination_dir}, as {' '.join(shlex.split(cmd, posix=False))}" ) return subprocess.Popen( shlex.split(cmd, posix=False), cwd=destination_dir, stdout=subprocess.DEVNULL, ) else: raise Exception(f"Can't find {wcp}")
[docs] def wait_for_wolf_cli_to_end(wolf_cli_process, destination_dir): assert wolf_cli_process is not None assert destination_dir is not None import tqdm import sys from time import sleep logging.info(f"Waiting completion of WolfCLI") REG = re.compile(r"\*s\*3 ITERATION : +([0-9]+)") REG_CPU = re.compile(r"\*s\*1 TEMPS TOTAL CPU : +([0-9.]+)") last_wolfcli_iter = None model: prev_sim2D = prev_sim2D( dir=str(destination_dir), splash=False, in_gui=False ) # sys.stdout trick, see : https://stackoverflow.com/questions/45862715/how-to-flush-tqdm-progress-bar-explicitly progress_bar = tqdm.tqdm(range(model.parameters._nb_timesteps), leave=False, file=sys.stdout) logging.info(f"Watching {Path(destination_dir)/'simul.NFO'}") while wolf_cli_process.poll() is None: try: with open(Path(destination_dir) / "simul.NFO", "rb") as file: file.seek(-1024, os.SEEK_END) s = file.read().decode() match = REG.findall(s) if match: i = match[-1] progress_bar.update(int(i) - progress_bar.n) except Exception as ex: logging.error(ex) pass sleep(1) sys.stdout.flush()
[docs] def load_wolf_time_steps(base_file: Path): TPS_CUMUL_RE = re.compile(r" \*s\*[0-9]+ TEMPS CUMULE : +([0-9.E-]+)") ts = [] with open(base_file.with_suffix(".NFO")) as fin: for line in fin.readlines(): m = TPS_CUMUL_RE.match(line) if m: ts.append(float(m.groups()[0])) return ts
[docs] def compute_min_step_size(h, qx, qy, courant, dx=1, dy=1): u = qx / h v = qy / h speed = np.sqrt(u * u + v * v) speed[h == 0] = 0 a = speed + np.sqrt(9.81 * h) # The g from wolfGPU (for comparisons) b = np.ones_like(h) * 0.01 celerity = np.maximum(a, b) # avoid div by zero ? d = courant * min(dx, dy) / celerity time_step = np.min(np.minimum(d, 1)) return time_step, d
[docs] def compute_froude(h, qx, qy): u = qx / h v = qy / h speed = np.sqrt(u * u + v * v) froude = speed / np.sqrt(9.81 * h) return froude
[docs] def set_report_frequency(m: prev_sim2D, freq: Union[str, int]): if isinstance(freq, str) and "s" in freq: # Report every freq seconds (simulation time) m.parameters._writing_frequency = int(freq.replace("s", "").strip()) m.parameters._writing_mode = 1 else: # Report every freq iteration m.parameters._writing_frequency = int(freq) m.parameters._writing_mode = 0
[docs] def write_simulation_multiblock(wolf2d: prev_sim2D, wolfcli_path: Path): # shutil.rmtree(destination_path) # Python cannot delete exe files after having shutil.copy-ed them. if Path(wolf2d.mydir).exists(): [ f.unlink() for f in Path(wolf2d.mydir).iterdir() if f.is_file() and f.suffix != ".exe" ] else: os.makedirs(wolf2d.mydir, exist_ok=True) wcp = Path(wolf2d.mydir) / "wolfcli.exe" if not wcp.exists(): shutil.copy( Path(wolfcli_path) / "wolfcli.exe", Path(wolf2d.mydir) / "wolfcli.exe" ) # shutil.copy(Path(source_path) / "simul.lst", Path(destination_path) / (generic_name+".lst")) # # Legacy parameters indicating that we work with # # several topo/qx/... matrices (was used to cut big # # simulation domains in smaller pieces) # with open(Path(wolf2d.mydir) / (wolf2d.filenamegen + ".lst"), "w") as fout: # # Maybe we need as many zeros as they are blocks ? # # PA notes this is a bug in current Fortran code. # fout.write("0\n") wolf2d.save()
# wolf2d.napbin.suxsuy_contour(filename=wolf2d.mydir) # generic_path = Path(wolf2d.mydir) # write_wolf_array(wolf2d.napbin, generic_path) # write_wolf_array(wolf2d.top, generic_path) # write_wolf_array(wolf2d.hbin, generic_path) # write_wolf_array(wolf2d.frot, generic_path) # write_wolf_array(wolf2d.qxbin, generic_path) # write_wolf_array(wolf2d.qybin, generic_path) # write_wolf_array(wolf2d.hbinb, generic_path) # write_wolf_array(wolf2d.qxbinb, generic_path) # write_wolf_array(wolf2d.qybinb, generic_path) # write_wolf_array(wolf2d.topini, generic_path) # write_wolf_array(wolf2d.mymnap, generic_path) # wolf2d.myblocfile.filename = generic_path.with_suffix( # Path(wolf2d.myblocfile.filename).suffix # ) # wolf2d.myblocfile.write_file() # write_trl(wolf2d.myparam, generic_path) # wolf2d.myparam.write_file(fn=str(generic_path)) # # FIXME Why do I need that ? # with open(Path(wolf2d.mydir) / (wolf2d.filenamegen + ".par"), "a") as fout: # fout.write('1\n""\n') # # This file is used as a beacon to find the generic # # name of the simulation. Normally it is filled with # # parameters but those are not used by the wolfcli and WolfPython # # applications. They are used by the VBA though. # with open(Path(wolf2d.mydir) / wolf2d.filenamegen, "w") as fout: # fout.write( # f"""1,.1,1,0,0 # 0,0,0,0,{wolf2d.myparam.impfbxgen},0,0,0 # 1,1,.5,.25,1,2 # 0,20,0,0 # .5,1 # {wolf2d.myparam.nxfin},{wolf2d.myparam.nyfin},{wolf2d.myparam.clf.origx},{wolf2d.myparam.clf.origy} # 0,0 # 0,0,0 # 1 # 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 # """ # )