Source code for wolfgpu.wolf_utils

import logging
import os
import re
import shutil
import numpy as np
import numpy.ma as ma
from typing import Union
from pathlib import Path

from importlib.metadata import version
[docs] wolf_hece_version = tuple(map(int, version("wolfhece").split('.')))
if wolf_hece_version <= (1,8,11): from wolfhece.mesh2d.wolf2dprev import BoundaryConditionsTypes elif wolf_hece_version >= (2,0,5): from wolfhece.mesh2d.cst_2D_boundary_conditions import BCType_2D as BoundaryConditionsTypes else: # 1.8.13, 2.0.1 are broken for us :-( raise Exception(f"I don't recognize this version of wolfhece : {wolf_hece_version}") from wolfhece.mesh2d.wolf2dprev import prev_parameters_simul, prev_sim2D # from wolfhece.PyGui import Wolf2DModel # Deprecated because Wolf2DModel is used only for GUI from wolfhece.wolf_array import (WolfArray, WolfArrayMB,WolfArrayMNAP) from wolfgpu.test_scenarios import create_swimming_pool
[docs] def swimpool_constant(c, width: int = 100, height: int = 50) -> np.ndarray: """ Create a constant swimming pool for testing purposes. """ a = np.ones((width, height)) * c a[:, [0, 1, -1, -2]] = 0 a[[0, -1], :] = 0 return a
[docs] def write_wolf_array(wa: WolfArray, generic_name: Path): # wa is WolfArray or one of its descendent. logging.info(f"writing {generic_name}") # if statements' order is important because of inheritance. if isinstance(wa, WolfArrayMNAP): # suffix is not in the original filename, setting it ourselves. wa.filename = str(generic_name.with_suffix(".MNAP")) wa.write_all() elif isinstance(wa, WolfArrayMB): wa: WolfArrayMB wa.filename = str(generic_name.with_suffix(Path(wa.filename).suffix)) if len(wa.myblocks) >= 1: wa.write_all() elif isinstance(wa, WolfArray): wa.filename = str(generic_name.with_suffix(Path(wa.filename).suffix)) wa.write_all() else: raise Exception(f"Unsupported type of array {type(wa)}")
[docs] def write_trl(params: prev_parameters_simul, generic_name:Path): with open(generic_name.with_suffix(".trl"), "w") as f: f.write("Translation de la simulation pour obtenir les axes locaux\n") f.write(f"{params._fine_mesh_translx:g}\n") f.write(f"{params._fine_mesh_transly:g}\n")
[docs] def write_simulation(wolf2d: prev_sim2D, wolf_cli_path: Path): # shutil.rmtree(destination_path) logging.info(f"Writing Wolf sim to {wolf2d.mydir}") # Python cannot delete exe files after having shutil.copy-ed them. if Path(wolf2d.mydir).exists(): [ f.unlink() for f in Path(wolf2d.mydir).iterdir() if f.is_file() and f.suffix != ".exe" ] else: os.makedirs(wolf2d.mydir, exist_ok=True) wcp = Path(wolf2d.mydir) / "wolfcli.exe" if wolf_cli_path is not None: if wolf_cli_path.exists() and not wcp.exists(): shutil.copy(wolf_cli_path, wcp) with open(Path(wolf2d.mydir) / "run.bat", "w") as fout: fout.writelines(["wolfcli.exe run_wolf2d_prev genfile=simul"]) # shutil.copy(Path(source_path) / "simul.lst", Path(destination_path) / (generic_name+".lst")) wolf2d.save()
# # FIXME The empty generic file # # I don't create it because the "MNAP" files gets loaded # # if the generic file exists... (look in the MNAP loading # # code of PyGui.py). So, if I create a single block sim (no MNAP file) # # and I add a generic (empty) file, then it tries to load # # the MNAP file... And fails. # # with open(Path(wolf2d.mydir) / wolf2d.filenamegen, "w") as fout: # # pass # # Legacy parameters indicating that we work with # # several topo/qx/... matrices (was used to cut big # # simulation domains in smaller pieces) # with open(Path(wolf2d.mydir) / (wolf2d.filenamegen + ".lst"), "w") as fout: # # Maybe we need as many zeros as they are blocks # # PA notes this is a bug in current Fortran code. # fout.write("0\n") # generic_path = Path(wolf2d.mydir) / Path(wolf2d.filenamegen) # # napbin = wolf2d.read_fine_array(".napbin") # # napbin.suxsuy_contour(filename=str(generic_path)) # # # NAPBIN is special, it is np.int16 # # write_wolf_array(napbin, generic_path) # # write_wolf_array(wolf2d.top, generic_path) # # write_wolf_array(wolf2d.hbin, generic_path) # # write_wolf_array(wolf2d.frot, generic_path) # # write_wolf_array(wolf2d.qxbin, generic_path) # # write_wolf_array(wolf2d.qybin, generic_path) # # if wolf2d.myparam.ntyperead == 3: # # # Multiblock writing 'cos WolfCli will expect that. # # write_wolf_array(wolf2d.mymnap, generic_path) # # write_wolf_array(wolf2d.hbinb, generic_path) # # write_wolf_array(wolf2d.qxbinb, generic_path) # # write_wolf_array(wolf2d.qybinb, generic_path) # # write_wolf_array(wolf2d.topini, generic_path) # # FIXME Although one specifies mono block, this seems # # necessary... # wolf2d.bloc_description.filename = generic_path.with_suffix(Path(wolf2d.bloc_description.filename).suffix) # wolf2d.bloc_description.write_file() # write_trl(wolf2d.parameters, generic_path) # wolf2d.parameters.write_file(fn=str(generic_path)) # # # # FIXME Why do I need that ? # # with open(Path(wolf2d.mydir) / (wolf2d.filenamegen + ".par"), "a") as fout: # # fout.write('1\n""\n') # # This file is used as a beacon to find the generic # # name of the simulation. Normally it is filled with # # parameters but those are not used by the wolfcli and WolfPython # # applications. They are used by the VBA though. # # with open(Path(wolf2d.mydir) / wolf2d.filenamegen,"w") as fout: # # fout.write(f"""1,.1,1,0,0 # # 0,0,0,0,{wolf2d.myparam.impfbxgen},0,0,0 # # 1,1,.5,.25,1,2 # # 0,20,0,0 # # .5,1 # # {wolf2d.myparam.nxfin},{wolf2d.myparam.nyfin},{wolf2d.myparam.clf.origx},{wolf2d.myparam.clf.origy} # # 0,0 # # 0,0,0 # # 1 # # 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 # # """)
[docs] def force_load(wolf2d: prev_sim2D): wolf2d.force_load()
# def force_load(wolf2d: prev_sim2D): # """ # In the Wolf2DModel, the data loading is lazy. # So here we force it to happen. # """ # # FIXME in the constructor of WolfArray, # # self.mask_data(self.nullvalue) is called. # # but when read_all is called, that mask function # # is not called. Really strange. # wolf2d.napbin.preload = True # wolf2d.napbin.read_all() # # In wolf: <> 0 is active cell, == 0 is masked cell # # In numpy: True=invalid/masked # # assert not ma.is_masked( wolf2d.napbin.array), "I expect that Wolf doesn't apply masks when it loads its data" # wolf2d.napbin.array = ma.MaskedArray(wolf2d.napbin.array, wolf2d.napbin.array == 0) # c = wolf2d.napbin.count() # reset number of non masked elements # wolf2d.top.preload = True # wolf2d.top.read_all() # wolf2d.top.array.mask = wolf2d.napbin.array.mask # wolf2d.top.array.harden_mask() # wolf2d.hbin.preload = True # wolf2d.hbin.read_all() # wolf2d.hbin.array.mask = wolf2d.napbin.array.mask # wolf2d.hbin.array.harden_mask() # wolf2d.frot.preload = True # wolf2d.frot.read_all() # wolf2d.frot.array.mask = wolf2d.napbin.array.mask # wolf2d.frot.array.harden_mask() # wolf2d.qxbin.preload = True # wolf2d.qxbin.read_all() # wolf2d.qxbin.array.mask = wolf2d.napbin.array.mask # wolf2d.qxbin.array.harden_mask() # wolf2d.qybin.preload = True # wolf2d.qybin.read_all() # wolf2d.qybin.array.mask = wolf2d.napbin.array.mask # wolf2d.qybin.array.harden_mask() # # The hasattr test are there to be abloe to load an "unfinished model" such # # as those produced when building a test scenario in python from scratch. # if hasattr(wolf2d, "qxbinb"): # wolf2d.qxbinb.preload = True # wolf2d.qxbinb.read_all() # if len(wolf2d.qxbinb.myblocks) >= 1: # # FIXME This test to prevent a possible bug in the implementation # # of mask_data which relies on nb_blocks value # # which is incorrecrtly set to one eventhough # # myblocks length is 0. # wolf2d.qxbinb.mask_data(wolf2d.qxbin.nullvalue) # if hasattr(wolf2d, "hbinb"): # wolf2d.hbinb.preload = True # wolf2d.hbinb.read_all() # def create_still_swimming_pool_left_to_right(destination_dir: Path, # width: int = 100, # height: int = 50) -> prev_sim2D: # """ # IC: Water surface is flat. # Water is moving left to right at some speed. # There's no friction. # BC: water gets in at the same speed as the IC. h is set at the # same height than IC where the water leaves the simulation. # Such a swimming pool must remain still. # """ # assert destination_dir.is_dir() # WATER_HEIGHT = np.float32(10.0) # IC_SPEED = 2 # m: prev_sim2D = create_swimming_pool(destination_dir, # water_height=WATER_HEIGHT, # width=width, height=height) # m.parameters.reset_all_boundary_conditions() # # Don't forget, there's dead border and then there's some walls. # wet_vrange = range(2, m.nby - 2) # Numpy coordinates # wet_hrange = slice(1, m.nbx - 1) # m.qxbin.array[wet_hrange, wet_vrange] = np.float32(IC_SPEED) # m.frot.array[wet_hrange, wet_vrange] = 0 # m.save_arrays_modifications() # for y in wet_vrange: # # Left of sxwimming pool: water comes in # # m.myparam.add_weak_bc_x( # # 2, y+1, # grid coordinates # # ntype=BoundaryConditionsTypes.QX, # # value=IC_SPEED, orient='x') # m.parameters.add_weak_bc_x(2,y + 1, # grid coordinates # ntype=BoundaryConditionsTypes.QX, # value=IC_SPEED) # m.parameters.add_weak_bc_x(2,y + 1, # grid coordinates # ntype=BoundaryConditionsTypes.QY, # value=0,) # # Right of swimming pool: height is set # m.parameters.add_weak_bc_x(m.parameters.nbx,y + 1, # ntype=BoundaryConditionsTypes.HMOD, # value=WATER_HEIGHT,) # return m
[docs] def create_still_swimming_pool_left_to_right(destination_dir: Path, width: int = 100, height: int = 50, WATER_HEIGHT = np.float32(10.0)) -> prev_sim2D: """ Water flows left to right IC: Water surface is flat. Water is moving left to right at some speed. There's no friction. BC: water gets in at the same speed as the IC. h is set at the same height than IC where the water leaves the simulation. Such a swimming pool must remain still. """ assert destination_dir.is_dir() # WATER_HEIGHT = np.float32(10.0) IC_SPEED = 2 m: prev_sim2D = create_swimming_pool(destination_dir, water_height=WATER_HEIGHT, width=width, height=height) m.parameters.reset_all_boundary_conditions() # Don't forget, there's dead border and then there's some walls. wet_vrange = range(2, m.nby - 2) # Numpy coordinates wet_hrange = slice(1, m.nbx - 1) m.qxbin.array[wet_hrange, wet_vrange] = np.float32(IC_SPEED) m.frot.array[wet_hrange, wet_vrange] = np.float32(0.) m.save_arrays_modifications() for y in wet_vrange: m.parameters.add_weak_bc_x(2, y + 1, # grid coordinates ntype=BoundaryConditionsTypes.QX, value=IC_SPEED) m.parameters.add_weak_bc_x(2, y + 1, ntype=BoundaryConditionsTypes.QY, value=0 # grid coordinates ) # Right of swimming pool: height is set m.parameters.add_weak_bc_x(m.parameters.nbx, y + 1, ntype=BoundaryConditionsTypes.HMOD, value=WATER_HEIGHT) return m
[docs] def create_still_swimming_pool_right_to_left(destination_dir: Path, width: int = 100, height: int = 50, WATER_HEIGHT = np.float32(10.0)) -> prev_sim2D: """ Water flows right to left IC: Water level is flat and water is moving at some speed. BC: water gets in at the same speed as the IC. h is set at the same height then IC where the water leaves the simulation. There's no friction. """ assert destination_dir.is_dir() IC_SPEED = -2 m: prev_sim2D = create_swimming_pool(destination_dir, water_height=WATER_HEIGHT, width=width, height=height) m.set_gpu_test(reset_bc=True, optimize_dt=True) # Don't forget, there's a dead border and then there's some walls. wet_vrange = range(2, m.nby-2) # Numpy coordinates wet_hrange = slice(1, m.nbx-1) m.qxbin.array[wet_hrange,wet_vrange] = np.float32(IC_SPEED) m.frot.array[wet_hrange,wet_vrange] = 0 m.save_arrays_modifications() for y in wet_vrange: # Right of swimming pool: water comes in m.parameters.add_weak_bc_x(m.nbx, y+1, # grid coordinates ntype=BoundaryConditionsTypes.QX, value=IC_SPEED) m.parameters.add_weak_bc_x(m.nbx, y+1, # grid coordinates ntype=BoundaryConditionsTypes.QY, value=0) # Left of swimming pool: height is set m.parameters.add_weak_bc_x(2, y+1, ntype=BoundaryConditionsTypes.HMOD, value=WATER_HEIGHT) return m
[docs] def run_wolf_cli(destination_dir: Path, generic_filename="simul"): # destination_dir: wher WolfCLI is located # Return a handler on which one can .wait() import subprocess import shlex wcp = destination_dir / "wolfcli.exe" if os.path.exists(str(wcp)): cmd = str(wcp) + f" run_wolf2d_prev genfile={generic_filename}" logging.info( f"Running wolf cli in {destination_dir}, as {' '.join(shlex.split(cmd, posix=False))}" ) return subprocess.Popen( shlex.split(cmd, posix=False), cwd=destination_dir, stdout=subprocess.DEVNULL, ) else: raise Exception(f"Can't find {wcp}")
[docs] def wait_for_wolf_cli_to_end(wolf_cli_process, destination_dir): assert wolf_cli_process is not None assert destination_dir is not None import tqdm import sys from time import sleep logging.info(f"Waiting completion of WolfCLI") REG = re.compile(r"\*s\*3 ITERATION : +([0-9]+)") REG_CPU = re.compile(r"\*s\*1 TEMPS TOTAL CPU : +([0-9.]+)") last_wolfcli_iter = None model: prev_sim2D = prev_sim2D( dir=str(destination_dir), splash=False, in_gui=False ) # sys.stdout trick, see : https://stackoverflow.com/questions/45862715/how-to-flush-tqdm-progress-bar-explicitly progress_bar = tqdm.tqdm(range(model.parameters._nb_timesteps), leave=False, file=sys.stdout) logging.info(f"Watching {Path(destination_dir)/'simul.NFO'}") while wolf_cli_process.poll() is None: try: with open(Path(destination_dir) / "simul.NFO", "rb") as file: file.seek(-1024, os.SEEK_END) s = file.read().decode() match = REG.findall(s) if match: i = match[-1] progress_bar.update(int(i) - progress_bar.n) except Exception as ex: logging.error(ex) pass sleep(1) sys.stdout.flush()
[docs] def load_wolf_time_steps(base_file: Path): TPS_CUMUL_RE = re.compile(r" \*s\*[0-9]+ TEMPS CUMULE : +([0-9.E-]+)") ts = [] with open(base_file.with_suffix(".NFO")) as fin: for line in fin.readlines(): m = TPS_CUMUL_RE.match(line) if m: ts.append(float(m.groups()[0])) return ts
[docs] def compute_min_step_size(h, qx, qy, courant, dx=1, dy=1): u = qx / h v = qy / h speed = np.sqrt(u * u + v * v) speed[h == 0] = 0 a = speed + np.sqrt(9.81 * h) # The g from wolfGPU (for comparisons) b = np.ones_like(h) * 0.01 celerity = np.maximum(a, b) # avoid div by zero ? d = courant * min(dx, dy) / celerity time_step = np.min(np.minimum(d, 1)) return time_step, d
[docs] def compute_froude(h, qx, qy): u = qx / h v = qy / h speed = np.sqrt(u * u + v * v) froude = speed / np.sqrt(9.81 * h) return froude
[docs] def set_report_frequency(m: prev_sim2D, freq: Union[str, int]): if isinstance(freq, str) and "s" in freq: # Report every freq seconds (simulation time) m.parameters._writing_frequency = int(freq.replace("s", "").strip()) m.parameters._writing_mode = 1 else: # Report every freq iteration m.parameters._writing_frequency = int(freq) m.parameters._writing_mode = 0
[docs] def write_simulation_multiblock(wolf2d: prev_sim2D, wolfcli_path: Path): # shutil.rmtree(destination_path) # Python cannot delete exe files after having shutil.copy-ed them. if Path(wolf2d.mydir).exists(): [ f.unlink() for f in Path(wolf2d.mydir).iterdir() if f.is_file() and f.suffix != ".exe" ] else: os.makedirs(wolf2d.mydir, exist_ok=True) wcp = Path(wolf2d.mydir) / "wolfcli.exe" if not wcp.exists(): shutil.copy( Path(wolfcli_path) / "wolfcli.exe", Path(wolf2d.mydir) / "wolfcli.exe" ) # shutil.copy(Path(source_path) / "simul.lst", Path(destination_path) / (generic_name+".lst")) # # Legacy parameters indicating that we work with # # several topo/qx/... matrices (was used to cut big # # simulation domains in smaller pieces) # with open(Path(wolf2d.mydir) / (wolf2d.filenamegen + ".lst"), "w") as fout: # # Maybe we need as many zeros as they are blocks ? # # PA notes this is a bug in current Fortran code. # fout.write("0\n") wolf2d.save()
# wolf2d.napbin.suxsuy_contour(filename=wolf2d.mydir) # generic_path = Path(wolf2d.mydir) # write_wolf_array(wolf2d.napbin, generic_path) # write_wolf_array(wolf2d.top, generic_path) # write_wolf_array(wolf2d.hbin, generic_path) # write_wolf_array(wolf2d.frot, generic_path) # write_wolf_array(wolf2d.qxbin, generic_path) # write_wolf_array(wolf2d.qybin, generic_path) # write_wolf_array(wolf2d.hbinb, generic_path) # write_wolf_array(wolf2d.qxbinb, generic_path) # write_wolf_array(wolf2d.qybinb, generic_path) # write_wolf_array(wolf2d.topini, generic_path) # write_wolf_array(wolf2d.mymnap, generic_path) # wolf2d.myblocfile.filename = generic_path.with_suffix( # Path(wolf2d.myblocfile.filename).suffix # ) # wolf2d.myblocfile.write_file() # write_trl(wolf2d.myparam, generic_path) # wolf2d.myparam.write_file(fn=str(generic_path)) # # FIXME Why do I need that ? # with open(Path(wolf2d.mydir) / (wolf2d.filenamegen + ".par"), "a") as fout: # fout.write('1\n""\n') # # This file is used as a beacon to find the generic # # name of the simulation. Normally it is filled with # # parameters but those are not used by the wolfcli and WolfPython # # applications. They are used by the VBA though. # with open(Path(wolf2d.mydir) / wolf2d.filenamegen, "w") as fout: # fout.write( # f"""1,.1,1,0,0 # 0,0,0,0,{wolf2d.myparam.impfbxgen},0,0,0 # 1,1,.5,.25,1,2 # 0,20,0,0 # .5,1 # {wolf2d.myparam.nxfin},{wolf2d.myparam.nyfin},{wolf2d.myparam.clf.origx},{wolf2d.myparam.clf.origy} # 0,0 # 0,0,0 # 1 # 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 # """ # )