import logging
import os
import re
import shutil
import numpy as np
import numpy.ma as ma
from typing import Union
from pathlib import Path
from importlib.metadata import version
[docs]
wolf_hece_version = tuple(map(int, version("wolfhece").split('.')))
if wolf_hece_version <= (1,8,11):
from wolfhece.mesh2d.wolf2dprev import BoundaryConditionsTypes
elif wolf_hece_version >= (2,0,5):
from wolfhece.mesh2d.cst_2D_boundary_conditions import BCType_2D as BoundaryConditionsTypes
else:
# 1.8.13, 2.0.1 are broken for us :-(
raise Exception(f"I don't recognize this version of wolfhece : {wolf_hece_version}")
from wolfhece.mesh2d.wolf2dprev import prev_parameters_simul, prev_sim2D
# from wolfhece.PyGui import Wolf2DModel # Deprecated because Wolf2DModel is used only for GUI
from wolfhece.wolf_array import (WolfArray, WolfArrayMB,WolfArrayMNAP)
from wolfgpu.test_scenarios import create_swimming_pool
[docs]
def swimpool_constant(c, width: int = 100, height: int = 50) -> np.ndarray:
""" Create a constant swimming pool for testing purposes. """
a = np.ones((width, height)) * c
a[:, [0, 1, -1, -2]] = 0
a[[0, -1], :] = 0
return a
[docs]
def write_wolf_array(wa: WolfArray, generic_name: Path):
# wa is WolfArray or one of its descendent.
logging.info(f"writing {generic_name}")
# if statements' order is important because of inheritance.
if isinstance(wa, WolfArrayMNAP):
# suffix is not in the original filename, setting it ourselves.
wa.filename = str(generic_name.with_suffix(".MNAP"))
wa.write_all()
elif isinstance(wa, WolfArrayMB):
wa: WolfArrayMB
wa.filename = str(generic_name.with_suffix(Path(wa.filename).suffix))
if len(wa.myblocks) >= 1:
wa.write_all()
elif isinstance(wa, WolfArray):
wa.filename = str(generic_name.with_suffix(Path(wa.filename).suffix))
wa.write_all()
else:
raise Exception(f"Unsupported type of array {type(wa)}")
[docs]
def write_trl(params: prev_parameters_simul, generic_name:Path):
with open(generic_name.with_suffix(".trl"), "w") as f:
f.write("Translation de la simulation pour obtenir les axes locaux\n")
f.write(f"{params._fine_mesh_translx:g}\n")
f.write(f"{params._fine_mesh_transly:g}\n")
[docs]
def write_simulation(wolf2d: prev_sim2D, wolf_cli_path: Path):
# shutil.rmtree(destination_path)
logging.info(f"Writing Wolf sim to {wolf2d.mydir}")
# Python cannot delete exe files after having shutil.copy-ed them.
if Path(wolf2d.mydir).exists():
[
f.unlink()
for f in Path(wolf2d.mydir).iterdir()
if f.is_file() and f.suffix != ".exe"
]
else:
os.makedirs(wolf2d.mydir, exist_ok=True)
wcp = Path(wolf2d.mydir) / "wolfcli.exe"
if wolf_cli_path is not None:
if wolf_cli_path.exists() and not wcp.exists():
shutil.copy(wolf_cli_path, wcp)
with open(Path(wolf2d.mydir) / "run.bat", "w") as fout:
fout.writelines(["wolfcli.exe run_wolf2d_prev genfile=simul"])
# shutil.copy(Path(source_path) / "simul.lst", Path(destination_path) / (generic_name+".lst"))
wolf2d.save()
# # FIXME The empty generic file
# # I don't create it because the "MNAP" files gets loaded
# # if the generic file exists... (look in the MNAP loading
# # code of PyGui.py). So, if I create a single block sim (no MNAP file)
# # and I add a generic (empty) file, then it tries to load
# # the MNAP file... And fails.
# # with open(Path(wolf2d.mydir) / wolf2d.filenamegen, "w") as fout:
# # pass
# # Legacy parameters indicating that we work with
# # several topo/qx/... matrices (was used to cut big
# # simulation domains in smaller pieces)
# with open(Path(wolf2d.mydir) / (wolf2d.filenamegen + ".lst"), "w") as fout:
# # Maybe we need as many zeros as they are blocks
# # PA notes this is a bug in current Fortran code.
# fout.write("0\n")
# generic_path = Path(wolf2d.mydir) / Path(wolf2d.filenamegen)
# # napbin = wolf2d.read_fine_array(".napbin")
# # napbin.suxsuy_contour(filename=str(generic_path))
# # # NAPBIN is special, it is np.int16
# # write_wolf_array(napbin, generic_path)
# # write_wolf_array(wolf2d.top, generic_path)
# # write_wolf_array(wolf2d.hbin, generic_path)
# # write_wolf_array(wolf2d.frot, generic_path)
# # write_wolf_array(wolf2d.qxbin, generic_path)
# # write_wolf_array(wolf2d.qybin, generic_path)
# # if wolf2d.myparam.ntyperead == 3:
# # # Multiblock writing 'cos WolfCli will expect that.
# # write_wolf_array(wolf2d.mymnap, generic_path)
# # write_wolf_array(wolf2d.hbinb, generic_path)
# # write_wolf_array(wolf2d.qxbinb, generic_path)
# # write_wolf_array(wolf2d.qybinb, generic_path)
# # write_wolf_array(wolf2d.topini, generic_path)
# # FIXME Although one specifies mono block, this seems
# # necessary...
# wolf2d.bloc_description.filename = generic_path.with_suffix(Path(wolf2d.bloc_description.filename).suffix)
# wolf2d.bloc_description.write_file()
# write_trl(wolf2d.parameters, generic_path)
# wolf2d.parameters.write_file(fn=str(generic_path))
# # # # FIXME Why do I need that ?
# # with open(Path(wolf2d.mydir) / (wolf2d.filenamegen + ".par"), "a") as fout:
# # fout.write('1\n""\n')
# # This file is used as a beacon to find the generic
# # name of the simulation. Normally it is filled with
# # parameters but those are not used by the wolfcli and WolfPython
# # applications. They are used by the VBA though.
# # with open(Path(wolf2d.mydir) / wolf2d.filenamegen,"w") as fout:
# # fout.write(f"""1,.1,1,0,0
# # 0,0,0,0,{wolf2d.myparam.impfbxgen},0,0,0
# # 1,1,.5,.25,1,2
# # 0,20,0,0
# # .5,1
# # {wolf2d.myparam.nxfin},{wolf2d.myparam.nyfin},{wolf2d.myparam.clf.origx},{wolf2d.myparam.clf.origy}
# # 0,0
# # 0,0,0
# # 1
# # 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
# # """)
[docs]
def force_load(wolf2d: prev_sim2D):
wolf2d.force_load()
# def force_load(wolf2d: prev_sim2D):
# """
# In the Wolf2DModel, the data loading is lazy.
# So here we force it to happen.
# """
# # FIXME in the constructor of WolfArray,
# # self.mask_data(self.nullvalue) is called.
# # but when read_all is called, that mask function
# # is not called. Really strange.
# wolf2d.napbin.preload = True
# wolf2d.napbin.read_all()
# # In wolf: <> 0 is active cell, == 0 is masked cell
# # In numpy: True=invalid/masked
# # assert not ma.is_masked( wolf2d.napbin.array), "I expect that Wolf doesn't apply masks when it loads its data"
# wolf2d.napbin.array = ma.MaskedArray(wolf2d.napbin.array, wolf2d.napbin.array == 0)
# c = wolf2d.napbin.count() # reset number of non masked elements
# wolf2d.top.preload = True
# wolf2d.top.read_all()
# wolf2d.top.array.mask = wolf2d.napbin.array.mask
# wolf2d.top.array.harden_mask()
# wolf2d.hbin.preload = True
# wolf2d.hbin.read_all()
# wolf2d.hbin.array.mask = wolf2d.napbin.array.mask
# wolf2d.hbin.array.harden_mask()
# wolf2d.frot.preload = True
# wolf2d.frot.read_all()
# wolf2d.frot.array.mask = wolf2d.napbin.array.mask
# wolf2d.frot.array.harden_mask()
# wolf2d.qxbin.preload = True
# wolf2d.qxbin.read_all()
# wolf2d.qxbin.array.mask = wolf2d.napbin.array.mask
# wolf2d.qxbin.array.harden_mask()
# wolf2d.qybin.preload = True
# wolf2d.qybin.read_all()
# wolf2d.qybin.array.mask = wolf2d.napbin.array.mask
# wolf2d.qybin.array.harden_mask()
# # The hasattr test are there to be abloe to load an "unfinished model" such
# # as those produced when building a test scenario in python from scratch.
# if hasattr(wolf2d, "qxbinb"):
# wolf2d.qxbinb.preload = True
# wolf2d.qxbinb.read_all()
# if len(wolf2d.qxbinb.myblocks) >= 1:
# # FIXME This test to prevent a possible bug in the implementation
# # of mask_data which relies on nb_blocks value
# # which is incorrecrtly set to one eventhough
# # myblocks length is 0.
# wolf2d.qxbinb.mask_data(wolf2d.qxbin.nullvalue)
# if hasattr(wolf2d, "hbinb"):
# wolf2d.hbinb.preload = True
# wolf2d.hbinb.read_all()
# def create_still_swimming_pool_left_to_right(destination_dir: Path,
# width: int = 100,
# height: int = 50) -> prev_sim2D:
# """
# IC: Water surface is flat.
# Water is moving left to right at some speed.
# There's no friction.
# BC: water gets in at the same speed as the IC. h is set at the
# same height than IC where the water leaves the simulation.
# Such a swimming pool must remain still.
# """
# assert destination_dir.is_dir()
# WATER_HEIGHT = np.float32(10.0)
# IC_SPEED = 2
# m: prev_sim2D = create_swimming_pool(destination_dir,
# water_height=WATER_HEIGHT,
# width=width, height=height)
# m.parameters.reset_all_boundary_conditions()
# # Don't forget, there's dead border and then there's some walls.
# wet_vrange = range(2, m.nby - 2) # Numpy coordinates
# wet_hrange = slice(1, m.nbx - 1)
# m.qxbin.array[wet_hrange, wet_vrange] = np.float32(IC_SPEED)
# m.frot.array[wet_hrange, wet_vrange] = 0
# m.save_arrays_modifications()
# for y in wet_vrange:
# # Left of sxwimming pool: water comes in
# # m.myparam.add_weak_bc_x(
# # 2, y+1, # grid coordinates
# # ntype=BoundaryConditionsTypes.QX,
# # value=IC_SPEED, orient='x')
# m.parameters.add_weak_bc_x(2,y + 1, # grid coordinates
# ntype=BoundaryConditionsTypes.QX,
# value=IC_SPEED)
# m.parameters.add_weak_bc_x(2,y + 1, # grid coordinates
# ntype=BoundaryConditionsTypes.QY,
# value=0,)
# # Right of swimming pool: height is set
# m.parameters.add_weak_bc_x(m.parameters.nbx,y + 1,
# ntype=BoundaryConditionsTypes.HMOD,
# value=WATER_HEIGHT,)
# return m
[docs]
def create_still_swimming_pool_left_to_right(destination_dir: Path,
width: int = 100,
height: int = 50,
WATER_HEIGHT = np.float32(10.0)) -> prev_sim2D:
"""
Water flows left to right
IC: Water surface is flat.
Water is moving left to right at some speed.
There's no friction.
BC: water gets in at the same speed as the IC. h is set at the
same height than IC where the water leaves the simulation.
Such a swimming pool must remain still.
"""
assert destination_dir.is_dir()
# WATER_HEIGHT = np.float32(10.0)
IC_SPEED = 2
m: prev_sim2D = create_swimming_pool(destination_dir,
water_height=WATER_HEIGHT,
width=width,
height=height)
m.parameters.reset_all_boundary_conditions()
# Don't forget, there's dead border and then there's some walls.
wet_vrange = range(2, m.nby - 2) # Numpy coordinates
wet_hrange = slice(1, m.nbx - 1)
m.qxbin.array[wet_hrange, wet_vrange] = np.float32(IC_SPEED)
m.frot.array[wet_hrange, wet_vrange] = np.float32(0.)
m.save_arrays_modifications()
for y in wet_vrange:
m.parameters.add_weak_bc_x(2, y + 1, # grid coordinates
ntype=BoundaryConditionsTypes.QX,
value=IC_SPEED)
m.parameters.add_weak_bc_x(2, y + 1,
ntype=BoundaryConditionsTypes.QY,
value=0 # grid coordinates
)
# Right of swimming pool: height is set
m.parameters.add_weak_bc_x(m.parameters.nbx, y + 1,
ntype=BoundaryConditionsTypes.HMOD,
value=WATER_HEIGHT)
return m
[docs]
def create_still_swimming_pool_right_to_left(destination_dir: Path,
width: int = 100,
height: int = 50,
WATER_HEIGHT = np.float32(10.0)) -> prev_sim2D:
"""
Water flows right to left
IC: Water level is flat and water is moving at some speed.
BC: water gets in at the same speed as the IC. h is set at the
same height then IC where the water leaves the simulation.
There's no friction.
"""
assert destination_dir.is_dir()
IC_SPEED = -2
m: prev_sim2D = create_swimming_pool(destination_dir,
water_height=WATER_HEIGHT,
width=width,
height=height)
m.set_gpu_test(reset_bc=True, optimize_dt=True)
# Don't forget, there's a dead border and then there's some walls.
wet_vrange = range(2, m.nby-2) # Numpy coordinates
wet_hrange = slice(1, m.nbx-1)
m.qxbin.array[wet_hrange,wet_vrange] = np.float32(IC_SPEED)
m.frot.array[wet_hrange,wet_vrange] = 0
m.save_arrays_modifications()
for y in wet_vrange:
# Right of swimming pool: water comes in
m.parameters.add_weak_bc_x(m.nbx, y+1, # grid coordinates
ntype=BoundaryConditionsTypes.QX,
value=IC_SPEED)
m.parameters.add_weak_bc_x(m.nbx, y+1, # grid coordinates
ntype=BoundaryConditionsTypes.QY,
value=0)
# Left of swimming pool: height is set
m.parameters.add_weak_bc_x(2, y+1,
ntype=BoundaryConditionsTypes.HMOD,
value=WATER_HEIGHT)
return m
[docs]
def run_wolf_cli(destination_dir: Path, generic_filename="simul"):
# destination_dir: wher WolfCLI is located
# Return a handler on which one can .wait()
import subprocess
import shlex
wcp = destination_dir / "wolfcli.exe"
if os.path.exists(str(wcp)):
cmd = str(wcp) + f" run_wolf2d_prev genfile={generic_filename}"
logging.info(
f"Running wolf cli in {destination_dir}, as {' '.join(shlex.split(cmd, posix=False))}"
)
return subprocess.Popen(
shlex.split(cmd, posix=False),
cwd=destination_dir,
stdout=subprocess.DEVNULL,
)
else:
raise Exception(f"Can't find {wcp}")
[docs]
def wait_for_wolf_cli_to_end(wolf_cli_process, destination_dir):
assert wolf_cli_process is not None
assert destination_dir is not None
import tqdm
import sys
from time import sleep
logging.info(f"Waiting completion of WolfCLI")
REG = re.compile(r"\*s\*3 ITERATION : +([0-9]+)")
REG_CPU = re.compile(r"\*s\*1 TEMPS TOTAL CPU : +([0-9.]+)")
last_wolfcli_iter = None
model: prev_sim2D = prev_sim2D(
dir=str(destination_dir), splash=False, in_gui=False
)
# sys.stdout trick, see : https://stackoverflow.com/questions/45862715/how-to-flush-tqdm-progress-bar-explicitly
progress_bar = tqdm.tqdm(range(model.parameters._nb_timesteps), leave=False, file=sys.stdout)
logging.info(f"Watching {Path(destination_dir)/'simul.NFO'}")
while wolf_cli_process.poll() is None:
try:
with open(Path(destination_dir) / "simul.NFO", "rb") as file:
file.seek(-1024, os.SEEK_END)
s = file.read().decode()
match = REG.findall(s)
if match:
i = match[-1]
progress_bar.update(int(i) - progress_bar.n)
except Exception as ex:
logging.error(ex)
pass
sleep(1)
sys.stdout.flush()
[docs]
def load_wolf_time_steps(base_file: Path):
TPS_CUMUL_RE = re.compile(r" \*s\*[0-9]+ TEMPS CUMULE : +([0-9.E-]+)")
ts = []
with open(base_file.with_suffix(".NFO")) as fin:
for line in fin.readlines():
m = TPS_CUMUL_RE.match(line)
if m:
ts.append(float(m.groups()[0]))
return ts
[docs]
def compute_min_step_size(h, qx, qy, courant, dx=1, dy=1):
u = qx / h
v = qy / h
speed = np.sqrt(u * u + v * v)
speed[h == 0] = 0
a = speed + np.sqrt(9.81 * h) # The g from wolfGPU (for comparisons)
b = np.ones_like(h) * 0.01
celerity = np.maximum(a, b) # avoid div by zero ?
d = courant * min(dx, dy) / celerity
time_step = np.min(np.minimum(d, 1))
return time_step, d
[docs]
def compute_froude(h, qx, qy):
u = qx / h
v = qy / h
speed = np.sqrt(u * u + v * v)
froude = speed / np.sqrt(9.81 * h)
return froude
[docs]
def set_report_frequency(m: prev_sim2D, freq: Union[str, int]):
if isinstance(freq, str) and "s" in freq:
# Report every freq seconds (simulation time)
m.parameters._writing_frequency = int(freq.replace("s", "").strip())
m.parameters._writing_mode = 1
else:
# Report every freq iteration
m.parameters._writing_frequency = int(freq)
m.parameters._writing_mode = 0
[docs]
def write_simulation_multiblock(wolf2d: prev_sim2D, wolfcli_path: Path):
# shutil.rmtree(destination_path)
# Python cannot delete exe files after having shutil.copy-ed them.
if Path(wolf2d.mydir).exists():
[
f.unlink()
for f in Path(wolf2d.mydir).iterdir()
if f.is_file() and f.suffix != ".exe"
]
else:
os.makedirs(wolf2d.mydir, exist_ok=True)
wcp = Path(wolf2d.mydir) / "wolfcli.exe"
if not wcp.exists():
shutil.copy(
Path(wolfcli_path) / "wolfcli.exe", Path(wolf2d.mydir) / "wolfcli.exe"
)
# shutil.copy(Path(source_path) / "simul.lst", Path(destination_path) / (generic_name+".lst"))
# # Legacy parameters indicating that we work with
# # several topo/qx/... matrices (was used to cut big
# # simulation domains in smaller pieces)
# with open(Path(wolf2d.mydir) / (wolf2d.filenamegen + ".lst"), "w") as fout:
# # Maybe we need as many zeros as they are blocks ?
# # PA notes this is a bug in current Fortran code.
# fout.write("0\n")
wolf2d.save()
# wolf2d.napbin.suxsuy_contour(filename=wolf2d.mydir)
# generic_path = Path(wolf2d.mydir)
# write_wolf_array(wolf2d.napbin, generic_path)
# write_wolf_array(wolf2d.top, generic_path)
# write_wolf_array(wolf2d.hbin, generic_path)
# write_wolf_array(wolf2d.frot, generic_path)
# write_wolf_array(wolf2d.qxbin, generic_path)
# write_wolf_array(wolf2d.qybin, generic_path)
# write_wolf_array(wolf2d.hbinb, generic_path)
# write_wolf_array(wolf2d.qxbinb, generic_path)
# write_wolf_array(wolf2d.qybinb, generic_path)
# write_wolf_array(wolf2d.topini, generic_path)
# write_wolf_array(wolf2d.mymnap, generic_path)
# wolf2d.myblocfile.filename = generic_path.with_suffix(
# Path(wolf2d.myblocfile.filename).suffix
# )
# wolf2d.myblocfile.write_file()
# write_trl(wolf2d.myparam, generic_path)
# wolf2d.myparam.write_file(fn=str(generic_path))
# # FIXME Why do I need that ?
# with open(Path(wolf2d.mydir) / (wolf2d.filenamegen + ".par"), "a") as fout:
# fout.write('1\n""\n')
# # This file is used as a beacon to find the generic
# # name of the simulation. Normally it is filled with
# # parameters but those are not used by the wolfcli and WolfPython
# # applications. They are used by the VBA though.
# with open(Path(wolf2d.mydir) / wolf2d.filenamegen, "w") as fout:
# fout.write(
# f"""1,.1,1,0,0
# 0,0,0,0,{wolf2d.myparam.impfbxgen},0,0,0
# 1,1,.5,.25,1,2
# 0,20,0,0
# .5,1
# {wolf2d.myparam.nxfin},{wolf2d.myparam.nyfin},{wolf2d.myparam.clf.origx},{wolf2d.myparam.clf.origy}
# 0,0
# 0,0,0
# 1
# 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
# """
# )