"""
Author: HECE - University of Liege, Pierre Archambeau
Date: 2024
Copyright (c) 2024 University of Liege. All rights reserved.
This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""
import wx
from wx.dataview import TreeListCtrl, TreeListItem
from wx import dataview, StaticText, TextCtrl
from os.path import exists,join,splitext,dirname
from os import scandir, chdir, getcwd
import numpy as np
import subprocess
import logging
from enum import Enum
import numpy as np
from collections import namedtuple
from pathlib import Path
from osgeo import gdal
from typing import Union, Literal
import json
import pandas as pd
from datetime import datetime as dt
import matplotlib.pyplot as plt
import types
from ..PyTranslate import _
from ..wolfresults_2D import Wolfresults_2D
from ..wolf_array import WolfArray, header_wolf, WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_SINGLE
from .check_scenario import check_file_update, check_file_bc, import_files
from .update_void import create_new_file as update_void
from .imposebc_void import create_new_file as bc_void
from ..wolf_vrt import create_vrt_from_files_first_based, translate_vrt2tif
from ..PyDraw import WolfMapViewer, draw_type
from ..PyHydrographs import Hydrograph
from .update_void import Update_Sim
from ..Results2DGPU import wolfres2DGPU
from ..PyParams import Wolf_Param
from ..PyVertexvectors import Zones, zone, vector, wolfvertex, getIfromRGB, getRGBfromI
# WOLFGPU
try:
from wolfgpu.simple_simulation import SimpleSimulation, SimulationDuration, SimulationDurationType
from wolfgpu.results_store import ResultsStore
except:
logging.error(_('WOLFGPU not installed !'))
# *****************
# ACCEPTED PREFIX
# *****************
# bath_*.tif
# mann_*.tif
# infil_*.tif
[docs]
ACCEPTED_PREFIX = ['bath_', 'mann_', 'infil_', 'roof_']
[docs]
def delete_folder(pth:Path):
for sub in pth.iterdir():
if sub.is_dir():
delete_folder(sub)
else:
sub.unlink()
pth.rmdir() # if you just want to delete the dir content but not the dir itself, remove this line
# extension des fichiers à vérifier
[docs]
class GPU_2D_file_extensions(Enum):
[docs]
TIFF = '.tiff' # raster
[docs]
PY = '.py' # python script
[docs]
NPY = '.npy' # numpy array
[docs]
BIN = '.bin' # WOLF binary file
[docs]
JSON = '.json' # json file
[docs]
TXT = '.txt' # hydrographs
[docs]
class IC_scenario(Enum):
[docs]
BATHYMETRY = 'bathymetry.npy'
[docs]
ALL_EXTENSIONS = [cur.value for cur in GPU_2D_file_extensions]
# Predefined keys
[docs]
WOLF_UPDATE = 'Wolf update'
[docs]
WOLF_BC = 'Wolf boundary conditions'
[docs]
OTHER_SCRIPTS = 'Other scripts'
[docs]
IS_SCENARIO = 'is_scenario'
[docs]
IS_RESULTS = 'is_results'
[docs]
HAS_RESULTS = 'has_results'
[docs]
DISCHARGES = 'discharges'
[docs]
INITIAL_CONDITIONS = 'initial_conditions'
# Définition d'un namedtuple pour représenter les fichiers d'une simulation GPU
[docs]
_gpu_file = namedtuple('gpufile', ['name', 'type', 'extension'])
# Liste des fichiers à vérifier
[docs]
class GPU_2D_file(Enum):
[docs]
PARAMETERS = _gpu_file('parameters', str , GPU_2D_file_extensions.JSON.value)
[docs]
BATHYMETRY = _gpu_file('bathymetry', np.float32, GPU_2D_file_extensions.NPY.value)
[docs]
WATER_DEPTH = _gpu_file('h' , np.float32, GPU_2D_file_extensions.NPY.value)
[docs]
DISCHARGE_X = _gpu_file('qx' , np.float32, GPU_2D_file_extensions.NPY.value)
[docs]
DISCHARGE_Y = _gpu_file('qy' , np.float32, GPU_2D_file_extensions.NPY.value)
[docs]
MANNING = _gpu_file('manning' , np.float32, GPU_2D_file_extensions.NPY.value)
[docs]
COMPUTATION_MASK = _gpu_file('nap' , np.uint8 , GPU_2D_file_extensions.NPY.value)
[docs]
INFILTRATION = _gpu_file('infiltration_zones', np.int32, GPU_2D_file_extensions.NPY.value)
# répertoire de sortie des simulations GPU
[docs]
RESULT_DIR = 'simul_gpu_results'
[docs]
class Hydrograph_scenario():
""" Hydrograph for a scenario """
def __init__(self, fname:Path, sep:str = '\t', decimal='.') -> None:
self._data = pd.read_csv(fname, sep=sep, decimal=decimal, header=0, index_col=0)
self._filename = fname
self._name = str(fname.with_suffix('').name)
@property
[docs]
def name(self) -> str:
return self._name
@property
[docs]
def data(self) -> pd.DataFrame:
return self._data
[docs]
def plot(self, figax = None):
if figax is None:
fig, ax = plt.subplots()
else:
fig,ax = figax
self._data.plot(ax=ax, drawstyle="steps-post")
return fig,ax
[docs]
class InitialConditions_scenario():
""" Initial conditions for a scenario """
def __init__(self, dir:Path) -> None:
self.h: np.ndarray = None
self.qx: np.ndarray = None
self.qy: np.ndarray = None
self.bathy: np.ndarray = None
if (dir / IC_scenario.WATERDEPTH.value).exists():
self.h = np.load(dir / IC_scenario.WATERDEPTH.value)
else:
logging.warning(_('No waterdepth file found !'))
if (dir / IC_scenario.DISCHARGE_X.value).exists():
self.qx = np.load(dir / IC_scenario.DISCHARGE_X.value)
else:
logging.warning(_('No discharge x file found !'))
if (dir / IC_scenario.DISCHARGE_Y.value).exists():
self.qy = np.load(dir / IC_scenario.DISCHARGE_Y.value)
else:
logging.warning(_('No discharge y file found !'))
if (dir /IC_scenario.BATHYMETRY.value).exists():
self.bathy = np.load(dir / IC_scenario.BATHYMETRY.value)
else:
logging.warning(_('No bathymetry file found !'))
@property
[docs]
def z_elevation(self) -> np.ndarray:
""" Return the elevation of the water surface """
return self.bathy + self.h
[docs]
def set_h_from_z(self, z: np.ndarray):
""" Set the water depth from the elevation of the water surface """
assert z.shape == self.bathy.shape, _('Bad shape for z !')
self.h = z - self.bathy
self.h[self.h < 0.] = 0.
self.qx[self.h < 0.] = 0.
self.qy[self.h < 0.] = 0.
[docs]
def save(self, dir:Path):
""" Save the initial conditions """
if self.h is not None:
np.save(dir / IC_scenario.WATERDEPTH.value, self.h)
if self.qx is not None:
np.save(dir / IC_scenario.DISCHARGE_X.value, self.qx)
if self.qy is not None:
np.save(dir / IC_scenario.DISCHARGE_Y.value, self.qy)
if self.bathy is not None:
np.save(dir / IC_scenario.BATHYMETRY.value, self.bathy)
[docs]
class Config_Manager_2D_GPU:
"""
Gestionnaire de configurations 2D - code GPU
"""
def __init__(self, workingdir:str = '', mapviewer:WolfMapViewer = None, python_venv:Path = None) -> None:
"""
Recherche de toutes les modélisation dans un répertoire et ses sous-répertoires
"""
self.wx_exists = wx.App.Get() is not None
self.workingdir:Path = None
self.wolfgpu:Path = None
self._py_env:Path = python_venv
if workingdir == '':
if self.wx_exists:
dlg = wx.DirDialog(None,_('Choose directory to scan'), style = wx.FD_OPEN)
ret = dlg.ShowModal()
if ret != wx.ID_OK:
dlg.Destroy()
return
workingdir = dlg.GetPath()
dlg.Destroy()
else:
logging.error(_('No working directory provided !'))
return
if not exists(workingdir):
logging.error(_('Directory does not exist !'))
return
self.find_wolfgpu()
self.workingdir = Path(workingdir)
self.mapviewer = mapviewer
self._txtctrl = None
self._ui = None
self._epsilon = 0.01
self._filter_independent = True
self.load_data()
[docs]
def find_wolfgpu(self):
""" Find the wolfgpu Path from wolfgpu package"""
import importlib.util
import sys
if self._py_env is None:
self._py_env = Path(sys.executable).parent
# Find wolfgpu.exe in script directory
candidate = self._py_env / 'wolfgpu.exe'
if candidate.exists():
self.wolfgpu = candidate
return
candidate = self._py_env / 'Scripts' / 'wolfgpu.exe'
if candidate.exists():
self.wolfgpu = candidate
return
else:
logging.error(_('WOLFGPU not found !'))
self.wolfgpu = None
self._py_env = None
[docs]
def _test_ui(self):
""" Test if the UI is available """
if self._ui is not None:
if self._ui._frame.IsShown():
return True
return False
[docs]
def load_data(self):
""" Chargement/Rechargement des données """
self.configs = {}
self.scan_wdir()
self.find_files()
if self._ui is not None:
# la fenêtre est déjà ouverte
self._ui.refill_data(self.configs)
else:
if self.wx_exists:
self._ui = UI_Manager_2D_GPU(self.configs, parent=self)
@property
[docs]
def epsilon(self):
return self._epsilon
@epsilon.setter
def epsilon(self, val:float):
self._epsilon = val
@property
[docs]
def filter_independent(self):
return self._filter_independent
@filter_independent.setter
def filter_independent(self, val:bool):
self._filter_independent = val
@property
[docs]
def txtctrl(self):
if self._ui is None:
return None
return self._ui._txtctrl
# Scanning directories
# --------------------
[docs]
def _scan_dir(self, wd:Path, curdict:dict):
""" Scan récursif d'un répertoire de base et création de sous-dictionnaires """
for curel in scandir(wd):
if curel.is_dir():
newel = curdict[Path(curel)]={}
self._scan_dir(curel, newel)
[docs]
def scan_wdir(self):
"""
Récupération de tous les répertoires et sous-répertoires
et placement dans le dictionnaire self.configs
"""
if self.workingdir.name =='':
logging.warning(_('Nothing to do !'))
return
self._scan_dir(self.workingdir, self.configs)
# Get properties
# --------------
[docs]
def get_all_numpy(self) -> list[Path]:
""" Get all numpy files """
all_numpy = []
for key, curdict in self._flat_configs:
if GPU_2D_file_extensions.NPY.value in curdict.keys():
if len(curdict[GPU_2D_file_extensions.NPY.value])>0:
if not curdict[IS_RESULTS]:
all_numpy += curdict[GPU_2D_file_extensions.NPY.value]
else:
logging.warning(_('Numpy files in simulation directory -- Ignored !'))
return all_numpy
[docs]
def get_all_tif(self) -> list[Path]:
""" Get all tif files """
all_tif = []
for key, curdict in self._flat_configs:
if GPU_2D_file_extensions.TIF.value in curdict.keys():
if len(curdict[GPU_2D_file_extensions.TIF.value])>0:
all_tif += curdict[GPU_2D_file_extensions.TIF.value]
return all_tif
[docs]
def get_all_sims(self) -> list[Path]:
""" Get all simulation files """
all_sims = []
for key, curdict in self._flat_configs:
if IS_SIMUL in curdict.keys():
if curdict[IS_SIMUL]:
all_sims.append(curdict['path'])
return all_sims
[docs]
def _flatten_configs(self) -> list[dict]:
""" flatten configs """
def _flatten(curdict:dict, ret:list):
""" flatten a dict """
for key, val in curdict.items():
if isinstance(val, dict):
ret.append((key, val))
_flatten(val, ret)
self._flat_configs = []
_flatten(self.configs, self._flat_configs)
[docs]
def check_prefix(self, list_tif:list[Path]) -> str:
""" Check if all files have the right prefix """
logging.info(_('Checking if prefix of all files are right...\n'))
logging.info(_('Number of tif files : {}'.format(len(list_tif))))
standard_files = ['bathymetry.tif', 'manning.tif', 'infiltration.tif', 'h.tif', 'qx.tif', 'qy.tif', 'roof.tif']
log = ''
for curtif in list_tif:
if curtif.name.lower() in standard_files:
# No need to test the prefix
break
# test if the prefix is in the accepted prefix
if not any([curtif.name.lower().startswith(curprefix) for curprefix in ACCEPTED_PREFIX]):
loclog = _('Bad prefix for {} !'.format(curtif.name)) + '\n'
tests = ['man_', 'mnn_', 'ann_', 'mamn_', 'mannn_']
for test in tests:
if curtif.name.lower().startswith(test):
loclog += _('Did you mean "mann_" ?') + '\n'
break
tests = ['bath_', 'bth_', 'ath_', 'bat_', 'bathymetry_']
for test in tests:
if curtif.name.lower().startswith(test):
loclog += _('Did you mean "bath_" ?') + '\n'
break
tests = ['infil_', 'infl_', 'nfil_', 'ifil_', 'infiltration_', 'infli_']
for test in tests:
if curtif.name.lower().startswith(test):
loclog += _('Did you mean "infil_" ?') + '\n'
break
tests = ['rof_', 'rooof_', 'rofo_', 'oof_', 'roff_']
for test in tests:
if curtif.name.lower().startswith(test):
loclog += _('Did you mean "roof_" ?') + '\n'
break
logging.warning(loclog)
log += loclog
return log
[docs]
def check_consistency(self) -> str:
"""
Check consistency of all files
All numpy files must have the same shape as the tif file in the root directory
All hydrographs must have the same number of columns
"""
logging.info(_('Checking consistency of all files except simulation results...\n'))
logging.info(_('NPY files...'))
numpyfiles = self.get_all_numpy()
logging.info(_('Number of numpy files : {}'.format(len(numpyfiles))))
log = ''
for curnpy in numpyfiles:
# test if the shape of the numpy file is the same as the tif file
# using memmap to avoid loading the whole array in memory -> faster
arr = np.lib.format.open_memmap(curnpy, mode='r')
if arr.shape != (self._header.nbx, self._header.nby):
loclog = _('Bad shape for {} !'.format(curnpy))
log += loclog + '\n'
logging.warning(loclog)
del(arr)
logging.info(_('Hydrographs...'))
hydro = self.get_hydrographs()
nb = hydro[0].data.shape[1]
logging.info(_('Number of hydrographs : {}'.format(len(hydro))))
for curhydro in hydro:
if curhydro.data.shape[1] != nb:
loclog = _('Bad number of columns for {} !'.format(curhydro[0]._filename))
log += loclog + '\n'
logging.warning(loclog)
return log
# Analyze files
# -------------
[docs]
def _find_files_subdirs(self, wd:Path, curdict:dict, erase_cache:bool = True):
""" Recherche des fichiers de simulation/scenario dans un répertoire """
# create list for all extensions
self._prefill_dict(curdict)
curdict['path'] = wd
# scan each file
for curel in scandir(wd):
if curel.is_file():
if not curel.name.startswith('__'):
if curel.name.startswith('cache_'):
if erase_cache:
# Delete cache file if exists. Normally, it should not exist as it is created by WOLF as temporary file.
# It will be recreated by WOLF if necessary and destroyed at the end of the process.
# The presence of this file is a sign that the simulation creation process was not completed.
Path(curel).unlink()
continue
parts=splitext(curel)
if len(parts)==2:
ext = parts[1]
if ext in ALL_EXTENSIONS:
locpath = Path(curel.path)
if ext == GPU_2D_file_extensions.PY.value:
# test if it is a WOLF update file
if check_file_update(locpath):
curdict[ext][WOLF_UPDATE].append(locpath)
elif check_file_bc(locpath):
curdict[ext][WOLF_BC].append(locpath)
else:
curdict[ext][OTHER_SCRIPTS].append(locpath)
else:
# ajout du fichier dans la liste
curdict[ext].append(locpath)
else:
pass
elif curel.is_dir():
if curel.name.startswith('__'):
pass
elif curel.name == RESULT_DIR:
curdict[HAS_RESULTS] = True
else:
curdict[SUBDIRS].append(Path(curel.name))
# test if it is a simulation
self._test_is_simul(curdict)
# test if it is a scenario
self._test_is_scenario(curdict)
# test if it is a results directory
self._test_is_results(curdict)
[docs]
def _recursive_find_files(self, wd:Path, curdict:dict, erase_cache:bool = True):
""" Recherche récursive des fichiers de simulation/scenario dans les
répertoires dont la structure a été traduite en dictionnaire """
if len(curdict.keys())>0:
for k in curdict.keys():
self._recursive_find_files(k, curdict[k])
if not '__' in wd.name:
self._find_files_subdirs(wd, curdict, erase_cache)
[docs]
def _prefill_dict(self, curdict:dict):
""" Création des listes pour toutes les extensions """
for ext in ALL_EXTENSIONS:
if ext == GPU_2D_file_extensions.PY.value:
curdict[ext] = {}
curdict[ext][WOLF_UPDATE] = []
curdict[ext][WOLF_BC] = []
curdict[ext][OTHER_SCRIPTS] = []
else:
curdict[ext] = []
curdict[IS_SIMUL] = False
curdict[IS_SCENARIO] = False
curdict[IS_RESULTS] = False
curdict[HAS_RESULTS] = False
curdict[MISSING] = []
curdict[SUBDIRS] = []
curdict['path'] = None
[docs]
def _test_is_simul(self, curdict:dict):
""" Teste si le répertoire contient des fichiers de simulation """
ok = True
# présence du fichier de paramètres
if GPU_2D_file_extensions.JSON.value in curdict.keys():
ok &= (GPU_2D_file.PARAMETERS.value.name+GPU_2D_file.PARAMETERS.value.extension).lower() in [cur.name.lower() for cur in curdict[GPU_2D_file_extensions.JSON.value]]
if ok:
json_data = json.load(open(curdict['path'] / (GPU_2D_file.PARAMETERS.value.name+GPU_2D_file.PARAMETERS.value.extension), 'r'))
else:
curdict['missing'].append(GPU_2D_file_extensions.JSON.value)
ok = False
if ok:
# Présence des fichiers de calcul
if GPU_2D_file_extensions.NPY.value in curdict.keys():
path_bath = curdict['path'] / Path(json_data['maps']['bathymetry'])
ok &= path_bath.exists()
path_mann = curdict['path'] / Path(json_data['maps']['manning'])
ok &= path_mann.exists()
path_nap = curdict['path'] / Path(json_data['maps']['NAP'])
ok &= path_nap.exists()
path_h = curdict['path'] / Path(json_data['maps']['h'])
ok &= path_h.exists()
path_qx = curdict['path'] / Path(json_data['maps']['qx'])
ok &= path_qx.exists()
path_qy = curdict['path'] / Path(json_data['maps']['qy'])
ok &= path_qy.exists()
if 'infiltration_zones' in json_data['maps'].keys():
path_infil = curdict['path'] / Path(json_data['maps']['infiltration_zones'])
ok &= path_infil.exists()
if not ok:
curdict['missing'].append(GPU_2D_file_extensions.NPY.value)
# for curfile in GPU_2D_file:
# if curfile.value.extension == GPU_2D_file_extensions.NPY.value:
# ok &= (curfile.value.name + curfile.value.extension).lower() in [cur.name.lower() for cur in curdict[GPU_2D_file_extensions.NPY.value]]
else:
curdict['missing'].append(GPU_2D_file_extensions.NPY.value)
ok = False
else:
pass
curdict[IS_SIMUL] = ok
[docs]
def _test_is_scenario(self, curdict:dict):
""" Teste si le répertoire contient des fichiers de scénario """
ok = False
ok |= len(curdict[GPU_2D_file_extensions.TIF.value])>0
ok |= len(curdict[GPU_2D_file_extensions.TIFF.value])>0
for sublist in curdict[GPU_2D_file_extensions.PY.value].values():
ok |= len(sublist)>0
curdict[IS_SCENARIO] = ok
[docs]
def _test_is_results(self, curdict:dict):
""" Teste si le répertoire contient des fichiers de résultats """
ok = False
# test du fichier 'metadata.json'
if GPU_2D_file_extensions.JSON.value in curdict.keys():
for curfile in curdict[GPU_2D_file_extensions.JSON.value]:
ok = curfile.name.lower() == 'metadata.json'
if ok:
break
curdict[IS_RESULTS] = ok
[docs]
def find_files(self):
"""
Recehrche des fichiers de simulation/scenario dans les répertoires dont la structure a été traduite en dictionnaire
"""
if self.workingdir.name =='':
logging.warning(_('Nothing to do !'))
return
self._recursive_find_files(self.workingdir, self.configs)
self._flatten_configs()
# initialisation du header
self._header = self.get_header()
# Assembly
# --------
[docs]
def get_tree(self, from_path:Path) -> list[Path]:
"""
Get tree from a path
Fnd all directories from the current path to the working directory
"""
curtree = [from_path]
while str(from_path) != str(self.workingdir):
from_path = from_path.parent
curtree.insert(0, from_path)
return curtree
[docs]
def get_dicts(self, from_tree:list[Path]) -> list[dict]:
""" Get dicts from a tree """
curdict = [self.configs]
for curpath in from_tree[1:]:
curdict.append(curdict[-1][curpath])
return curdict
[docs]
def _select_tif_partname(self, curdict:dict, tifstr:Literal['bath_', 'mann_', 'infil_', 'roof_']):
""" Select tif files with a 'str' as name's prefix """
assert tifstr in ACCEPTED_PREFIX, _('Bad prefix !')
if tifstr == 'bath_':
forced_add = ['bathymetry.tif']
elif tifstr == 'mann_':
forced_add = ['manning.tif']
elif tifstr == 'infil_':
forced_add = ['infiltration.tif']
elif tifstr == 'roof_':
forced_add = ['roof.tif']
tif_list = [curtif for curtif in curdict[GPU_2D_file_extensions.TIF.value] \
if curtif.name.lower().startswith(tifstr) or \
curtif.name.lower() in forced_add]
tif_list += [curtif for curtif in curdict[GPU_2D_file_extensions.TIFF.value] \
if curtif.name.lower().startswith(tifstr) or \
curtif.name.lower() in forced_add]
return tif_list
[docs]
def check_nodata(self, from_path:Path):
""" Check nodata in a path """
curtree = self.get_tree(from_path)
curdicts = self.get_dicts(curtree)
# tous les fichiers tif -> list of lists
all_tif_bath = [self._select_tif_partname(curdict, 'bath_') for curdict in curdicts]
all_tif_mann = [self._select_tif_partname(curdict, 'mann_') for curdict in curdicts]
all_tif_infil = [self._select_tif_partname(curdict, 'infil_') for curdict in curdicts]
all_tif_roof = [self._select_tif_partname(curdict, 'roof_') for curdict in curdicts]
# flatten list of lists
all_tif_bath = [curel for curlist in all_tif_bath if len(curlist)>0 for curel in curlist]
all_tif_mann = [curel for curlist in all_tif_mann if len(curlist)>0 for curel in curlist]
all_tif_infil = [curel for curlist in all_tif_infil if len(curlist)>0 for curel in curlist]
all_tif_roof = [curel for curlist in all_tif_roof if len(curlist)>0 for curel in curlist]
for cur_lst in [all_tif_bath, all_tif_mann, all_tif_infil, all_tif_roof]:
for cur_tif in cur_lst:
curarray:WolfArray = WolfArray(cur_tif)
if curarray.nullvalue != 99999.:
curarray.nullvalue = 99999.
curarray.set_nullvalue_in_mask()
curarray.write_all()
logging.warning(_('Bad nodata value for {} !'.format(cur_tif.name)))
[docs]
def create_vrt(self, from_path:Path):
""" Create a vrt file from a path """
logging.info(_('Checking virtual files...'))
logging.info(_('Checking nodata values...'))
self.check_nodata(from_path)
curtree = self.get_tree(from_path)
curdicts = self.get_dicts(curtree)
# tous les fichiers tif -> list of lists
all_tif_bath = [self._select_tif_partname(curdict, 'bath_') for curdict in curdicts]
all_tif_mann = [self._select_tif_partname(curdict, 'mann_') for curdict in curdicts]
all_tif_infil = [self._select_tif_partname(curdict, 'infil_') for curdict in curdicts]
all_tif_roof = [self._select_tif_partname(curdict, 'roof_') for curdict in curdicts]
# flatten list of lists
all_tif_bath = [curel for curlist in all_tif_bath if len(curlist)>0 for curel in curlist]
all_tif_mann = [curel for curlist in all_tif_mann if len(curlist)>0 for curel in curlist]
all_tif_infil = [curel for curlist in all_tif_infil if len(curlist)>0 for curel in curlist]
all_tif_roof = [curel for curlist in all_tif_roof if len(curlist)>0 for curel in curlist]
# création du fichier vrt
create_vrt_from_files_first_based(all_tif_bath, from_path / '__bath_assembly.vrt')
create_vrt_from_files_first_based(all_tif_mann, from_path / '__mann_assembly.vrt')
if len(all_tif_infil)>0:
create_vrt_from_files_first_based(all_tif_infil, from_path / '__infil_assembly.vrt')
else:
logging.info(_('No infiltration files found ! -> no __infil_assembly.vrt file created !'))
if len(all_tif_roof)>0:
create_vrt_from_files_first_based(all_tif_roof, from_path / '__roof_assembly.vrt')
else:
logging.info(_('No roof files found ! -> no __roof_assembly.vrt file created !'))
[docs]
def create_vec(self,
from_path:Path,
which:Literal['bath_', 'mann_', 'infil_', 'roof_'] = 'bath_') -> Zones:
""" Create a vec file from a path """
assert which in ACCEPTED_PREFIX, _('Bad prefix !')
curtree = self.get_tree(from_path)
curdicts = self.get_dicts(curtree)
# tous les fichiers tif -> list of lists
all_tif = [self._select_tif_partname(curdict, which) for curdict in curdicts]
# création du fichier vect
new_zones = Zones(idx = from_path.name, parent=self)
for cur_list in all_tif:
if len(cur_list)>0:
logging.info(_('Treating {} files...'.format(len(cur_list))))
for curtif in cur_list:
logging.info(_('Start : {} file...'.format(curtif.name)))
new_zone = zone(name = curtif.name, parent = new_zones)
new_zones.add_zone(new_zone)
curarray = WolfArray(curtif)
curarray.nullify_border(width=1) # Force a null value border --> necessary to avoid artefacts in the contour as no test is done on the border
sux, sux, curvect, interior = curarray.suxsuy_contour()
new_zone.add_vector(curvect, forceparent=True)
curvect.set_legend_to_centroid(curtif.name)
curvect.myprop.color = getIfromRGB((0, 0, 255))
curvect.myprop.width = 3
bounds = curarray.get_bounds()
bounds_vec = vector(name='bounds')
bounds_vec.add_vertex(wolfvertex(bounds[0][0], bounds[1][0]))
bounds_vec.add_vertex(wolfvertex(bounds[0][1], bounds[1][0]))
bounds_vec.add_vertex(wolfvertex(bounds[0][1], bounds[1][1]))
bounds_vec.add_vertex(wolfvertex(bounds[0][0], bounds[1][1]))
bounds_vec.close_force()
bounds_vec.myprop.color = getIfromRGB((255, 0, 0))
bounds_vec.myprop.width = 3
new_zone.add_vector(bounds_vec, forceparent=True)
logging.info(_('End : {} file...'.format(curtif.name)))
new_zones.find_minmax(update=True)
new_zones.saveas(str(from_path / (which +'_assembly.vecz')))
logging.info(_(f'End of {which}_assembly.vecz creation !'))
return new_zones
[docs]
def translate_vrt2tif(self, from_path:Path):
""" Translate vrt to tif """
vrtin = ['__bath_assembly.vrt', '__mann_assembly.vrt', '__infil_assembly.vrt', '__roof_assembly.vrt']
fout = ['__bathymetry.tif' , '__manning.tif', '__infiltration.tif', '__roof.tif']
for curin, curout in zip(vrtin, fout):
if (from_path / curin).exists():
translate_vrt2tif(from_path / curin, from_path / curout)
[docs]
def apply_scripts_bath_mann_inf_roof(self, from_path:Path):
""" Apply all scripts """
filenames = ['__bathymetry.tif', '__manning.tif', '__infiltration.tif', '__roof.tif']
# check if present on disk
if not all([(from_path / curfile).exists() for curfile in filenames]):
logging.error(_('At least one of the files is missing !'))
for curfile in filenames:
if not (from_path / curfile).exists():
logging.error(_(f'{curfile} is missing !'))
return
arrays = [WolfArray(from_path / curfile) for curfile in filenames]
self._apply_scripts_update_topo_maning_inf_roof(from_path, arrays[0], arrays[1], arrays[2], arrays[3])
# write the files
arrays[0].write_all(from_path / '__bathymetry_after_scripts.tif')
arrays[1].write_all(from_path / '__manning_after_scripts.tif')
arrays[2].write_all(from_path / '__infiltration_after_scripts.tif')
arrays[3].write_all(from_path / '__roof_after_scripts.tif')
[docs]
def _import_scripts(self, from_path:Path, which) -> list[types.ModuleType]:
""" List all modules in structure and import them.
As multiple files with a same name can be found in the structure,
a copy of the file is made in the same folder with a unique name
and then imported.
So, if a file is required in the script, the relative import can be used.
If we cache the file in an other folder, the relative import will not work.
After the import, the copied files are deleted.
"""
import shutil
assert isinstance(from_path, Path), _('Bad type for from_path !')
curtree = self.get_tree(from_path)
curdicts = self.get_dicts(curtree)
# tous les fichiers .py -> list of lists
all_py = [curpy for curdict in curdicts for curpy in curdict[GPU_2D_file_extensions.PY.value][which]]
# make a copy in a cache file in the same folder but with a unique name
to_import = []
for idx, cur_py in enumerate(all_py):
cur_py:Path
cur_dir = cur_py.parent
cached_name = 'cache_py_' + cur_py.stem + str(idx) + '.py'
shutil.copy(cur_py, cur_dir / cached_name)
to_import.append(cur_dir / cached_name)
imported_mod = import_files(to_import)
# #del all caches files
# for cur_py in to_import:
# cur_py.unlink()
return imported_mod
[docs]
def _import_scripts_topo_manning_inf_roof(self, from_path:Path) -> list[types.ModuleType]:
""" import all topo and manning scripts from a path """
return self._import_scripts(from_path, WOLF_UPDATE)
[docs]
def _apply_scripts_update_topo_maning_inf_roof(self,
modules:list[types.ModuleType] | Path | str,
array_bat:WolfArray,
array_mann:WolfArray,
array_inf:WolfArray,
array_roof:WolfArray):
""" Apply all scripts from a list of modules """
if isinstance(modules, str):
modules = Path(modules)
if isinstance(modules, Path):
modules = self._import_scripts_topo_manning_inf_roof(modules)
for curmod in modules:
instmod = curmod.Update_Sim_Scenario()
try:
if not hasattr(instmod, "update_topobathy"):
logging.info(_('No update_topobathy method found in the script {}!').format(curmod))
else:
instmod.update_topobathy(array_bat)
except Exception as e:
logging.error(_('An error occured during bathymetry script - {}!').format(e))
try:
if not hasattr(instmod, "update_manning"):
logging.info(_('No update_manning method found in the script {}!').format(curmod))
else:
instmod.update_manning(array_mann)
except Exception as e:
logging.error(_('An error occured during manning script - {}!').format(e))
try:
if not hasattr(instmod, "update_infiltration"):
logging.info(_('No update_infiltration method found in the script {}!').format(curmod))
else:
instmod.update_infiltration(array_inf)
except Exception as e:
logging.error(_('An error occured during infiltration script - {}!').format(e))
try:
if not hasattr(instmod, "update_roof"):
logging.info(_('No update_roof method found in the script {}!').format(curmod))
else:
instmod.update_roof(array_roof)
except Exception as e:
logging.error(_('An error occured during roof script - {}!').format(e))
[docs]
def _import_scripts_bc(self, from_path:Path) -> list[types.ModuleType]:
""" Import all BC's scripts from a path """
return self._import_scripts(from_path, WOLF_BC)
[docs]
def _apply_scripts_bc(self, modules:list[types.ModuleType] | Path | str, sim:SimpleSimulation):
""" Apply all scripts from a list of modules """
if isinstance(modules, str):
modules = Path(modules)
if isinstance(modules, Path):
modules = self._import_scripts_bc(modules)
for curmod in modules:
try:
curmod.Impose_BC_Scenario().impose_bc(sim)
except Exception as e:
logging.error(_('An error occured during BC script - {}!').format(e))
[docs]
def load_hydrograph(self, path:Path, toplot=True) -> tuple[Hydrograph_scenario, plt.Figure, plt.Axes]:
""" Load hydrograph from a path """
hydro = Hydrograph_scenario(path)
fig,ax = None, None
if toplot:
fig,ax = hydro.plot()
fig.show()
return hydro, fig, ax
[docs]
def load_ic(self, path:Path) -> InitialConditions_scenario:
""" Load initial conditions from a path """
path = Path(path)
low_keys = [Path(curkey).name.lower() for curkey in self.configs.keys()]
if INITIAL_CONDITIONS in low_keys:
return InitialConditions_scenario(self.workingdir / INITIAL_CONDITIONS / path.stem.replace('sim_', ''))
else:
return None
[docs]
def get_hydrographs(self) -> list[Hydrograph]:
""" Get all hydrographs"""
all_hydro = []
low_keys = [Path(curkey).name.lower() for curkey in self.configs.keys()]
if DISCHARGES in low_keys:
curkey = [curkey for curkey in self.configs.keys()][low_keys.index(DISCHARGES)]
list_hydro = self.configs[curkey][GPU_2D_file_extensions.TXT.value]
for curq in list_hydro:
all_hydro.append(self.load_hydrograph(curq, toplot=False)[0])
return all_hydro
[docs]
def get_initial_conditions(self) -> list[InitialConditions_scenario]:
""" Get all initial conditions """
low_keys = [Path(curkey).name.lower() for curkey in self.configs.keys()]
if INITIAL_CONDITIONS in low_keys:
return [self.load_ic(curpath) for curpath in self.configs.keys()[low_keys.index(INITIAL_CONDITIONS)]]
else:
return []
[docs]
def get_names_hydrographs(self) -> list[str]:
all_hydros = self.get_hydrographs()
names = [curhydro.name for curhydro in all_hydros]
return names
[docs]
def get_name_initial_conditions(self) -> list[str]:
low_keys = [Path(curkey).name.lower() for curkey in self.configs.keys()]
names = []
if INITIAL_CONDITIONS in low_keys:
dirdict = self.configs[list(self.configs.keys())[low_keys.index(INITIAL_CONDITIONS)]][SUBDIRS]
names = [curpath.name for curpath in dirdict]
return names
[docs]
def create_void_infil(self):
""" create void infiltration_zones file """
if (self.workingdir / 'bathymetry.tif').exists():
locheader = self.get_header()
infilzones = WolfArray(srcheader=locheader, whichtype= WOLF_ARRAY_FULL_INTEGER)
infilzones.array.data[:,:] = 0
infilzones.nullvalue = -1
infilzones.write_all(str(self.workingdir / 'infiltration.tif'))
if (self.workingdir / 'infiltration.tif').exists():
logging.info(_('infiltration.tif created and set to -1 ! -- Please edit it !'))
else:
logging.error(_("infiltration.tif not created ! -- Does 'bathymetry.tif' or any '.tif' file exist in the root directory ?"))
else:
logging.error(_("No 'bathymetry.tif' file found in the root directory !"))
[docs]
def create_void_roof(self):
""" create void roof file """
if (self.workingdir / 'bathymetry.tif').exists():
locheader = self.get_header()
roof = WolfArray(srcheader=locheader, whichtype= WOLF_ARRAY_FULL_SINGLE)
roof.array.data[:,:] = 99999.
roof.nullvalue = 99999.
roof.write_all(str(self.workingdir / 'roof.tif'))
if (self.workingdir / 'roof.tif').exists():
logging.info(_('roof.tif created and set to 99999. ! -- Please edit it !'))
else:
logging.error(_("roof.tif not created ! -- Does 'bathymetry.tif' or any '.tif' file exist in the root directory ?"))
else:
logging.error(_("No 'bathymetry.tif' file found in the root directory !"))
[docs]
def create_simulation(self,
dir:Path,
idx_hydros:list[int] = [-1],
delete_existing:bool = False,
preserve_ic:bool=False,
callback = None) -> list[Path]:
""" Create a simulation from different hydrographs """
if isinstance(dir, str):
dir = Path(dir)
# test if dir is in the tree
dirs_key = [key for key, curdict in self._flat_configs]
dirs_dict = [curdict for key, curdict in self._flat_configs]
if not dir in dirs_key:
logging.error(_('Directory {} not found ! - Aborting !'.format(dir)))
return
# dictionnaire associé au scénario du répertoire
scen_dict = dirs_dict[dirs_key.index(dir)]
# search for hydrographs
hydros = self.get_hydrographs()
names = self.get_names_hydrographs()
if idx_hydros == [-1]:
idx_hydros = list(range(len(hydros)))
maxhydro = max(idx_hydros)
minhydro = min(idx_hydros)
if maxhydro >= len(hydros):
logging.error(_('Index {} too high ! - Aborting !'.format(maxhydro)))
return
if minhydro < 0:
logging.error(_('Index {} too low ! - Aborting !'.format(minhydro)))
return
# select hydrographs
used_hydros = [hydros[curidx] for curidx in idx_hydros]
used_names = [names[curidx] for curidx in idx_hydros]
ic_available = self.get_name_initial_conditions()
used_ic = []
for curname in used_names:
if curname in ic_available:
used_ic.append(self.load_ic(curname))
else:
used_ic.append(None)
if len(used_hydros)==0:
logging.error(_('No hydrograph selected ! - Aborting !'))
return
# create subdirectories for each hydrograph
used_dirs = [Path(dir / ('simulations/sim_' + curname)) for curname in used_names]
for curdir in used_dirs:
if curdir.exists():
if delete_existing:
logging.info(_('Directory {} already exists ! -- Deleting it !'.format(curdir)))
try:
delete_folder(curdir)
except:
logging.error(_('Directory {} not deleted !'.format(curdir)))
else:
logging.info(_('Directory {} already exists ! -- Using it'.format(curdir)))
else:
logging.info(_('Creating directory {} !'.format(curdir)))
curdir.mkdir(parents=True)
# Assembly of bathymetry, manning and infiltration if exists
self.create_vrt(dir)
self.translate_vrt2tif(dir)
quit = False
if not (dir / '__bathymetry.tif').exists():
logging.error(_('No __bathymetry.tif found !'))
quit = True
if not (dir / '__manning.tif').exists():
logging.error(_('No __manning.tif found !'))
quit = True
if quit:
logging.error(_('Bad assembly operation -- Simulation creation aborted !'))
return
bat = WolfArray(str(dir / '__bathymetry.tif'))
man = WolfArray(str(dir / '__manning.tif'))
# check for infiltration
if exists(dir / '__infiltration.tif'):
infiltration = WolfArray(str(dir / '__infiltration.tif'))
if infiltration.wolftype != WOLF_ARRAY_FULL_INTEGER:
logging.error(_('Infiltration .tif must be a full integer array ! -- The array will be ignored !'))
infiltration = WolfArray(srcheader=bat.get_header(), whichtype= WOLF_ARRAY_FULL_INTEGER)
infiltration.array.data[:,:] = 0
else:
infiltration = WolfArray(srcheader=bat.get_header(), whichtype= WOLF_ARRAY_FULL_INTEGER)
infiltration.array.data[:,:] = 0
# check for roof
if exists(dir / '__roof.tif'):
roof = WolfArray(str(dir / '__roof.tif'))
if roof.wolftype != WOLF_ARRAY_FULL_SINGLE:
logging.error(_('Roof .tif must be a full single array ! -- The array will be ignored !'))
roof = WolfArray(srcheader=bat.get_header(), whichtype= WOLF_ARRAY_FULL_SINGLE)
roof.array.data[:,:] = 99999.
else:
roof = WolfArray(srcheader=bat.get_header(), whichtype= WOLF_ARRAY_FULL_SINGLE)
roof.array.data[:,:] = 99999.
# applying Python scrpitps on ARRAYS
self._apply_scripts_update_topo_maning_inf_roof(dir, bat, man, infiltration, roof)
# save arrays on disk
bat.write_all(str(dir / '__bathymetry_after_scripts.tif'))
man.write_all(str(dir / '__manning_after_scripts.tif'))
infiltration.write_all(str(dir / '__infiltration_after_scripts.tif'))
roof.write_all(str(dir / '__roof_after_scripts.tif'))
# create simulation
allsims = []
for id_sim, (curdir, curhydro, curic) in enumerate(zip(used_dirs, used_hydros, used_ic)):
if callback is not None:
callback(id_sim)
# instanciation de la simulation
cursim = SimpleSimulation(self._header.nbx, self._header.nby)
# paramétrage spatial
cursim.param_dx = self._header.dx
cursim.param_dy = self._header.dy
cursim.param_base_coord_ll_x = self._header.origx
cursim.param_base_coord_ll_y = self._header.origy
# paramétrage hydraulique/numérique
cursim.param_courant = .4
cursim.param_runge_kutta = .5
# associating arrays to simulation
cursim.bathymetry = bat.array.data
cursim.manning = man.array.data
cursim.nap = np.zeros((self._header.nbx, self._header.nby), dtype=np.uint8)
cursim.nap[cursim.bathymetry != 99999.] = 1
if curic is None:
curic:InitialConditions_scenario
# No global initial conditions
if not preserve_ic:
# reset initial conditions
cursim.h = np.zeros((self._header.nbx, self._header.nby), dtype=np.float32)
cursim.qx = np.zeros((self._header.nbx, self._header.nby), dtype=np.float32)
cursim.qy = np.zeros((self._header.nbx, self._header.nby), dtype=np.float32)
else:
# Using initial conditions from disk - sim directory
if (curdir / 'h.npy').exists():
cursim.h = np.load(curdir / 'h.npy')
else:
cursim.h = np.zeros((self._header.nbx, self._header.nby), dtype=np.float32)
if (curdir / 'qx.npy').exists():
cursim.qx = np.load(curdir / 'qx.npy')
else:
cursim.qx = np.zeros((self._header.nbx, self._header.nby), dtype=np.float32)
if (curdir / 'qy.npy').exists():
cursim.qy = np.load(curdir / 'qy.npy')
else:
cursim.qy = np.zeros((self._header.nbx, self._header.nby), dtype=np.float32)
else:
if not preserve_ic:
# Using global initial conditions if available
if curic.h is not None:
cursim.h = curic.h
else:
cursim.h = np.zeros((self._header.nbx, self._header.nby), dtype=np.float32)
if curic.qx is not None:
cursim.qx = curic.qx
else:
cursim.qx = np.zeros((self._header.nbx, self._header.nby), dtype=np.float32)
if curic.qy is not None:
cursim.qy = curic.qy
else:
cursim.qy = np.zeros((self._header.nbx, self._header.nby), dtype=np.float32)
cursim.infiltration_zones = np.asarray(infiltration.array.data, dtype=np.int32)
if roof.nbnotnull == 0:
cursim.bridge_roof = None
logging.info(_("No cells defined as roof ! -- Roof will be ignored !"))
else:
cursim.bridge_roof = roof.array.data
logging.info(_("You have {} cells defined as roof").format(roof.nbnotnull))
# add hydrograph
for idx, curline in curhydro.data.iterrows():
cursim.add_infiltration(float(idx), [float(cur) for cur in curline.values])
if curhydro.data.index[-1] == 0:
cursim.param_duration = SimulationDuration(SimulationDurationType.SECONDS, float(86400))
cursim.add_infiltration(float(86400), [float(cur) for cur in curline.values])
elif curhydro.data.index[-1]==999999.:
cursim.param_duration = SimulationDuration(SimulationDurationType.SECONDS, float(86400))
else:
cursim.param_duration = SimulationDuration(SimulationDurationType.SECONDS, float(curhydro.data.index[-1]))
# check for infiltration zones vs hydrograph
hydro = cursim.infiltrations_chronology
nb_zones = len(hydro[0][1])
if infiltration.array.max() != nb_zones:
logging.error(_('You must have {} Infiltration zones but {} are defined!'.format(nb_zones, infiltration.array.max())))
return
# default reporting period
cursim._param_report_period = SimulationDuration.from_seconds(3600)
# applying Python scrpitps on SIMULATION --> Boundary conditions
self._apply_scripts_bc(dir, cursim)
# cursim.h[cursim.infiltration_zones > 0] = .5
# save simulation
cursim.save(curdir)
logging.info(cursim.check_errors())
logging.info(_('Simulation {} created !'.format(curdir)))
with open(curdir / 'quickrun.bat', 'w', encoding='utf-8') as f:
f.write("@echo off\n")
f.write("\n")
f.write(str(curdir.drive) + '\n')
f.write('cd {}\n'.format(str(curdir.parent)))
f.write("\n")
f.write("WOLFGPU_PARAMS=-quickrun " + str(curdir.name) + "\n")
f.write("\n")
f.write("where wolfgpu.exe\n")
f.write("IF %ERRORLEVEL%==0 (\n")
f.write("wolfgpu %WOLFGPU_PARAMS%\n")
f.write("goto :end\n")
f.write(")\n")
f.write("\n")
f.write("echo -------------------------------\n")
f.write("echo ERROR !!!\n")
f.write("echo -------------------------------\n")
f.write("echo I can't find wolfgpu.exe.\n")
f.write("echo It is normally installed in the 'Scripts' subdirectory of your python\n")
f.write("echo directory (or environment).\n")
f.write("echo This 'Scripts' subdirectory must be available on the PATH environment variable.\n")
f.write("echo I am now going to try to run wolfgpu as a regular python module\n")
f.write("echo -------------------------------\n")
f.write("pause\n")
f.write("python -m wolfgpu.cli %WOLFGPU_PARAMS%\n")
f.write(":end\n")
allsims.append(curdir / 'quickrun.bat')
logging.info(_('Simulation creation finished !'))
logging.warning(_('Do not forget to update/set the boundary conditions if not set by scripts !'))
return allsims
[docs]
def create_batch(self, path:Path, allsims:list[Path]) -> str:
""" Create a batch file """
if len(allsims) == 0:
return
batch = ''
batch += str(allsims[0].drive) + '\n'
for cursim in allsims:
cursim:Path
batch += 'cd {}\n'.format(str(cursim.parent))
batch += str(cursim.name) + '\n'
with open(path, 'w', encoding='utf-8') as f:
f.write(batch)
if self.wolfgpu is None:
logging.warning('****************************************************')
logging.warning(_('Wolfgpu.exe not found !'))
logging.warning(_('It is normally installed in the "Scripts" subdirectory of your python directory (or environment).'))
logging.warning(_('This "Scripts" subdirectory must be available on the PATH environment variable.'))
logging.warning('****************************************************')
else:
logging.info('****************************************************')
logging.info(_('Wolfgpu.exe found in {}!').format(self.wolfgpu))
logging.info(_('You can now run the simulations !'))
logging.info(_('Do not forget to activate your Python virtual environment if you are using one !'))
logging.info('****************************************************')
return batch
[docs]
def run_batch(self, batch:Path):
""" run a batch file in a subprocess """
if not batch.exists():
logging.error(_('Batch file {} does not exist !'.format(batch)))
return
if not batch.is_file():
logging.error(_('Batch file {} is not a file !'.format(batch)))
return
if batch.suffix != '.bat':
logging.error(_('Batch file {} is not a .bat file !'.format(batch)))
return
import subprocess
# Execute the batch file in a separate process
subprocess.Popen(str(batch), shell=True)
[docs]
def get_mapviewer(self):
""" Get the mapviewer object """
return self.mapviewer
[docs]
def transfer_ic(self, dir1: Path, dir2: Path):
""" Transfer IC from one sim to another """
ic1 = self.load_ic(dir1)
if ic1 is None:
logging.error(_('No IC found in {} !'.format(dir1)))
return
ic2 = self.load_ic(dir2)
if ic2 is None:
logging.error(_('No IC found in {} !'.format(dir2)))
return
ic2.qx = ic1.qx
ic2.qy = ic1.qy
ic2.set_h_from_z(ic1.z_elevation)
ic2.save(dir2)
[docs]
class UI_Manager_2D_GPU():
""" User Interface for scenario 2D GPU """
def __init__(self, data:dict, parent:Config_Manager_2D_GPU) -> None:
self._parent = parent
self._batch = None
self.create_UI()
# Fill tree with data
self._append_configs2tree(data, self._root)
self._txtctrl.Clear()
self._txtctrl.write(str(self._parent._header))
self._wp:dict[SimpleSimulation, Wolf_Param] = {}
[docs]
def refill_data(self, data:dict):
""" Fill tree with data """
# la fenêtre est déjà ouverte
self._treelist.DeleteAllItems()
self._txtctrl.SetBackgroundColour(wx.WHITE)
self._txtctrl.SetForegroundColour(wx.BLACK)
self._txtctrl.Clear()
# Fill tree with data
self._append_configs2tree(data, self._root)
[docs]
def create_UI(self):
"""
Création de l'interface graphique
Partie latérale gauche - arbre des simulations
Partie latérale droite - boutons d'actions
Partie inférieure - affichage des informations
"""
# frame creation
self._frame = wx.Frame(self._parent.get_mapviewer(), wx.ID_ANY, _('Scenario WOLF2D_GPU'), size=(800,800))
# sizers creation -- frame's structure
sizer_updown = wx.BoxSizer(wx.VERTICAL)
sizer_horizontal = wx.BoxSizer(wx.HORIZONTAL)
sizer_buttons = wx.BoxSizer(wx.VERTICAL)
sizer_txt_ckd = wx.BoxSizer(wx.HORIZONTAL)
# # Liste des chemins d'accès aux icônes
# icon_paths = []
# icon_path = Path(__file__).parent / '..\\icons'
# for curicon in scandir(icon_path):
# if curicon.is_file():
# icon_paths.append(Path(curicon))
# # Création de l'objet wx.ImageList
# image_list = wx.ImageList()
# # Chargement des icônes dans l'image list
# for path in icon_paths:
# icon = wx.Icon(str(path), wx.BITMAP_TYPE_PNG)
# image_list.Add(icon)
# tree creation
self._treelist = TreeListCtrl(self._frame,
style=dataview.TL_CHECKBOX|
wx.TR_FULL_ROW_HIGHLIGHT | wx.TR_HAS_BUTTONS)
# self._treelist.AssignImageList(image_list)
# tree actions
self._treelist.Bind(dataview.EVT_TREELIST_ITEM_CHECKED, self.OnCheckItem) # check/uncheck
self._treelist.Bind(dataview.EVT_TREELIST_ITEM_ACTIVATED, self.OnActivateTreeElem) # double click
# tree root
self._root = self._treelist.GetRootItem()
self._selected_item = None
self._treelist.AppendColumn(_('2D GPU Models'))
# multilines text control for information
self._txtctrl = TextCtrl(self._frame, style=wx.TE_MULTILINE|wx.TE_BESTWRAP|wx.TE_RICH)
# Action buttons
# --------------
self._reload = wx.Button(self._frame,label = _('Reload/Update structure'))
self._reload.Bind(wx.EVT_BUTTON,self.onupdate_structure)
self._reload.SetToolTip(_('reScan the directory and reload the entire structure'))
self._create_void_infil = wx.Button(self._frame,label = _('Create .tif infiltration zones'))
self._create_void_infil.Bind(wx.EVT_BUTTON,self.oncreate_void_infil)
self._create_void_infil.SetToolTip(_('Create a void infiltration zones file based on bathymetry.tif'))
self._create_void_roof = wx.Button(self._frame,label = _('Create .tif bridge/culvert roof elevation'))
self._create_void_roof.Bind(wx.EVT_BUTTON,self.oncreate_void_roof)
self._create_void_roof.SetToolTip(_('Create a void roof file based on bathymetry.tif'))
self._create_void_scripts = wx.Button(self._frame,label = _('Create void scripts'))
self._create_void_scripts.Bind(wx.EVT_BUTTON,self.oncreate_void_scripts)
self._create_void_scripts.SetToolTip(_('Create void script files for topography, manning and infiltration'))
self._create_vrt = wx.Button(self._frame,label = _('Assembly .vrt to current level'))
self._create_vrt.Bind(wx.EVT_BUTTON,self.oncreatevrt)
self._create_vrt.SetToolTip(_('Create a .vrt file from all bathymetry, manning, infiltration and roof .tif files\nBe sure that all files are right named !\n\n - bathymetry must contain "bath"\n - manning must contain "mann"\n - infiltration must contain "infil"\n - roof must contain "roof"'))
self._translate_vrt = wx.Button(self._frame,label = _('Translate .vrt to .tif'))
self._translate_vrt.Bind(wx.EVT_BUTTON,self.ontranslatevrt2tif)
self._translate_vrt.SetToolTip(_('Translate .vrt files to .tif files\n\n - __bath_assembly.vrt -> __bathymetry.tif\n - __mann_assembly.vrt -> __manning.tif\n - __infil_assembly.vrt -> __infiltration.tif\n - __roof_assembly.vrt -> __roof.tif'))
self._apply_scripts = wx.Button(self._frame,label = _('Apply scripts on bathymetry, manning, infiltration and roof elevation'))
self._apply_scripts.Bind(wx.EVT_BUTTON,self.onapply_scripts)
self._apply_scripts.SetToolTip(_('Apply scripts on bathymetry, manning and infiltration\n\n - bathymetry.tif\n - manning.tif\n - infiltration.tif\n - roof.tif\n\nThe scripts must be in the structure starting with parent directory and descending.'))
self._create_vec = wx.Button(self._frame,label = _('Search spatial coverage from current level'))
self._create_vec.Bind(wx.EVT_BUTTON,self.oncreatevec)
self._create_vec.SetToolTip(_('Create a .vecz file (with contour and global bounds) from all bathymetry and manning .tif files\nBe sure that all files are right named !\n\n - bathymetry must contain "bath"\n - manning must contain "mann"\n - infiltration must contain "infil"\n - roof must contain "roof"'))
self._check_prefix = wx.Button(self._frame,label = _('Check prefix (tif files)'))
self._check_prefix.Bind(wx.EVT_BUTTON,self.oncheck_prefix)
self._check_prefix.SetToolTip(_('Check prefix of .tif files\n\n - bath_*.tif\n - mann_*.tif\n - infil_*.tif\n - roof_*.tif\n\nThe prefix must be "bath_", "mann_", "infil_" and "roof_"'))
self._checkconsistency = wx.Button(self._frame,label = _('Check consistency'))
self._checkconsistency.Bind(wx.EVT_BUTTON,self.oncheck_consistency)
self._checkconsistency.SetToolTip(_('Check consistency of the scenario\n\n - bathymetry.tif\n - manning.tif\n - infiltration.tif\n - hydrographs\n - initial conditions\n - boundary conditions\n - scripts'))
self._createsim = wx.Button(self._frame,label = _('Create simulation(s)'))
self._createsim.Bind(wx.EVT_BUTTON,self.oncreate_simulation)
self._createsim.SetToolTip(_('Create simulation(s) from selected hydrographs'))
self._transfer_ic = wx.Button(self._frame,label = _('Transfer global initial conditions'))
self._transfer_ic.Bind(wx.EVT_BUTTON,self.ontransfer_ic)
self._transfer_ic.SetToolTip(_('Transfer global initial conditions from a simulation to another\n\nThe directory {} must exist !\n\nAnd subdirectories related to the discharges must also be present.\n\nIf not, firstly, extract global IC with ALT+DCLICK on simulations.'.format(INITIAL_CONDITIONS)))
self._extract_tif = wx.Button(self._frame,label = _('Extract .tif files for all selected scenarios'))
self._extract_tif.Bind(wx.EVT_BUTTON,self.onextract_tif)
self._extract_tif.SetToolTip(_('Extract .tif files for all selected scenarios'))
self._runbatch = wx.Button(self._frame,label = _('Run batch file !'))
self._runbatch.Bind(wx.EVT_BUTTON,self.onrun_batch)
self._runbatch.SetToolTip(_('Run the batch file on local machine\n\n - The batch file must be created before\n - The batch file must be a .bat file'))
self.listsims = wx.Button(self._frame,label = _('List simulation(s)'))
self.listsims.Bind(wx.EVT_BUTTON,self.onlist_simulation)
self.listsims.SetToolTip(_('List all simulations and print them in the text control'))
# Text control
# ------------
self.epsilon = wx.TextCtrl(self._frame, value='0.01', style=wx.TE_PROCESS_ENTER | wx.TE_CENTER)
self.epsilon.Bind(wx.EVT_TEXT_ENTER, self.onchange_epsilon)
self.epsilon.SetToolTip(_('Epsilon value\nValues below this threshold will be considered as 0.0\n\nPress Enter to validate'))
# checkbox control
# ----------------
self.filter = wx.CheckBox(self._frame, label=_('Filter independent zones'), style=wx.CHK_2STATE)
self.filter.SetValue(True)
self.filter.Bind(wx.EVT_CHECKBOX, self.onchange_filter)
self.filter.SetToolTip(_('Filter independent zones\nFirst a labelling will be applied\nThen only the most important zone is kept\nThe others are set to 0.0'))
sizer_txt_ckd.Add(self.filter, 1, wx.EXPAND)
sizer_txt_ckd.Add(self.epsilon, 1, wx.EXPAND)
# Positions
# ---------
# buttons -> sizer
sizer_buttons.Add(self._reload,1,wx.EXPAND)
sizer_buttons.Add(self._create_void_infil,1,wx.EXPAND)
sizer_buttons.Add(self._create_void_roof,1,wx.EXPAND)
sizer_buttons.Add(self._create_void_scripts,1,wx.EXPAND)
sizer_buttons.Add(self._check_prefix,1,wx.EXPAND)
sizer_buttons.Add(self._create_vrt,1,wx.EXPAND)
sizer_buttons.Add(self._translate_vrt,1,wx.EXPAND)
sizer_buttons.Add(self._apply_scripts,1,wx.EXPAND)
sizer_buttons.Add(self._checkconsistency,1,wx.EXPAND)
sizer_buttons.Add(self._create_vec,1,wx.EXPAND)
sizer_buttons.Add(self.listsims,1,wx.EXPAND)
sizer_buttons.Add(self._createsim,1,wx.EXPAND)
sizer_buttons.Add(self._runbatch,1,wx.EXPAND)
sizer_buttons.Add(self._transfer_ic,1,wx.EXPAND)
sizer_buttons.Add(self._extract_tif,1,wx.EXPAND)
sizer_buttons.Add(sizer_txt_ckd,1,wx.EXPAND)
# tree, buttons -> horizontal sizer
sizer_horizontal.Add(self._treelist,1,wx.EXPAND)
sizer_horizontal.Add(sizer_buttons,1,wx.EXPAND)
# txt_ctrl, (tree, buttons) -> updown sizer
sizer_updown.Add(sizer_horizontal,1,wx.EXPAND)
sizer_updown.Add(self._txtctrl,1,wx.EXPAND)
# link sizer to frame
self._frame.SetSizer(sizer_updown)
if self._parent.get_mapviewer() is not None:
self._frame.SetIcon(self._parent.get_mapviewer().GetIcon())
# Layout
self._frame.Layout()
# Set the position to the center of the screen
self._frame.Centre(wx.BOTH)
# Show
self._frame.Show()
[docs]
def onchange_epsilon(self, e:wx.KeyEvent):
""" Change epsilon value """
try:
epsilon = float(self.epsilon.GetValue())
self._parent.epsilon = epsilon
except:
logging.error(_('Epsilon value must be a float -- set 0.0 by default !'))
self._parent.epsilon = 0.
[docs]
def onchange_filter(self, e:wx.MouseEvent):
""" Change filter independent zones value """
self._parent.filter_independent = self.filter.GetValue()
[docs]
def reload(self):
""" Reload the structure """
self._parent.load_data()
[docs]
def onupdate_structure(self,e:wx.MouseEvent):
""" Mise à jour de la structure """
self.reload()
[docs]
def oncreate_void_infil(self, e:wx.MouseEvent):
""" Création d'un fichier d'infiltration vide """
self._parent.create_void_infil()
self.reload()
[docs]
def oncreate_void_roof(self, e:wx.MouseEvent):
""" Création d'un fichier de toit de pont vide """
self._parent.create_void_roof()
self.reload()
[docs]
def oncreate_void_scripts(self,e:wx.MouseEvent):
""" Création d'un script vide """
def_dir = str(self._parent.workingdir)
if isinstance(self._selected_item, Path):
def_dir = str(self._selected_item.parent)
dlg = wx.DirDialog(None, _('Choose a scenario directory'), style = wx.DD_DIR_MUST_EXIST, defaultPath=def_dir) #, wildcard = 'Python script (*.py)|*.py')
ret = dlg.ShowModal()
if ret != wx.ID_OK:
dlg.Destroy()
return
wdir = dlg.GetPath()
dlg.Destroy()
file_update = Path(wdir) / 'update_top_mann_scen.py'
if file_update.exists():
dlg = wx.MessageDialog(None, _('File {} already exists ! \n Overwrite ?'.format(file_update)), _('Warning'), wx.YES_NO)
ret = dlg.ShowModal()
if ret != wx.ID_YES:
dlg.Destroy()
dlg = wx.FileDialog(None, _('Choose a new file name'), style = wx.FD_SAVE, defaultDir=wdir, defaultFile='_'+str(file_update.name), wildcard = 'Python script (*.py)|*.py')
ret = dlg.ShowModal()
if ret != wx.ID_OK:
dlg.Destroy()
return
file_update = Path(dlg.GetPath())
dlg.Destroy()
update_void(file_update)
file_bc = Path(wdir) / 'impose_bc_scen.py'
if file_bc.exists():
dlg = wx.MessageDialog(None, _('File {} already exists ! \n Overwrite ?'.format(file_bc)), _('Warning'), wx.YES_NO)
ret = dlg.ShowModal()
if ret != wx.ID_YES:
dlg.Destroy()
dlg = wx.FileDialog(None, _('Choose a new file name'), style = wx.FD_SAVE, defaultDir=wdir, defaultFile='_'+str(file_bc.name), wildcard = 'Python script (*.py)|*.py')
ret = dlg.ShowModal()
if ret != wx.ID_OK:
dlg.Destroy()
return
file_update = Path(dlg.GetPath())
dlg.Destroy()
bc_void(file_bc)
self.reload()
[docs]
def oncreatevrt(self,e:wx.MouseEvent):
""" Création d'un fichier vrt """
if self._selected_item is None:
logging.error(_('Please select a scenario to analyze by activating an elemnt in the tree list !'))
return
logging.info(_('Creating vrt ...'))
mydata = self._treelist.GetItemData(self._selected_item)
if not 'path' in mydata:
logging.error(_('Please select a scenario to analyze !'))
return
# création du fichier vrt
self._parent.create_vrt(mydata['path'])
logging.info(_('... done !'))
self.reload()
[docs]
def oncreatevec(self,e:wx.MouseEvent):
""" Création d'un fichier vec """
if self._selected_item is None:
logging.error(_('Please select a scenario to analyze by activating an elemnt in the tree list !'))
return
logging.info(_('Creating vecz ...'))
mydata = self._treelist.GetItemData(self._selected_item)
if not 'path' in mydata:
logging.error(_('Please select a scenario to analyze !'))
return
# création du fichier vrt
new_zones = self._parent.create_vec(mydata['path'])
logging.info(_('... done !'))
dlg = wx.MessageDialog(None, _('Do you want to add the new zones to the map ?'), _('Warning'), wx.YES_NO)
ret = dlg.ShowModal()
if ret == wx.ID_YES:
new_zones.set_mapviewer()
cur_ids = self._parent.mapviewer.get_list_keys(drawing_type=draw_type.VECTORS)
newid = mydata['path'].name
while newid in cur_ids:
newid = newid + '_'
self._parent.mapviewer.add_object('vector', newobj = new_zones, id=newid)
self._parent.mapviewer.Refresh()
dlg.Destroy()
self.reload()
[docs]
def ontranslatevrt2tif(self,e:wx.MouseEvent):
""" Traduction d'un fichier vrt en tif """
logging.info(_('Translating vrt to tif ...'))
if self._selected_item is None or self._selected_item == self._treelist.GetRootItem():
logging.info(_('No item selected ! -- using root item'))
with wx.MessageDialog(None, _('No item selected ! -- using root item'), _('Warning'), wx.OK | wx.CANCEL | wx.ICON_WARNING) as dlg:
ret = dlg.ShowModal()
if ret != wx.ID_OK:
return
mydata = self._parent.configs
else:
mydata = self._treelist.GetItemData(self._selected_item)
if not 'path' in mydata:
logging.error(_('Please select a scenario to analyze !'))
return
# création du fichier vrt
self._parent.translate_vrt2tif(mydata['path'])
logging.info(_('... done !'))
[docs]
def onapply_scripts(self,e:wx.MouseEvent):
""" Application des scripts sur les fichiers tif """
logging.info(_('Applying scripts ...'))
if self._selected_item is None or self._selected_item == self._treelist.GetRootItem():
logging.info(_('No item selected ! -- using root item'))
with wx.MessageDialog(None, _('No item selected ! -- using root item'), _('Warning'), wx.OK | wx.CANCEL | wx.ICON_WARNING) as dlg:
ret = dlg.ShowModal()
if ret != wx.ID_OK:
return
mydata = self._parent.configs
else:
mydata = self._treelist.GetItemData(self._selected_item)
if not 'path' in mydata:
logging.error(_('Please select a scenario to analyze !'))
return
# application des scripts
self._parent.apply_scripts_bath_mann_inf_roof(mydata['path'])
self.reload()
logging.info(_('... done !'))
[docs]
def oncheck_prefix(self,e:wx.MouseEvent):
""" Vérification des préfixes des fichiers tif """
logging.info(_('Checking prefix ...'))
if self._selected_item is None or self._selected_item == self._treelist.GetRootItem():
logging.info(_('No item selected ! -- using root item'))
with wx.MessageDialog(None, _('No item selected ! -- using root item ?'), _('Warning'), wx.OK | wx.CANCEL | wx.ICON_WARNING) as dlg:
ret = dlg.ShowModal()
if ret != wx.ID_OK:
return
mydata = self._parent.configs
else:
mydata = self._treelist.GetItemData(self._selected_item)
# création du fichier vrt
log = self._parent.check_prefix(mydata['.tif']+mydata['.tiff'])
if log =='':
self._txtctrl.WriteText("\n".join([_("All is fine !")]))
else:
self._txtctrl.WriteText(log)
[docs]
def oncheck_consistency(self,e:wx.MouseEvent):
""" Vérification de la cohérence des fichiers """
self._txtctrl.Clear()
log = self._parent.check_consistency()
if log =='':
self._txtctrl.WriteText("\n\n".join([_("All is fine !")]))
else:
self._txtctrl.WriteText("\n\n".join([log]))
# Info on Python Environment and wolfgpu Path and version
# -------------------------------------------------------
import sys
# Python Environment
self._txtctrl.write(_('\nPython Environment\n'))
self._txtctrl.write('-------------------\n')
self._txtctrl.write('Python version : {}\n'.format(sys.version))
self._txtctrl.write('Python path : {}\n'.format(sys.executable))
self._txtctrl.write('Python version info : {}\n'.format(sys.version_info))
# Test if wolfgpu.exe exists in script directory
# wolfgpu Path and version
self._txtctrl.write('\nWolfgpu Path and version\n')
self._txtctrl.write('------------------------\n')
wolfgpu = self._parent.wolfgpu
if wolfgpu.exists():
self._txtctrl.write('Wolfgpu.exe found in : {}\n'.format(self._parent.wolfgpu.parent))
else:
self._txtctrl.write('Wolfgpu.exe not found !\n')
self._parent.wolfgpu = None
[docs]
def choice_hydrograph(self) -> list[int]:
names = self._parent.get_names_hydrographs()
dlg = wx.MultiChoiceDialog(None, _('Choose hydrograph'), _('Hydrographs'), names)
ret = dlg.ShowModal()
if ret != wx.ID_OK:
dlg.Destroy()
return None
idx = dlg.GetSelections()
dlg.Destroy()
return idx
[docs]
def ontransfer_ic(self, e:wx.MouseEvent):
""" Transfert des conditions initiales """
logging.info(_('Transferring initial conditions ...'))
with wx.DirDialog(None, _('Choose the source scenario directory'), style = wx.DD_DIR_MUST_EXIST) as dlg:
ret = dlg.ShowModal()
if ret != wx.ID_OK:
return
wdir = dlg.GetPath()
with wx.DirDialog(None, _('Choose the destination scenario directory'), style = wx.DD_DIR_MUST_EXIST) as dlg:
ret = dlg.ShowModal()
if ret != wx.ID_OK:
return
wdir2 = dlg.GetPath()
self._parent.transfer_ic(Path(wdir), Path(wdir2))
self.reload()
[docs]
def oncreate_simulation(self, e:wx.MouseEvent):
""" Creation d'une simulation """
logging.info(_('Creating simulation ...'))
hydro = self.choice_hydrograph()
if hydro is None:
return
if len(hydro)==0:
return
# Recherche du répertoire de base à ouvrir
# utilisatation du répertoire sélectionné ou du répertoire parent/source
# si aucun élément sélectionné ou si l'élément sélectionné n'est pas un dictionnaire
if self._selected_item is None or self._selected_item == self._treelist.GetRootItem():
logging.info(_('No item selected ! -- using root item'))
mydata = self._parent.configs
else:
mydata = self._treelist.GetItemData(self._selected_item)
if isinstance(mydata, dict):
pass
else:
logging.info(_('The current activated item is not a dictionnary ! -- using root item'))
mydata = self._parent.configs
dlg = wx.DirDialog(None, _('Choose a scenario directory'), style = wx.DD_DIR_MUST_EXIST, defaultPath=str(mydata['path']))
ret = dlg.ShowModal()
if ret != wx.ID_OK:
dlg.Destroy()
return
path = dlg.GetPath()
dlg.Destroy()
dlg = wx.MessageDialog(None, _('Do you want to delete existing simulations ?'), _('Warning'), wx.YES_NO)
ret = dlg.ShowModal()
destroy_if_exists = ret == wx.ID_YES
dlg.Destroy()
preserve_ic = False
if not destroy_if_exists:
dlg = wx.MessageDialog(None, _('Do you want to preserve initial conditions ?'), _('Warning'), wx.YES_NO)
ret = dlg.ShowModal()
preserve_ic = ret == wx.ID_YES
dlg.Destroy()
pgbar = wx.ProgressDialog(_('Creating simulations ...'), _('Please wait ...'), maximum=len(hydro), parent=self._frame, style = wx.PD_APP_MODAL | wx.PD_AUTO_HIDE)
allsims = self._parent.create_simulation(Path(path), hydro, destroy_if_exists, preserve_ic, callback=pgbar.Update)
pgbar.Destroy()
self.reload()
if allsims is None:
logging.error(_('No simulation created !'))
return
if len(allsims)>0:
self._txtctrl.write(_('You have created {} simulations\n\n'.format(len(allsims))))
for cursim in allsims:
self._txtctrl.write(str(cursim) + '\n')
dlg = wx.MessageDialog(None, _('Do you want to create a batch file ?'), _('Warning'), wx.YES_NO)
ret = dlg.ShowModal()
create_batch = ret == wx.ID_YES
dlg.Destroy()
if create_batch:
dlg = wx.FileDialog(None, _('Choose a batch file name'), style = wx.FD_SAVE, defaultDir=str(path), defaultFile='quickruns.bat', wildcard = 'Batch file (*.bat)|*.bat')
ret = dlg.ShowModal()
if ret != wx.ID_OK:
dlg.Destroy()
return
batch = Path(dlg.GetPath())
dlg.Destroy()
self._batch = batch
batch = self._parent.create_batch(Path(batch), allsims)
self._txtctrl.write('\n\n')
self._txtctrl.write(_('You can run the simulations with the following commands / batch file :\n\n'))
self._txtctrl.write(batch)
logging.info(_('... done !'))
[docs]
def onrun_batch(self,e:wx.MouseEvent):
""" run batch file """
if self._batch is None:
return
self._parent.run_batch(self._batch)
[docs]
def onlist_simulation(self,e:wx.MouseEvent):
""" List all simulations and print infos in text control """
all_sims = self._parent.get_all_sims()
self._txtctrl.Clear()
self._txtctrl.write(_('You have {} simulations\n\n'.format(len(all_sims))))
self._txtctrl.write(_('List of simulations\n\n'))
for cursim in all_sims:
self._txtctrl.write(str(cursim) + '\n')
[docs]
def get_sims_only(self, force=False):
""" Get paths to all or selected simulations """
sims=[]
curitem:TreeListItem
curitem = self._treelist.GetFirstItem()
while curitem.IsOk():
mydata = self._treelist.GetItemData(curitem)
checked = self._treelist.GetCheckedState(curitem) == wx.CHK_CHECKED
if isinstance(mydata, dict):
if IS_SIMUL in mydata:
if mydata[IS_SIMUL]:
if checked or force:
sims += [mydata]
curitem = self._treelist.GetNextItem(curitem)
# curitem = self._treelist.GetItemParent(curitem)
return sims
[docs]
def OnCheckItem(self,e):
""" All levels under the item are checked/unchecked"""
myitem = e.GetItem()
ctrl = wx.GetKeyState(wx.WXK_CONTROL)
myparent:TreeListItem
myparent = self._treelist.GetItemParent(myitem)
mydata = self._treelist.GetItemData(myitem)
check = self._treelist.GetCheckedState(myitem)
self._treelist.CheckItemRecursively(myitem, check)
[docs]
def _callbackwp(self):
""" Callback for wolfparam """
for cursim in self._wp:
curwp = self._wp[cursim]
if curwp is not None:
try:
if curwp.Shown:
cursim.from_wolfparam(curwp)
cursim._save_json()
except Exception as e:
self._wp[cursim] = None
logging.debug(_('Error while saving parameters for simulation {}'.format(cursim.path.name)))
logging.debug(str(e))
[docs]
def _callbackwp_destroy(self):
""" Callback for wolfparam """
for cursim in self._wp:
curwp = self._wp[cursim]
if curwp is not None:
try:
if curwp.Shown:
cursim.from_wolfparam(curwp)
cursim._save_json()
except Exception as e:
self._wp[cursim] = None
logging.debug(_('Error while saving parameters for simulation {}'.format(cursim.path.name)))
logging.debug(str(e))
[docs]
def OnActivateTreeElem(self, e):
"""
If you double click on a tree element
"""
myitem:TreeListItem
myitem = e.GetItem()
self._selected_item = myitem
# State of the CTRL key
# - True if pressed
# - False if not pressed
ctrl = wx.GetKeyState(wx.WXK_CONTROL)
shift = wx.GetKeyState(wx.WXK_SHIFT)
alt = wx.GetKeyState(wx.WXK_ALT)
# Upstream tree element
myparent = self._treelist.GetItemParent(myitem)
# State of the item - Checked or not
check = self._treelist.GetCheckedState(myitem)
# Data associated with the item
mydata = self._treelist.GetItemData(myitem)
self._txtctrl.Clear()
self._txtctrl.SetBackgroundColour(wx.WHITE)
self._txtctrl.SetForegroundColour(wx.BLACK)
if isinstance(mydata, dict):
self._txtctrl.write(_('Yous have selected : {}\n\n'.format(str(mydata['path']))))
if mydata[IS_SIMUL]:
self._txtctrl.write(_('GPU SIMULATION\n\n'))
if ctrl and not shift and not alt:
# CTRL pressed
# - Open the simulation in the viewer
logging.info(_('Opening simulation {}'.format(mydata['path'].name)))
ids = self._parent.mapviewer.get_list_keys(draw_type.RES2D)
newid = str(mydata['path'].name)
while newid.lower() in ids:
dlg = wx.TextEntryDialog(None, _('Choose a name for the new object'), _('Name'), str(mydata['path'].name) + '_new')
ret = dlg.ShowModal()
if ret != wx.ID_OK:
dlg.Destroy()
return
newid = dlg.GetValue()
dlg.Destroy()
logging.info(_(' Reading simulation data'))
addedsim = wolfres2DGPU(str(mydata['path'] / 'simul_gpu_results'), eps=1e-5, idx=newid, mapviewer=self._parent.mapviewer)
addedsim.epsilon = self._parent.epsilon
addedsim._epsilon_default = self._parent.epsilon
addedsim.to_filter_independent = self._parent.filter_independent
logging.info(_(' Adding simulation to the mapviewer'))
self._parent.mapviewer.add_object('res2d_gpu', newobj=addedsim, id=newid)
# add the related menus to the mapviewer
self._parent.mapviewer.menu_wolf2d()
self._parent.mapviewer.menu_2dgpu()
logging.info(_(' Reading last step'))
addedsim.read_oneresult()
logging.info(_(' Coloring the map'))
addedsim.set_currentview()
logging.info(_('Simulation {} opened and added !'.format(mydata['path'].name)))
elif shift and ctrl:
# Ctrl + Shift pressed
# Extract IC from a specific result and save it the current simulation
logging.info(_('Extracting IC from a specific result and save it the current simulation'))
res_path = mydata['path'] / 'simul_gpu_results'
if res_path.exists():
store = ResultsStore(res_path, mode='r')
dlg = wx.SingleChoiceDialog(None, _('Choose a result'), _('Results'), [str(cur) for cur in range(1, store.nb_results+1)])
ret = dlg.ShowModal()
if ret != wx.ID_OK:
dlg.Destroy()
return
idx = int(dlg.GetSelection())
dlg.Destroy()
sim = SimpleSimulation.load(mydata['path'])
sim.write_initial_condition_from_record(res_path, idx, mydata['path'])
if self._parent.filter_independent:
logging.info(_('Filtering independent zones'))
self.filter_independent_zones(1, mydata['path'])
else:
logging.info(_('No filtering applied'))
logging.info(_('IC extracted and saved !'))
elif ctrl and alt:
# ctrl + alt
# Extract last result and save it as IC for the current simulation
logging.info(_('Extracting last result and save it as IC for the current simulation'))
res_path = mydata['path'] / 'simul_gpu_results'
if res_path.exists():
store = ResultsStore(res_path, mode='r')
idx = store.nb_results
sim = SimpleSimulation.load(mydata['path'])
sim.write_initial_condition_from_record(res_path, idx, mydata['path'])
if self._parent.filter_independent:
logging.info(_('Filtering independent zones'))
self.filter_independent_zones(1, mydata['path'])
else:
logging.info(_('No filtering applied'))
logging.info(_('IC extracted and saved !'))
elif shift:
# shift pressed
# show the parameters of the simulation
logging.info(_('Opening parameters for simulation {}'.format(mydata['path'].name)))
sim = SimpleSimulation.load(mydata['path'])
wp = sim.to_wolfparam()
self._wp[sim] = wp
wp.set_callbacks(self._callbackwp, self._callbackwp_destroy)
wp._set_gui(title='Parameters for simulation {}'.format(mydata['path'].name), toShow=False)
wp.hide_selected_buttons()
wp.Show()
elif alt:
# alt pressed
# Extract last result and save it as General Initial conditions
logging.info(_('Extracting last result and save it as General Initial conditions'))
res_path = mydata['path'] / 'simul_gpu_results'
if res_path.exists():
sim = SimpleSimulation.load(mydata['path'])
destpath = self._parent.workingdir / INITIAL_CONDITIONS / mydata['path'].name.replace('sim_', '')
sim.write_initial_condition_from_record(res_path, None, destpath)
if self._parent.filter_independent:
logging.info(_('Filtering independent zones'))
self.filter_independent_zones(1, destpath)
else:
logging.info(_('No filtering applied'))
logging.info(_('IC extracted and saved !'))
else:
self._txtctrl.write(_('\n\n CTRL + double click to open the simulation results in the UI'))
self._txtctrl.write(_('\n SHIFT + double click to edit simulation parameters in the UI'))
self._txtctrl.write(_('\n ALT + double click to extract last result as general initial conditions'))
self._txtctrl.write(_('\n CTRL + ALT + double click to extract last result as initial conditions and update the simulation'))
self._txtctrl.write(_('\n CTRL + SHIFT + double click to extract a specific result as initial conditions and update the simulation'))
elif isinstance(mydata, list):
def allfiles(curlist):
""" Get all files from a list of files """
allfiles = '\n'
for curfile in curlist:
allfiles+= curfile.name +'\n'
return allfiles
self._txtctrl.write(_('Yous have selected a list : {}'.format(allfiles(mydata))))
elif isinstance(mydata, Path):
self._txtctrl.write(_('Yous have selected : {} \n\n'.format(str(mydata))))
if mydata.name.endswith(GPU_2D_file_extensions.PY.value):
# script Python
self._txtctrl.write(_('\n\n CTRL+ double click to open the script in the viewer (not an editor !)'))
# Name of the item and its parent
nameparent = self._treelist.GetItemText(myparent).lower()
nameitem = self._treelist.GetItemText(myitem).lower()
if nameparent == WOLF_UPDATE.lower() or nameparent == OTHER_SCRIPTS.lower() or nameparent == WOLF_BC.lower() :
# script
if ctrl :
# CTRL pressed
# - Open the script in the editor
self._open_script(mydata, nameparent in [WOLF_UPDATE.lower(), WOLF_BC.lower()])
elif mydata.name.endswith(GPU_2D_file_extensions.JSON.value):
# fichier de paramètres
if ctrl :
# script
with open(mydata, 'r', encoding='utf-8') as file:
txt = json.load(file)
self._txtctrl.write(str(txt))
else:
self._txtctrl.write(_('\n\n CTRL+ double click to open the json file in the viewer (not an editor !)'))
elif mydata.name.endswith(GPU_2D_file_extensions.TIF.value) or mydata.name.endswith(GPU_2D_file_extensions.NPY.value) or mydata.name.endswith(GPU_2D_file_extensions.BIN.value):
# proposition de chargement dans l'UI (pas de message si CTRL+double click)
self._txtctrl.write(str(self._parent._get_header(mydata)))
if self._parent.mapviewer is not None:
if not ctrl:
dlg = wx.MessageDialog(None, _('Do you want to load the file in the mapviewer ?'), _('Load file'), wx.YES_NO)
ret = dlg.ShowModal()
dlg.Destroy()
if ret != wx.ID_YES:
return
myarray = WolfArray(str(mydata), srcheader=self._parent._header, nullvalue=99999., idx=str(mydata.name))
ids = self._parent.mapviewer.get_list_keys(draw_type.ARRAYS)
newid = str(mydata.name)
while newid in ids:
newid = newid + '_'
self._parent.mapviewer.add_object('array', newobj=myarray, id=newid)
elif mydata.name.endswith(GPU_2D_file_extensions.TXT.value):
# Chragement d'un hydrogramme
try:
hydro, fig, ax = self._parent.load_hydrograph(mydata, ctrl)
if hydro._data.empty:
with open(mydata, 'r', encoding='utf-8') as file:
txt = file.read()
self._txtctrl.write(txt)
else:
self._txtctrl.write(_('There are {} columns\n\n'.format(len(hydro._data.columns))))
for idx, curcol in enumerate(hydro._data.columns):
self._txtctrl.write(' - {} has index \t{} \tin the infiltration array'.format(curcol, idx+1) + '\n')
self._txtctrl.write('\n\n')
self._txtctrl.write(_('Time'))
for curcol in hydro._data.columns:
self._txtctrl.write('\t'+curcol)
self._txtctrl.write('\n')
if hydro._data.shape[0]>50:
logging.warning(_('Too many lines in the hydrograph -- only the first 50 will be displayed !'))
for idx, curline in hydro._data.iloc[:50].iterrows():
self._txtctrl.write(str(idx))
for curval in curline.values:
self._txtctrl.write('\t' + str(curval))
self._txtctrl.write('\n')
else:
for idx, curline in hydro._data.iterrows():
self._txtctrl.write(str(idx))
for curval in curline.values:
self._txtctrl.write('\t' + str(curval))
self._txtctrl.write('\n')
except:
with open(mydata, 'r', encoding='utf-8') as file:
txt = file.read()
self._txtctrl.write(txt)
[docs]
def filter_independent_zones(self, n_largest:int = 1, icpath:Path=None):
"""
Filtre des zones indépendantes et conservation des n plus grandes
"""
from scipy.ndimage import label, sum_labels
waterdepth = icpath / 'h.npy'
waterdepth = np.load(waterdepth)
np.save(icpath / '_h_before_filtering.npy', waterdepth)
# labellisation
labeled_array = waterdepth.copy()
labeled_array[np.where(waterdepth<self._parent.epsilon)] = 0
labeled_array, num_features = label(labeled_array)
longueur = []
longueur = list(sum_labels(np.ones(labeled_array.shape, dtype=np.int32), labeled_array, range(1, num_features+1)))
longueur = [[longueur[j], j+1] for j in range(0, num_features)]
longueur.sort(key=lambda x: x[0], reverse=True)
newh = np.zeros_like(waterdepth)
for j in range(0,n_largest):
newh[labeled_array == longueur[j][1]] = waterdepth[labeled_array == longueur[j][1]]
np.save(icpath / 'h.npy', newh)
qx = icpath / 'qx.npy'
if qx.exists():
qx = np.load(qx)
np.save(icpath / '_qx_before_filtering.npy', qx)
qx[newh==0.]=0.
np.save(icpath / 'qx.npy', qx)
qy = icpath / 'qy.npy'
if qy.exists():
qy = np.load(qy)
np.save(icpath / '_qy_before_filtering.npy', qy)
qy[newh==0.]=0.
np.save(icpath / 'qy.npy', qy)
[docs]
def clear_text(self):
""" Reset txt control"""
self._txtctrl.Clear()
self._txtctrl.SetBackgroundColour(wx.WHITE)
self._txtctrl.SetForegroundColour(wx.BLACK)
[docs]
def _open_script(self, mydata:Path, wolf:bool=False):
""" Open the script in the editor """
if mydata.exists():
with open(mydata, 'r', encoding='utf-8') as file:
txt = file.read()
self.clear_text()
if wolf:
self._txtctrl.SetBackgroundColour(wx.Colour(28,142,62))
self._txtctrl.SetForegroundColour(wx.WHITE)
self._txtctrl.WriteText('WOLF update or BC file\n------------------\n\n')
# self._txtctrl.SetForegroundColour(wx.BLACK)
else:
self._txtctrl.SetBackgroundColour(wx.RED)
self._txtctrl.SetForegroundColour(wx.WHITE)
self._txtctrl.WriteText(' **NOT** WOLF update or BC file\n-----------------------------\n\n')
# self._txtctrl.SetForegroundColour(wx.BLACK)
self._txtctrl.WriteText(txt)
self._frame.Layout()
[docs]
def _append_configs2tree(self, curdict:dict, root:TreeListItem):
""" Ajout des éléments du dictionnaire dans l'arbre sur base de la racine fournie """
for idx, (k,v) in enumerate(curdict.items()):
if isinstance(v, dict):
if isinstance(k,Path):
# on ne garde que le nom du chemin complet
kstr = k.name
else:
kstr = str(k)
if kstr != SUBDIRS and '__' not in kstr:
create = True
if kstr == GPU_2D_file_extensions.PY.value:
# Pas d'ajout de noeud à l'arbre si pas de fichiers .py
create = len(v[WOLF_UPDATE])>0 or len(v[OTHER_SCRIPTS])>0 or len(v[WOLF_BC])>0
if create:
newroot = self._treelist.AppendItem(root, kstr, data = v)
self._append_configs2tree(v, newroot)
elif isinstance(v, list):
if isinstance(k,Path):
# on ne garde que le nom du chemin complet
k=k.name
else:
kstr = str(k)
if len(v)>0:
if (kstr in ALL_EXTENSIONS or kstr == WOLF_UPDATE or kstr == OTHER_SCRIPTS or kstr == WOLF_BC) and (kstr != SUBDIRS):
newroot = self._treelist.AppendItem(root, k, data = v)
for curfile in v:
self._treelist.AppendItem(newroot, curfile.name, data = curfile)