"""
Author: University of Liege, HECE
Date: 2025
Copyright (c) 2025 University of Liege. All rights reserved.
This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""
import wx
import numpy as np
import logging
import subprocess
import shutil
import os
import geopandas as gpd
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.figure import Figure
from matplotlib.backends.backend_wxagg import FigureCanvasWxAgg as FigureCanvas
from matplotlib.backends.backend_wxagg import NavigationToolbar2WxAgg as NavigationToolbar2Wx
from pathlib import Path
from scipy.ndimage import label
from gettext import gettext as _
import rasterio
from shapely.geometry import Polygon
from .acceptability import Base_data_creation, Database_to_raster, Vulnerability, Acceptability
from .acceptability import steps_base_data_creation, steps_vulnerability, steps_acceptability
from .func import Accept_Manager
from ..wolf_array import WolfArray, header_wolf
from ..scenario.config_manager import Config_Manager_2D_GPU
from ..PyDraw import WolfMapViewer, draw_type
from ..Results2DGPU import wolfres2DGPU
from ..PyGui import MapManager
[docs]
def nullvalue_for_hole(WA):
    """
    Prepare an array for the hole-filling interpolation routine.

    The null value of ``WA`` is forced to 0 (the convention used by the
    interpolation routine), then ``WA.set_nullvalue_in_mask()`` is called.

    Inputs:
    - WA: the array object to update in place (nothing is returned).
    """
    # The interpolation routine identifies holes through the value 0.
    WA.nullvalue = 0.
    WA.set_nullvalue_in_mask()
[docs]
def read_export_z_bin(fn_read, fn_write, fn_laststep):
    """
    Reads the free surface altitude from a GPU simulation and exports it in binary format.

    The result at index -1 is read, the water depth and the topography of
    block 1 are summed, and the sum is written twice: once with the ``.bin``
    suffix and once with the ``.tif`` suffix. The topography alone is also
    written as a ``_dem.tif`` file.

    Inputs:
    - fn_read: the simulation file to read.
    - fn_write: the path (str or Path) to save the output; its suffix is replaced.
    - fn_laststep: the folder EXTRACTED_LAST_STEP defined in acceptability.
    """
    # Accept plain strings as well as Path objects (with_suffix is a Path method).
    fn_write = Path(fn_write)

    fn_temp = os.path.join(fn_laststep, 'temp')
    os.makedirs(fn_temp, exist_ok=True)
    try:
        simulation = wolfres2DGPU(fn_read)
        simulation.read_oneresult(-1)
        wd = simulation.get_h_for_block(1)
        top = simulation.get_top_for_block(1)

        nullvalue_for_hole(wd)
        nullvalue_for_hole(top)

        # Free surface altitude = water depth + topography.
        wd.array = wd.array + top.array

        fn_write = fn_write.with_suffix('.bin')
        wd.write_all(fn_write)
        fn_write = fn_write.with_suffix('.tif')
        wd.write_all(fn_write)
        # NOTE(review): this resolves to "<parent>/<stem>/_dem.tif" (a sub-folder
        # named after the stem). "<parent>/<stem>_dem.tif" may have been intended;
        # kept unchanged because other code may rely on the current location.
        top.write_all(fn_write.parent / fn_write.stem / "_dem.tif")
    finally:
        # Always remove the temporary folder, even when reading/writing fails.
        shutil.rmtree(fn_temp)
[docs]
def riverbed_trace(fn_read_simu, fn_output, threshold):
    """
    Recognizes the riverbed trace based on a simulation, where water depth above a given threshold is considered part of the riverbed.

    The written raster is binary: 1 where the water depth is strictly above
    ``threshold``, 0 elsewhere.

    Inputs:
    - fn_read_simu: the simulation file to read.
    - fn_output: the location to save the riverbed trace as a .tiff file.
    - threshold: the water depth threshold above which the areas are considered riverbed.
    """
    simulation = wolfres2DGPU(fn_read_simu)
    simulation.read_oneresult(-1)
    wd = simulation.get_h_for_block(1)

    # Depths above 1000 are treated as invalid and reset before classification.
    wd.array[wd.array > 1000] = 0

    # Both selections are evaluated on the depths BEFORE anything is overwritten.
    # Re-testing "< threshold" after writing the 1s (as done previously) erased
    # the whole trace as soon as threshold was larger than 1, and cells exactly
    # equal to the threshold were left with their depth instead of 0.
    in_riverbed = wd.array > threshold
    outside_riverbed = wd.array <= threshold
    wd.array[in_riverbed] = 1
    wd.array[outside_riverbed] = 0

    wd.as_WolfArray()
    wd.nodata=0
    wd.write_all(Path(fn_output))
[docs]
def empty_folder(folder):
    """
    Remove everything contained in ``folder`` while keeping the folder itself.

    Files and symbolic links are unlinked, sub-directories are removed
    recursively. A deletion failure is reported on the standard output and the
    loop continues with the next entry. If ``folder`` does not exist, a message
    is printed and nothing else happens.
    """
    if not os.path.exists(folder):
        print("The folder does not exist.")
        return

    for entry in os.listdir(folder):
        fn = os.path.join(folder, entry)
        try:
            if os.path.isdir(fn) and not os.path.islink(fn):
                # Real directory: delete the whole tree.
                shutil.rmtree(fn)
            elif os.path.isfile(fn) or os.path.islink(fn):
                # Regular file or link (including links pointing to folders).
                os.unlink(fn)
        except Exception as e:
            print(f"Error when deleting file {fn}: {e}")
"""
This script performs two main operations:
1. Subtraction of two raster TIFF files:
- Identifies areas with building traces by subtracting the `bathymetry.tif` from simulations
(corresponding to 'MNT_muret_bati') from `MNT` (DEM).
2. For the identified building areas (from step 1):
- Replace the values with those from the `MNT` (ground level), ensuring it reflects the terrain, not bathymetry values.
Final Output:
- The mask should highlight building traces with corresponding DEM (`MNT`) values ("ground") inside, and its name must start with "MNT_" and include "mask" in it.
Note: the computations are performed with .tif rasters, but the results should be translated to .bin files in the acceptability routine.
"""
#----------------------------------------------------------------------------------------------------------
#1 - Soustraction bathymetry.tif (from simulations) - DEM (MNT, cfr "projet tuilage") ---------------------
[docs]
def soustraction(fn_a,fn_b,fn_result):
    """
    Subtract band 1 of raster ``fn_b`` from band 1 of raster ``fn_a`` and save the cleaned result.

    Processing steps:
    - the difference A - B is computed in float32 (the dtype of the output file);
    - when both rasters share the same nodata value, cells that are nodata in
      A or in B receive that nodata value;
    - differences larger than 5000 are reset to 0;
    - connected groups of non-zero cells containing 5 cells or fewer are reset to 0.

    Inputs:
    - fn_a: path of the raster to subtract from.
    - fn_b: path of the raster to subtract.
    - fn_result: path of the GeoTIFF to write.

    Raises:
    - ValueError: if the two bands do not have the same shape.
    """
    with rasterio.open(fn_a) as src_a, rasterio.open(fn_b) as src_b:
        same_grid = (
            src_a.width == src_b.width and
            src_a.height == src_b.height and
            src_a.transform == src_b.transform and
            src_a.crs == src_b.crs
        )
        if not same_grid:
            logging.error(f"{fn_a} and {fn_b} do not have the same properties, please edit them.")

        data_a = src_a.read(1)
        data_b = src_b.read(1)
        if data_a.shape != data_b.shape:
            # Fail with a clear message instead of an opaque broadcasting error
            # (or a silently broadcast, meaningless result).
            raise ValueError(
                f"Cannot subtract rasters of different shapes: {data_a.shape} and {data_b.shape}"
            )

        # (A - B), computed in float32: avoids wrap-around with unsigned integer
        # rasters and matches the dtype declared for the output file.
        data_diff = data_a.astype(np.float32) - data_b.astype(np.float32)

        nodata_value = src_a.nodata if src_a.nodata == src_b.nodata else None
        if nodata_value is not None:
            data_diff[(data_a == nodata_value) | (data_b == nodata_value)] = nodata_value

        data_diff[data_diff > 5000] = 0

        # Remove small objects (connected groups of at most `threshold` cells).
        labeled, _nb_objects = label(data_diff)
        threshold = 5
        sizes = np.bincount(labeled.ravel())
        idx_small = np.where(sizes <= threshold)[0]
        data_diff[np.isin(labeled, idx_small)] = 0

        out_meta = src_a.meta.copy()
        out_meta.update({
            "dtype": "float32",
            "driver": "GTiff"
        })
        with rasterio.open(fn_result, "w", **out_meta) as dst:
            dst.write(data_diff, 1)
#2 - DEM (MNT) value in the buildings traces ------------------------------------------------------------------
[docs]
def mask_creation_data(mask_file, ground_file, output_file):
    """
    Fill the building traces of a mask raster with the ground elevation.

    Cells of ``mask_file`` (band 1) that are strictly positive receive the
    value of ``ground_file`` (band 1) at the same position; every cell that is
    then lower than or equal to 0 is set to 9999. The result is written to
    ``output_file`` as float32 and also exported as
    ``MNT_computed_with_mask.bin`` in the folder of ``output_file``.

    Inputs:
    - mask_file: raster whose positive cells locate the buildings.
    - ground_file: raster providing the ground values.
    - output_file: path of the raster to write.
    """
    with rasterio.open(mask_file) as src_mask:
        building_mask = src_mask.read(1)
        profile = src_mask.meta.copy()

    with rasterio.open(ground_file) as src_ground:
        ground = src_ground.read(1)

    # Positions of the building traces, located before any value is replaced.
    building_cells = np.nonzero(building_mask > 0)
    building_mask[building_cells] = ground[building_cells]
    building_mask[building_mask <= 0] = 9999.

    profile.update({"dtype": 'float32'})
    with rasterio.open(output_file, "w", **profile) as dst:
        dst.write(building_mask, 1)

    # Binary copy, written next to the output raster.
    bin_path = Path(output_file).parent / "MNT_computed_with_mask.bin"
    WolfArray(output_file).write_all(Path(bin_path))
[docs]
def MTN_And_mask_creation_all(fn_bathy, fn_mtn_cropped, fn_where_buildings, fn_mask_final):
    """
    Chain the two steps producing the masked DEM.

    1. ``soustraction`` writes ``fn_bathy - fn_mtn_cropped`` to ``fn_where_buildings``.
    2. ``mask_creation_data`` uses that raster and ``fn_mtn_cropped`` to write ``fn_mask_final``.
    """
    # Step 1: locate the building traces.
    soustraction(fn_bathy, fn_mtn_cropped, fn_where_buildings)
    # Step 2: put the ground values inside the traces.
    mask_creation_data(fn_where_buildings, fn_mtn_cropped, fn_mask_final)
#--------------------------------------------------------------------------------------------------------------
[docs]
def create_shapefile_from_prop_tif(fn_tif, shapefile_path):
    """
    Write a shapefile containing the rectangular extent of a TIFF file.

    The four corners of the raster are obtained by applying the raster
    transform to the pixel coordinates (0, 0), (width, 0), (width, height) and
    (0, height).

    Inputs:
    - fn_tif: the path to the input TIFF file.
    - shapefile_path: the location to save the output shapefile.
    """
    # Only the 3rd and 4th header items (width, height) are needed here.
    _h0, _h1, width, height, _h4, _h5 = get_header_info(fn_tif)
    transform, crs = get_transform_and_crs(fn_tif)

    # top-left, top-right, bottom-right, bottom-left (pixel coordinates)
    pixel_corners = ((0, 0), (width, 0), (width, height), (0, height))
    ring = [transform * corner for corner in pixel_corners]
    ring.append(ring[0])  # close the ring on its first point

    rectangle = Polygon(ring)
    gdf = gpd.GeoDataFrame({'geometry': [rectangle]})
    gdf.set_crs(crs, allow_override=True, inplace=True)
    gdf.to_file(shapefile_path)
[docs]
def search_for_modif_bath_and_copy(main_gpu, from_path, path_vuln):
    """
    Look for ``bath_`` tif files of a hydraulic scenario and create matching templates.

    The files are searched through the scenario manager (``Config_Manager_2D_GPU``)
    starting at ``from_path``. For each file whose name starts with ``bath_``, two
    rasters filled with ones and sharing its metadata are written in ``path_vuln``:
    - ``vuln_...`` (uint8, nodata 0);
    - ``MNTmodifs_...`` (float32, nodata 0).

    Inputs:
    - main_gpu: main folder of the scenario manager.
    - from_path: folder of the hydraulic scenario.
    - path_vuln: folder (Path) receiving the created files.

    Returns True if at least one ``bath_`` file was found, False otherwise.
    """
    manager = Config_Manager_2D_GPU(main_gpu, create_ui_if_wx=False)
    tree = manager.get_tree(from_path)
    dicts = manager.get_dicts(tree)

    # One list of candidate files per dictionary, then flattened and filtered.
    candidates = [manager._select_tif_partname(curdict, 'bath_') for curdict in dicts]
    bath_files = []
    for group in candidates:
        if len(group) > 0:
            for tif in group:
                if tif.name.startswith('bath_'):
                    bath_files.append(tif)

    # (file prefix, raster dtype) of the templates created for each bath_ file
    templates = (('vuln_', rasterio.uint8), ('MNTmodifs_', rasterio.float32))

    for tif_file in bath_files:
        with rasterio.open(tif_file) as src:
            metadata = src.meta.copy()
            for prefix, dtype in templates:
                metadata.update(dtype=dtype, count=1, nodata=0)
                data = np.ones((metadata['height'], metadata['width']), dtype=dtype)
                output_file = path_vuln / tif_file.name.replace('bath_', prefix)
                with rasterio.open(output_file, 'w', **metadata) as dst:
                    dst.write(data, 1)

    return len(bath_files) > 0
[docs]
def mapviewer_display(list_path):
    """
    Load the output in the mapviewer on WOLF.

    The user is first asked for confirmation. On "Yes", a new ``WolfMapViewer``
    is created and each path of ``list_path`` is added as an array whose id is
    the file name.

    Inputs:
    - list_path: iterable of paths (str or Path) of the arrays to display.
    """
    results = " and ".join(Path(path).name for path in list_path)

    # The message is translated BEFORE the file names are inserted: an f-string
    # inside _() produces a different key at each call, which can never match
    # an entry of a translation catalog.
    question = _('Do you want to load {} in the mapviewer ?').format(results)
    with wx.MessageDialog(None, question, _('Load file'), wx.YES_NO) as dlg:
        ret = dlg.ShowModal()

    if ret != wx.ID_YES:
        return

    mapviewer = WolfMapViewer(title="OUTPUT Acceptability manager")
    for path in list_path:
        myarray = WolfArray(path)
        newid = Path(path).name
        mapviewer.add_object('array', newobj=myarray, id=newid)

    logging.info("Press F5 to refresh the mapviewer.")
    mapviewer.Refresh()
[docs]
class AcceptabilityGui(wx.Frame):
""" The main frame for the vulnerability/acceptability computation """
def __init__(self, parent=None, width=1024, height=500):
    """
    Create the frame and build its user interface.

    Inputs:
    - parent: parent window (None for a top-level frame).
    - width: initial width of the frame, in pixels.
    - height: initial height of the frame, in pixels.
    """
    # Zero-argument super(): super(wx.Frame, self) named the wrong class and
    # started the method lookup AFTER wx.Frame in the MRO.
    super().__init__(parent, title='Acceptability score manager', size=(width, height))

    self._manager = None     # Accept_Manager, set once a main directory is chosen
    self._mapviewer = None   # optional WolfMapViewer attached through the property
    self.InitUI()
@property
def mapviewer(self):
    """The WolfMapViewer attached to this frame, or None if there is none."""
    return self._mapviewer

@mapviewer.setter
def mapviewer(self, value):
    """Attach a map viewer; raises TypeError if ``value`` is not a WolfMapViewer."""
    from ..PyDraw import WolfMapViewer

    if not isinstance(value, WolfMapViewer):
        raise TypeError("The mapviewer must be a WolfMapViewer")

    self._mapviewer = value
[docs]
def OnHoverEnter(self, event):
    """Dynamic colour layout 1: darken the button the mouse has just entered."""
    # The handler is bound to several buttons (see layout), so the button to
    # recolour is the one that emitted the event, not always _but_creation.
    button = event.GetEventObject()
    if button is None:
        button = self._but_creation
    button.SetBackgroundColour(wx.Colour(100,100,100))
    button.Refresh()
    event.Skip()

def OnHoverLeave(self, event):
    """Dynamic colour layout 2: restore the colour of the button the mouse has just left."""
    # Same rule as OnHoverEnter: act on the button that emitted the event.
    button = event.GetEventObject()
    if button is None:
        button = self._but_creation
    button.SetBackgroundColour(wx.Colour(150,150,150))
    button.Refresh()
    event.Skip()
[docs]
def layout(self, self_fct):
    """Style a main button: bold font, grey background and hover handlers.

    Inputs:
    - self_fct: the widget to style.
    """
    bold_font = self_fct.GetFont()
    bold_font.SetWeight(wx.FONTWEIGHT_BOLD)
    self_fct.SetFont(bold_font)

    self_fct.SetBackgroundColour(wx.Colour(150,150,150))

    # Colour change when the mouse enters / leaves the widget.
    hover_handlers = (
        (wx.EVT_ENTER_WINDOW, self.OnHoverEnter),
        (wx.EVT_LEAVE_WINDOW, self.OnHoverLeave),
    )
    for event_type, handler in hover_handlers:
        self_fct.Bind(event_type, handler)
[docs]
def layout_listbox(self, self_fct):
    """Give a listbox its light grey background.

    Inputs:
    - self_fct: the listbox to style.
    """
    light_grey = wx.Colour(220, 220, 220)
    self_fct.SetBackgroundColour(light_grey)
[docs]
def InitUI(self):
    """
    Build the whole user interface of the frame.

    Creates the panel, every button / listbox / spin control, binds the event
    handlers and stacks the horizontal sizers in one vertical sizer. All
    buttons except 'Main Directory' are disabled at the end of this method.
    """
    self.gpu_bathy = None  # path of the simulation bathymetry, set in OnLoadingSimu
    self.maindir = None    # main acceptability directory, set in OnMainDir
    # One horizontal sizer per line of the window, stacked in sizer_vert1.
    sizer_hor_main = wx.BoxSizer(wx.HORIZONTAL)
    sizer_vert1 = wx.BoxSizer(wx.VERTICAL)
    sizer_hor_threads = wx.BoxSizer(wx.HORIZONTAL)
    sizer_hor1 = wx.BoxSizer(wx.HORIZONTAL)
    sizer_hor1_1 = wx.BoxSizer(wx.HORIZONTAL)
    sizer_hor2 = wx.BoxSizer(wx.HORIZONTAL)
    sizer_hor3 = wx.BoxSizer(wx.HORIZONTAL)
    sizer_hor4 = wx.BoxSizer(wx.HORIZONTAL)
    sizer_hor_scen = wx.BoxSizer(wx.HORIZONTAL)
    # 1st LINE - Loading acceptability folder
    panel = wx.Panel(self)
    self._but_maindir = wx.Button(panel, label='Main Directory')
    self._but_maindir.SetToolTip("To indicate where the main acceptability\n folder is located.")
    self._but_maindir.SetFont(wx.Font(9, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_BOLD))
    self._but_maindir.Bind(wx.EVT_BUTTON, self.OnMainDir)
    # Study area and scenario selectors (single selection).
    self._listbox_studyarea = wx.ListBox(panel, choices=[], style=wx.LB_SINGLE)
    self.layout_listbox(self._listbox_studyarea)
    self._listbox_studyarea.Bind(wx.EVT_LISTBOX, self.OnStudyArea)
    self._listbox_studyarea.SetToolTip("Choose the study area existed in the folder.")
    self._listbox_scenario = wx.ListBox(panel, choices=[], style=wx.LB_SINGLE)
    self.layout_listbox(self._listbox_scenario)
    self._listbox_scenario.Bind(wx.EVT_LISTBOX, self.OnScenario)
    self._listbox_scenario.SetToolTip("Choose the acceptability scenario.")
    # Small vertical column holding the three "Check ..." buttons.
    sizer_ver_small = wx.BoxSizer(wx.VERTICAL)
    self._but_checkfiles = wx.Button(panel, label='Check structure')
    self._but_checkfiles.Bind(wx.EVT_BUTTON, self.OnCheckFiles)
    self._but_checkfiles.SetToolTip("Checks if the folder is correctly structured\n with INPUT, TEMP, OUTPUT.")
    self._but_checksim = wx.Button(panel, label='Check simulations')
    self._but_checksim.SetToolTip("Displays the loaded simulations, interpolated in INTERP_WD.")
    self._but_checksim.Bind(wx.EVT_BUTTON, self.OnHydrodynInput)
    self._but_checkpond= wx.Button(panel, label='Check ponderation')
    self._but_checkpond.Bind(wx.EVT_BUTTON, self.OnCheckPond)
    self._but_checkpond.SetToolTip("Displays a graph of the computed weighting coefficient\n of the final acceptability computations.")
    # 2nd LINE - Hydrodynamic part
    self._but_loadgpu = wx.Button(panel, label='Load new\n hydraulic scenarios')
    self._but_loadgpu.SetToolTip("To load or change the hydraulic simulations")
    self._but_loadgpu.SetFont(wx.Font(9, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_BOLD))
    self._but_loadgpu.Bind(wx.EVT_BUTTON, self.OnLoadingSimu)
    sizer_hor1_1.Add(self._but_loadgpu, 1, wx.ALL | wx.EXPAND, 0)
    # Checkable list of the simulations found by OnLoadingSimu.
    self._check_listbox = wx.CheckListBox(panel, choices=[], style=wx.LB_MULTIPLE | wx.CHK_CHECKED)
    self.layout_listbox(self._check_listbox)
    self.sims = {}  # simulation folder name -> folder path, filled in OnLoadingSimu
    sizer_hor1_1.Add(self._check_listbox, 1, wx.ALL | wx.EXPAND, 0) # must be added to the sizer, otherwise it is not displayed
    self._but_DEM = wx.Button(panel, label='Check DEM inputs\n for interpolation')
    self._but_DEM.SetToolTip("To display the existing DEM input for the interpolation of the simulated free surfaces.")
    self._but_DEM.Bind(wx.EVT_BUTTON, self.OnDEM)
    sizer_hor1_1.Add(self._but_DEM, 1, wx.ALL | wx.EXPAND, 0)
    self._but_extrinterp = wx.Button(panel, label='Reading and interpolating\n free surface')
    self._but_extrinterp.SetToolTip("To read the simulation, and created the hydraulic input for\n acceptability (interpolated simulated free surfaces)")
    self._but_extrinterp.Bind(wx.EVT_BUTTON, self.OnInterpolation)
    sizer_hor1_1.Add(self._but_extrinterp, 1, wx.ALL | wx.EXPAND, 0)
    # Content of the first line (the "Check ..." column is filled further below).
    sizer_hor1.Add(self._but_maindir, 2, wx.ALL | wx.EXPAND, 0)
    sizer_hor1.Add(self._listbox_studyarea, 1, wx.ALL | wx.EXPAND, 0)
    sizer_hor1.Add(self._listbox_scenario, 1, wx.ALL | wx.EXPAND, 0)
    sizer_hor1.Add(sizer_ver_small, 0, wx.ALL | wx.EXPAND, 5)
    # 3rd LINE - raster header information (static texts) and number of threads
    sizer_hor_threads = wx.BoxSizer(wx.HORIZONTAL)
    text_dx = wx.StaticText(panel, label='Resolution (dx,dy) [m]:')
    self.input_dx = wx.StaticText(panel)
    self.input_dx.SetMinSize((80, -1))
    text_nbxy = wx.StaticText(panel, label='(nbx, nby):')
    self.input_nbxy = wx.StaticText(panel)
    self.input_nbxy.SetMinSize((90, -1))
    text_O = wx.StaticText(panel, label='Origin (X,Y):')
    self.input_O = wx.StaticText(panel)
    self.input_O.SetMinSize((170, -1))
    text_threads = wx.StaticText(panel, label='Number of threads:')
    # NOTE(review): os.cpu_count() may return None according to the Python docs -- confirm this cannot happen on the target machines.
    self._nb_process = wx.SpinCtrl(panel, value=str(os.cpu_count()), min=1, max=os.cpu_count())
    self._nb_process.SetToolTip("Number of threads to be used in the computations.")
    sizer_hor_threads.Add(text_dx, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1)
    sizer_hor_threads.Add(self.input_dx, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1)
    sizer_hor_threads.Add(text_nbxy, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1)
    sizer_hor_threads.Add(self.input_nbxy, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1)
    sizer_hor_threads.Add(text_O, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1)
    sizer_hor_threads.Add(self.input_O, 1, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1)
    sizer_hor_threads.Add(text_threads, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1)
    sizer_hor_threads.Add(self._nb_process, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1)
    # 3 last lines + scenarios
    #--------------------------
    # Each main button comes with a checkable list of the steps to run.
    self._but_creation = wx.Button(panel, label='DataBase Creation')
    self.layout(self._but_creation)
    self._but_creation.Bind(wx.EVT_BUTTON, self.OnCreation)
    self._steps_db = wx.CheckListBox(panel, choices=steps_base_data_creation.get_list_names(), style=wx.LB_MULTIPLE | wx.CHK_CHECKED)
    self._but_vulnerability = wx.Button(panel, label='Vulnerability')
    self.layout(self._but_vulnerability)
    self._but_vulnerability.Bind(wx.EVT_BUTTON, self.OnVulnerability)
    # The scenario step is not offered in the list of vulnerability steps.
    step_Vuln_without_withoutscenarios = [item for item in steps_vulnerability.get_list_names() if item != 'APPLY_SCENARIOSVULN - 4']
    self._steps_vulnerability = wx.CheckListBox(panel, choices=step_Vuln_without_withoutscenarios, style=wx.LB_MULTIPLE | wx.CHK_CHECKED)
    # Scenarios specifics --
    self._but_checkscenario = wx.Button(panel, label='Check existing scenarios')
    self._but_checkscenario.SetToolTip("To display the scenario to be taken into account in CHANGE_VULNE.")
    self._but_checkscenario.Bind(wx.EVT_BUTTON, self.OnCheckScenario)
    self._but_upriverbed = wx.Button(panel, label="Update riverbed")
    self._but_upriverbed.SetToolTip("To create the raster of the riverbed trace.")
    self._but_upriverbed.Bind(wx.EVT_BUTTON, self.on_button_click)
    # Menu with the two riverbed options (ids 1 and 2), handled by on_menu_click.
    self.menu = wx.Menu()
    self.menu.Append(1, "File of riverbed trace exists.")
    self.menu.Append(2, "Point to a low discharge simulation and calculate the riverbed trace.")
    self.menu.Bind(wx.EVT_MENU, self.on_menu_click)
    self._but_toggle_scen = wx.ToggleButton(panel, label="Accounting for scenarios")
    self._but_toggle_scen.SetToolTip("To be activated to surimpose the vuln_ files, \n and so to take into account scenarios")
    self.toggle_state = False  # initial state of the scenario toggle
    self._but_toggle_scen.Bind(wx.EVT_TOGGLEBUTTON, self.OnToggle)
    sizer_hor_scen.Add(self._but_checkscenario, 1, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1)
    sizer_hor_scen.Add(self._but_upriverbed, 1, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1)
    sizer_hor_scen.Add(self._but_toggle_scen, 1, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1)
    # Optional resampling of the final raster: toggle + resolution value.
    self._but_toggle_resamp = wx.ToggleButton(panel, label="Resampling [m]:")
    self._but_toggle_resamp.SetToolTip("To compute the final raster with a coarser resolution than the original one.")
    self.toggle_resamp_state = False  # initial state of the resampling toggle
    self._but_toggle_resamp.Bind(wx.EVT_TOGGLEBUTTON, self.OnToggleResampling)
    sizer_hor_scen.Add(self._but_toggle_resamp, flag=wx.ALIGN_CENTER | wx.TOP)
    self._but_resampling = wx.SpinCtrl(panel, value="100", min=1, max=1000)
    sizer_hor_scen.Add(self._but_resampling, flag=wx.ALIGN_CENTER | wx.TOP)
    #--
    self._but_acceptability = wx.Button(panel, label='Acceptability')
    self.layout(self._but_acceptability)
    self._but_acceptability.Bind(wx.EVT_BUTTON, self.OnAcceptability)
    # The scenario and resampling steps are not offered in the list of acceptability steps.
    step_without_withoutscenarios = [item for item in steps_acceptability.get_list_names() if item != 'COMPUTE_WITH_SCENARIOS - 5']
    step_without_withoutscenarios = [item for item in step_without_withoutscenarios if item != 'RESAMPLING - 6']
    self._steps_acceptability = wx.CheckListBox(panel, choices=step_without_withoutscenarios, style=wx.LB_MULTIPLE | wx.CHK_CHECKED)
    sizer_hor2.Add(self._but_creation, 1, wx.ALL | wx.EXPAND, 0)
    sizer_hor2.Add(self._steps_db, 1, wx.ALL | wx.EXPAND, 0)
    sizer_hor3.Add(self._but_vulnerability, 1, wx.ALL | wx.EXPAND, 0)
    sizer_hor3.Add(self._steps_vulnerability, 1, wx.ALL | wx.EXPAND, 0)
    sizer_hor4.Add(self._but_acceptability, 1, wx.ALL | wx.EXPAND, 0)
    sizer_hor4.Add(self._steps_acceptability, 1, wx.ALL | wx.EXPAND, 0)
    # Lines order: the order of the Add calls below is the top-to-bottom order in the window
    sizer_ver_small.Add(self._but_checkfiles, 0, wx.ALL | wx.EXPAND, 1)
    sizer_ver_small.Add(self._but_checksim, 0, wx.ALL | wx.EXPAND, 1)
    sizer_ver_small.Add(self._but_checkpond, 0, wx.ALL | wx.EXPAND, 1)
    sizer_vert1.Add(sizer_hor1, 1, wx.EXPAND, 0)
    sizer_vert1.Add(sizer_hor1_1, 1, wx.EXPAND, 0)
    sizer_vert1.Add(sizer_hor_threads, 0, wx.EXPAND, 0)
    sizer_vert1.Add(sizer_hor2, 1, wx.EXPAND, 0)
    sizer_vert1.Add(sizer_hor_scen, 1, wx.EXPAND, 0)
    sizer_vert1.Add(sizer_hor3, 1, wx.EXPAND, 0)
    sizer_vert1.Add(sizer_hor4, 1, wx.EXPAND, 0)
    sizer_hor_main.Add(sizer_vert1, proportion=1, flag=wx.EXPAND, border=0)
    # Disabled if Main Directory + SA + Scenario not selected
    # (most of these buttons are enabled again in OnMainDir)
    self._but_acceptability.Enable(False)
    self._but_vulnerability.Enable(False)
    self._but_creation.Enable(False)
    self._but_checkfiles.Enable(False)
    self._but_DEM.Enable(False)
    self._but_extrinterp.Enable(False)
    self._but_toggle_scen.Enable(False)
    self._but_toggle_resamp.Enable(False)
    self._but_upriverbed.Enable(False)
    self._but_checkpond.Enable(False)
    self._but_checkscenario.Enable(False)
    self._but_checksim.Enable(False)
    self._but_creation.Enable(False)  # NOTE(review): already disabled a few lines above
    self._but_loadgpu.Enable(False)
    panel.SetSizer(sizer_hor_main)
    panel.Layout()
[docs]
def OnSims(self, e:wx.ListEvent):
    """ Placeholder handler for the selection of a simulation: currently does nothing. """
    return None
[docs]
def OnSimsDBLClick(self, e:wx.ListEvent):
    """
    Load the selected simulation into the mapviewer.

    Nothing happens when no mapviewer is attached, or when an array with the
    same id (the file stem) is already listed by the mapviewer.
    """
    viewer = self.mapviewer
    if viewer is None:
        return

    from ..PyDraw import draw_type

    idx_sim = e.GetSelection()
    return_period = self._manager.get_return_periods()[idx_sim]
    sim_path = self._manager.get_filepath_for_return_period(return_period)

    loaded_arrays = viewer.get_list_keys(drawing_type=draw_type.ARRAYS)
    if sim_path.stem in loaded_arrays:
        return

    viewer.add_object('array', filename=str(sim_path), id=sim_path.stem)
    viewer.Refresh()
[docs]
def OnCheckFiles(self, e):
    """
    Check the structure of the main directory.

    INPUT is always checked. TEMP and OUTPUT are checked only when a study area
    and a scenario are selected. The outcome is reported in a message dialog.
    """
    if self._manager is None:
        logging.error("No main directory selected -- Nothing to check")
        return

    inputs_ok = self._manager.check_inputs()
    if inputs_ok == False:
        logging.error(f"Missing files in INPUT. Please provide them by following the right structure.")
        with wx.MessageDialog(self, f"Missing files in INPUT. Inputs can not be created automatically : you must provide them.\n Please read the logs and terminal to see the missing ones.", "Error", wx.OK | wx.ICON_ERROR) as dlg:
            dlg.ShowModal()
        return

    nothing_selected = (self._manager._study_area is None) or (self._manager._scenario is None)
    if nothing_selected:
        logging.error(f"No study area and/or scenario selected, no check of TEMP and OUTPUT.")
        with wx.MessageDialog(self, f"INPUT is well structured, but TEMP and OUTPUT have not been checked because there is no study area and scenario selected.", "Checking", wx.OK | wx.ICON_INFORMATION) as dlg:
            dlg.ShowModal()
        return

    logging.info(f"The folder is well structured.")
    # The returned values are not used; the calls themselves perform the checks.
    self._manager.check_temporary()
    self._manager.check_outputs()
    with wx.MessageDialog(self, f"Main directory is checked.\nINPUT is well structured, and TEMP and OUTPUT have been checked. If folders were missing, they have been created\nMain directory at {self.maindir}", "Checking", wx.OK | wx.ICON_INFORMATION) as dlg:
        dlg.ShowModal()
[docs]
def OnCheckPond(self,e):
    """
    Plot the weighting coefficients as a bar chart.

    If the manager does not return a DataFrame of coefficients, an information
    dialog is shown instead of the graph.
    """
    ponds = self._manager.get_ponderations()

    if not isinstance(ponds, pd.DataFrame):
        with wx.MessageDialog(self,
                              "No coefficients computed, because no return period found in the interpolated simulation folder. Try after loading gpu simulations",
                              "Checking",
                              style=wx.OK | wx.ICON_INFORMATION) as dlg:
            dlg.ShowModal()
        return

    logging.info(f"Plotting the coefficients graph.")
    ponds.plot(kind='bar', color='gray', edgecolor='black')
    plt.ylabel("Weighting coefficients [-]")
    plt.xlabel("Return period [years]")
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    plt.show()
[docs]
def OnMainDir(self, e):
    """
    Selects the main directory to be read.

    The user chooses a folder; a new ``Accept_Manager`` is created for it. If
    the folders INPUT, TEMP and OUTPUT are not all present, an error box is
    shown and the method stops. Otherwise the buttons are enabled and the list
    of study areas is refreshed.
    """
    vanish_info_header(self.input_dx,self.input_nbxy,self.input_O)

    with wx.DirDialog(self, "Choose the main directory containing the data (folders INPUT, TEMP and OUTPUT):",
                      style=wx.DD_DEFAULT_STYLE
                      ) as dlg:
        if dlg.ShowModal() != wx.ID_OK:
            return
        selected_dir = dlg.GetPath()

    # NOTE(review): the manager is replaced before the structure is validated,
    # so it points to the rejected folder when the check below fails -- confirm
    # whether Accept_Manager may be built only after the check.
    self._manager = Accept_Manager(selected_dir)
    self.maindir = selected_dir

    folders = ["INPUT", "TEMP", "OUTPUT"]
    if not all(os.path.isdir(os.path.join(self.maindir, folder)) for folder in folders):
        logging.info("Folder not loaded (incorrect structure).")
        wx.MessageBox(
            f"Missing folders among INPUT, TEMP and OUTPUT. Please organize correctly this folder.",
            "Error",
            wx.OK | wx.ICON_ERROR
        )
        return

    for button in (self._but_acceptability,
                   self._but_vulnerability,
                   self._but_creation,
                   self._but_checkfiles,
                   self._but_toggle_scen,
                   self._but_toggle_resamp,
                   self._but_upriverbed,
                   self._but_checkscenario,
                   self._but_checkpond,
                   self._but_checksim,
                   self._but_loadgpu):
        button.Enable(True)

    # Both lists are emptied BEFORE looking for study areas: otherwise the
    # entries of a previously loaded directory stay visible when the new one
    # has no study area, and clicking on them indexes an empty list.
    self._listbox_scenario.Clear()
    self._listbox_studyarea.Clear()

    studyareas = self._manager.get_list_studyareas()
    if len(studyareas) == 0 :
        logging.info("Folder loaded but no study areas found in the folder (INPUT/STUDY_AREA). Please use the button to load hydraulic simulations in the manager.")
        return

    self._listbox_studyarea.InsertItems(studyareas, 0)
    logging.info("All the files are present")
[docs]
def OnStudyArea(self, e):
    """
    Change the study area.

    The manager is switched to the selected study area and the list of
    scenarios is refreshed. When a mapviewer is attached, the study area file
    is added to it as a vector unless its id (the file stem) is already listed.
    """
    if self._manager is None:
        return

    vanish_info_header(self.input_dx,self.input_nbxy,self.input_O)
    self._listbox_scenario.Clear()

    study_area:str = self._manager.get_list_studyareas(with_suffix=True)[e.GetSelection()]
    self._manager.change_studyarea(study_area)

    scenarios = self._manager.get_list_scenarios()
    if len(scenarios) == 0:
        logging.error("No scenario available associated with this study area.")
    else:
        self._listbox_scenario.InsertItems(scenarios, 0)

    viewer = self.mapviewer
    if viewer is None:
        return

    area_path = self._manager.IN_STUDY_AREA / study_area
    from ..PyDraw import draw_type
    if area_path.stem not in viewer.get_list_keys(drawing_type=draw_type.VECTORS):
        viewer.add_object('vector', filename=str(area_path), id=area_path.stem)
        viewer.Refresh()
[docs]
def OnScenario(self, e):
    """
    Change the scenario.

    The manager is switched to the selected scenario, then
    ``create_INPUT_TEMP_OUTPUT_forScenario`` is called for the current study
    area / scenario and the header information shown in the window is updated.
    """
    manager = self._manager
    if manager is None:
        return

    selected = manager.get_list_scenarios()[e.GetSelection()]
    manager.change_scenario(selected)

    create_INPUT_TEMP_OUTPUT_forScenario(self.maindir, manager.Study_area, manager.scenario, None)
    update_info_header(self.input_dx,self.input_nbxy,self.input_O,manager.IN_SA_INTERP)
[docs]
def OnCreation(self, e):
    """
    Create the database.

    The resolution is the first value returned by ``update_info_header``. When
    it is an empty string, an error box is shown and nothing is computed.
    Otherwise ``Base_data_creation`` is run with the checked steps, or without
    the ``steps`` argument when no step is checked. An information box is shown
    before and after the computation.
    """
    if self._manager is None:
        return

    # Only the first of the six returned values is used here.
    dx, _h1, _h2, _h3, _h4, _h5 = update_info_header(self.input_dx,self.input_nbxy,self.input_O,self._manager.IN_SA_INTERP)

    if dx == '':
        wx.MessageBox(
            f"There are no files in INTERP_WD lease, use first the buttons at the second line.",
            "Attention",
            wx.OK | wx.ICON_ERROR
        )
        return

    # Step names look like "<NAME> - <number>"; only the number is kept.
    checked_names = list(self._steps_db.GetCheckedStrings())
    steps = [int(name.split('-')[1]) for name in checked_names]

    if steps:
        before = f"The database will now be created, with a resolution of {dx}. This process may take some time, and the window may temporarily stop responding."
        after = "The database is created with the selected steps."
        optional_args = {'steps': steps}
    else:
        before = f"No steps selected. The code for the DataBase creation will consider all steps by default, , with a resolution of {dx}. This process may take some time, and the window may temporarily stop responding."
        after = "The database is created for every steps."
        optional_args = {}

    wx.MessageBox(before, "Information", wx.OK | wx.ICON_INFORMATION)
    Base_data_creation(self._manager.main_dir,
                       Study_area=self._manager.Study_area,
                       number_procs=self._nb_process.GetValue(),
                       resolution=dx,
                       **optional_args)
    wx.MessageBox(after, "Information", wx.OK | wx.ICON_INFORMATION)
[docs]
def OnLoadingSimu(self,e):
    """ Link between acceptability and simulations
    -Load a hydraulic scenarios from the scenario manager
    -Create scenario and study area if needed.

    Two folders are requested: the main folder of the scenario manager (its
    name is used as study area) and the folder of the hydraulic scenario (its
    name is used as scenario; it must contain a 'simulations' folder). The
    method stops, after logging an error, as soon as one of the dialogs is
    cancelled or when the 'simulations' folder is missing. Otherwise the
    'sim_' sub-folders are listed in the check list, the bath_ files are
    searched and the DEM / interpolation buttons are enabled.
    """
    with wx.DirDialog(None, "Please select the main scenario manager folder (containing the scenarios, the folder discharge, the scripts.py...), named after the STUDY AREA.", style=wx.DD_DEFAULT_STYLE) as dlg:
        if dlg.ShowModal() != wx.ID_OK:
            logging.error('No folder found / selected. Please try again.')
            return
        main_gpu = Path(dlg.GetPath())

    study_area = main_gpu.name
    logging.info(f"Selected folder for GPU result such as the STUDY AREA is {study_area}")

    with wx.DirDialog(None, "Please select the scenarios folder (containing the 'simulations' folder) of the specific HYDRAULIC SCENARIO.", defaultPath=str(main_gpu), style=wx.DD_DEFAULT_STYLE) as dlg:
        if dlg.ShowModal() != wx.ID_OK:
            # Stop here: the rest of the method needs the scenario folder
            # (falling through previously ended in an undefined variable error).
            logging.error('No hydraulic scenario selected.')
            return
        scenario_dir = Path(dlg.GetPath())

    hydraulic_scen = scenario_dir.joinpath("simulations")
    scenario = scenario_dir.name
    logging.info(f"Selected hydraulic scenario : {scenario}")

    if not hydraulic_scen.is_dir():
        # Checked before anything is created or changed in the manager.
        logging.error(f"No 'simulations' folder found in {scenario_dir}. Please try again.")
        return

    create_INPUT_TEMP_OUTPUT_forScenario(self.maindir, study_area, scenario, main_gpu)
    self._manager.change_studyarea(study_area+'.shp')
    self._manager.change_scenario(scenario)

    self._listbox_studyarea.Clear()
    self._listbox_studyarea.InsertItems(self._manager.get_list_studyareas(), 0)
    self._listbox_scenario.Clear()
    self._listbox_scenario.InsertItems(self._manager.get_list_scenarios(), 0)

    # Blue color of selection even if not directly clicked :
    for listbox, item_name in ((self._listbox_scenario, scenario),
                               (self._listbox_studyarea, study_area)):
        index_to_select = listbox.FindString(item_name)
        if index_to_select != wx.NOT_FOUND:
            listbox.SetSelection(index_to_select)
            listbox.SetItemBackgroundColour(index_to_select, wx.Colour(0, 120, 215))

    self._listbox_studyarea.Refresh()
    self._listbox_scenario.Refresh()

    # Simulations = sub-folders of 'simulations' whose name starts with "sim_".
    self._check_listbox.Clear()
    self.sims = {subdir.name: subdir
                 for subdir in hydraulic_scen.iterdir()
                 if subdir.is_dir() and subdir.name.startswith("sim_")}
    if not self.sims:
        # Logged once, and only when no simulation folder exists at all.
        logging.info('No folder sim_ found / selected. Please try again.')

    self.datadir_simulations = hydraulic_scen
    self.file_paths = {Path(sim).name: Path(sim) for sim in sorted(self.sims.keys())}
    self._check_listbox.Set(sorted(sim for sim in self.sims.keys()))

    logging.info(f"GPU simulations loaded in the checkbox.\n\nPlease select the ones you want to interpolate and use the button 'Reading and interpolating free surface'.")
    message = "GPU simulations loaded in the checkbox\n\nPlease select the ones you want to interpolate and use the button 'Reading and interpolating free surface'."

    found_bath = search_for_modif_bath_and_copy(Path(main_gpu), Path(hydraulic_scen.parent), self._manager.IN_CH_SA_SC)
    if found_bath :
        message+= "\nIn addition, modification files for bathymetry (bath_) have been found in the gpu simulations, a copy has been made for a change in the vulnerability and DEM (see vuln_ and MNTmodifs_ in CHANGE_VULNE). Please edit them."
        logging.info(f"Modification files for bathymetry (bath_) have been found in the gpu simulations, a copy has been made for a change in the vulnerability and DEM (see vuln_ and MNTmodifs_ in CHANGE_VULNE). Please edit them.")

    # Bathymetry of the hydraulic scenario, used later by OnDEM.
    self.gpu_bathy = hydraulic_scen.parent / "__bathymetry.tif"
    self._but_extrinterp.Enable(True)
    self._but_DEM.Enable(True)

    with wx.MessageDialog(self,
                          message,
                          "Information",
                          style=wx.OK | wx.ICON_INFORMATION) as dlg:
        dlg.ShowModal()
[docs]
def OnDEM(self,e):
    """Import the DEM and create the inputs of the interpolation routine holes.exe.

    The created files are named 'MNT_...' and 'MNT_..._with_mask'
    (see function MTN_And_mask_creation_all).

    Steps performed:
    1. refuse to run if no gpu simulation has been loaded yet;
    2. if DEM_FILES already contains files, ask whether to delete them;
    3. ask the user for the reference DEM in .tif format;
    4. if 'MNTmodifs_' files exist, ask the user to confirm they were edited
       and apply them to the DEM through a VRT translated to .tif;
    5. write 'MNT_loaded.bin' and call MTN_And_mask_creation_all.

    :param e: wx event that triggered the handler (unused).
    """
    if not hasattr(self, 'file_paths'):
        with wx.MessageDialog(self,
                              "Please, first load gpu simulations via the previous button.",
                              "Attention",
                              style=wx.OK | wx.ICON_ERROR) as dlg:
            dlg.ShowModal()
        return

    manager = self._manager

    # --- existing content of DEM_FILES ------------------------------------
    dem_dir = manager.IN_SA_DEM
    names_inDEM = [f for f in os.listdir(dem_dir) if os.path.isfile(os.path.join(dem_dir, f))]
    if names_inDEM:
        # Context manager so that the native dialog is destroyed after use.
        with wx.MessageDialog(None,
                              f"The DEM_FILES folder is not empty and contains the files {names_inDEM}. ",
                              "Confirmation",
                              wx.YES_NO | wx.ICON_QUESTION) as dialog:
            dialog.SetYesNoLabels("Delete and reload", "Keep and leave")
            response = dialog.ShowModal()
        if response != wx.ID_YES:
            logging.info("No update of DEM_FILES.")
            return
        for file_name in names_inDEM:
            os.remove(os.path.join(dem_dir, file_name))
        logging.info("Files in DEM_FILES deleted.")

    # --- reference DEM selected by the user -------------------------------
    with wx.FileDialog(self, "Please select the DEM file in .tif format (without modifications).",
                       wildcard="TIFF files (*.tif)|*.tif",
                       style=wx.FD_OPEN | wx.FD_FILE_MUST_EXIST) as dlg:
        if dlg.ShowModal() != wx.ID_OK:
            return
        path_DEM_base = dlg.GetPath()
        logging.info("DEM file selected.")

    # --- DEM and masked DEM creation --------------------------------------
    scen_dir = manager.IN_CH_SA_SC
    names_inCHVUL_MNTmodifs = [f for f in os.listdir(scen_dir)
                               if os.path.isfile(os.path.join(scen_dir, f)) and f.startswith("MNTmodifs_")]

    # path_MNT_computed is the DEM with the MNTmodifs_ applied if some exist,
    # or the DEM given by the user otherwise.
    path_MNT_computed = Path(path_DEM_base)
    if names_inCHVUL_MNTmodifs:
        path_MNT_computed = Path(manager.IN_CH_SA_SC_MNT_tif.with_suffix('.tif'))
        # Backslashes are escaped: '\C' and '\.' are invalid escape sequences.
        with wx.MessageDialog(None,
                              f"Please modify the 'MNTmodifs_' files in INPUT\\CHANGE_VULNE\\... as in the hydraulic scenario you want to study. They are: {names_inCHVUL_MNTmodifs}",
                              "Confirmation",
                              wx.YES_NO | wx.ICON_QUESTION) as dialog:
            dialog.SetYesNoLabels("Done, continue", "Not done, stop")
            response = dialog.ShowModal()
        if response == wx.ID_NO:
            logging.info("No modifications done in MNTmodifs_ files, process stopped.")
            return

    if os.path.exists(manager.IN_CH_SA_SC):
        existence = manager.create_vrtIfExists(Path(path_DEM_base), manager.IN_CH_SA_SC,
                                               manager.IN_CH_SA_SC_MNT_VRT, name="MNTmodifs_")
        if existence:
            manager.translate_vrt2tif(manager.IN_CH_SA_SC_MNT_VRT, manager.IN_CH_SA_SC_MNT_tif)
            logging.info(f"Scenarios have been applied to DEM see {manager.IN_CH_SA_SC_MNT_tif}.tif.")
            dem_source = manager.IN_CH_SA_SC_MNT_tif.with_suffix('.tif')
        else:
            logging.info(f"No MNTmodifs_ files in {manager.IN_CH_SA_SC}. The given file {path_DEM_base} has not been modified")
            dem_source = path_DEM_base
        WA_mask = WolfArray(dem_source)
        WA_mask.write_all(Path(manager.IN_SA_DEM / "MNT_loaded.bin"))
    else:
        logging.error(f"Path {manager.IN_CH_SA_SC} does not exist.")

    # manager.IN_CH_SA_SC_MNT_tif or fn_mnt_cropped : ground + riverbed
    fn_wherebuildings_buffer = manager.IN_CH_SA_SC / "buffer_wherebuilding.tif"
    fn_mask = manager.IN_SA_DEM / "MNT_computed_with_mask.tif"
    MTN_And_mask_creation_all(self.gpu_bathy, path_MNT_computed, fn_wherebuildings_buffer, fn_mask)

    # Both rasters handed to the routine are removed afterwards if they exist.
    if fn_wherebuildings_buffer.exists():
        fn_wherebuildings_buffer.unlink()
    if fn_mask.exists():
        fn_mask.unlink()

    with wx.MessageDialog(self,
                          "DEM files created in INPUT\\WATER_DEPTH\\...\\DEM_FILES.",
                          "Success.",
                          wx.OK | wx.ICON_INFORMATION) as dlg:
        dlg.ShowModal()
    return
[docs]
def OnInterpolation(self,e):
    """Interpolate the last extracted time steps with the routine holes.exe.

    The last time steps present in LAST_STEP_EXTRACTED are interpolated with
    the fast marching interpolation routine holes.exe, through a batch file,
    after several checks on the required input files:

    * gpu simulations must have been loaded (``self.file_paths``);
    * DEM_FILES must contain a DEM 'MNT_*.bin' (without 'mask' in its name)
      and a mask 'MNT_*mask*.bin';
    * the headers of these files must match the exported simulation file.

    If no simulation is checked in the list, every loaded simulation is
    considered for the interpolation.

    :param e: wx event that triggered the handler (unused).
    """
    if not hasattr(self, 'file_paths'):
        with wx.MessageDialog(self,
                              "Please, first load gpu simulations via the previous button.",
                              "Attention",
                              style=wx.OK | wx.ICON_ERROR) as dlg:
            dlg.ShowModal()
        return

    checked_indices = self._check_listbox.GetCheckedItems()
    checked_items = [self._check_listbox.GetString(index) for index in checked_indices]
    selected_paths = [self.file_paths[item] for item in checked_items]
    path_simulations = self.datadir_simulations
    path_LastSteps = Path(self._manager.IN_SA_EXTRACTED)
    fn_write = None

    # --- export of the free surface of every checked simulation -----------
    # The original 'for ... else' had no 'break', hence the error below was
    # logged on every run; it is now restricted to an empty selection.
    if not selected_paths:
        logging.error('No simulation selected! Please select some in the checkbox.')
    for sim_ in selected_paths:
        if not sim_.name.startswith("sim_"):
            logging.info('No folder found / selected. Please try again...')
            continue
        self.sims[sim_.name] = sim_
        fn_read = Path(path_simulations / sim_ / "simul_gpu_results")
        logging.info(f"Found simulation folder: {sim_}")
        parts = sim_.name.split("sim_")
        if len(parts) > 1:
            name = parts[1]
            fn_write = Path(path_LastSteps / name)
            display_info_header(self.input_dx, self.input_nbxy, self.input_O, fn_write.with_suffix(".bin"))
            read_export_z_bin(fn_read, fn_write, path_LastSteps)
        else:
            logging.info("Please, ensure your simulations are named with the return period, e.g sim_T4")

    # --- DEM (D) and DEM mask (C) in DEM_FILES ----------------------------
    C = None
    D = None
    for file in os.listdir(Path(self._manager.IN_SA_DEM)):
        file_path = Path(self._manager.IN_SA_DEM) / file
        if file_path.is_file() and file.startswith("MNT_") and file_path.suffix == ".bin":
            if "mask" not in file:
                D = file_path
            else:
                C = file_path

    if D is None:
        logging.info("DEM (.bin) not found in DEM_FILES. The file must begins by 'MNT_' and CANNOT include the word 'mask'")
        with wx.MessageDialog(self,
                              "DEM (.bin) not found in DEM_FILES. The file must begins by 'MNT_' and CANNOT include the word 'mask'",
                              "Missing file",
                              style=wx.OK | wx.ICON_INFORMATION) as dlg:
            dlg.ShowModal()
        return

    if C is None:
        logging.info("DEM mask (.bin) not found in DEM_FILES. The file must begins by 'MNT_' and MUST include the word 'mask'")
        with wx.MessageDialog(self,
                              "DEM mask (.bin) not found in DEM_FILES. The file must begins by 'MNT_' and MUST include the word 'mask'",
                              "Missing file",
                              style=wx.OK | wx.ICON_INFORMATION) as dlg:
            dlg.ShowModal()
        return

    # fn_write stays None when nothing has been exported above (e.g. nothing
    # checked): only the DEM files are then compared, instead of raising an
    # AttributeError on None.
    files_to_compare = [C, D]
    if fn_write is not None:
        files_to_compare.insert(0, fn_write.with_suffix(".bin"))
    if not get_header_comparison(files_to_compare):
        logging.info("Files in DEM_FILES do not have the same properties as the simulations files. Please, fix them.")
        with wx.MessageDialog(self,
                              "Files in DEM_FILES do not have the same properties as the simulations files. Please, fix them.",
                              "Error in DEM_FILES files",
                              style=wx.OK | wx.ICON_INFORMATION) as dlg:
            dlg.ShowModal()
        return

    # --- simulations taken into account -----------------------------------
    checked_names = self._check_listbox.GetCheckedStrings()
    if not checked_names:
        logging.info("No items selected. Adding all paths.")
        checked_paths = list(self.file_paths.values())
        message_info = "No simulations were checked in the box; so the computations will consider all of them. The interpolation of the given free surface will begin when you press OK, please wait."
    else:
        logging.info("Adding only the selected simulations.")
        checked_paths = [self.file_paths[name] for name in checked_names]
        message_info = "The interpolation of the given free surface will begin (for the selected simulation(s)), please wait."

    if len(self.file_paths) == 0:
        with wx.MessageDialog(self, "No files in EXTRACTED_LAST_STEP_WD. Please provide some or use the 'Load gpu simulation' button.",
                              "OK", wx.OK | wx.ICON_INFORMATION) as dlg:
            dlg.ShowModal()
        return

    path_Interp = Path(self._manager.IN_SA_INTERP)
    path_bat_file = os.path.join(self._manager.IN_SCEN_DIR, "process_files.bat")
    if os.path.exists(path_bat_file):
        logging.info(f"The file {path_bat_file} already exists and will be replaced.")
        os.remove(path_bat_file)
    path_code = os.path.join(self._manager.IN_WATER_DEPTH, "holes.exe")
    renamed_files = []

    # Hard-coded switch of the original code ('if True:'): the Fortran
    # routine is used, the Python eikonal branch below is kept but inactive.
    use_holes_exe = True
    if use_holes_exe:
        A = []
        for sim_path in checked_paths:
            parts = sim_path.name.split("sim_")
            A.extend([os.path.join(path_LastSteps, g) for g in os.listdir(path_LastSteps) if g.endswith(f"{parts[1]}.bin")])
        B = [os.path.join(path_Interp, os.path.splitext(os.path.basename(f))[0]) for f in A]
        if not A or not B or not C or not D:
            logging.info("Missing files.")
            with wx.MessageDialog(self, "The interpolation cannot go on, as some files are missing (see logs): please check the DEM_FILES or the EXTRACTED_LAST_STEP_WD folders.",
                                  "Missing files.", wx.OK | wx.ICON_INFORMATION) as dlg:
                dlg.ShowModal()
            # Stop here: going on would empty the output folder and run an
            # empty batch file.
            return

        with open(path_bat_file, "w") as bat_file:
            for a, b in zip(A, B):
                # NOTE(review): 'avoid_last=1' sits inside the dem="..." quotes;
                # kept as is - confirm against the argument parsing of holes.exe.
                line = f'"{path_code}" filling in="{a}" out="{b}" mask="{C}" dem="{D} avoid_last=1"\n'
                bat_file.write(line)
        logging.info(message_info)

        empty_folder(self._manager.IN_SA_INTERP)
        # FORTRAN HOLES.EXE
        subprocess.run([path_bat_file], check=True)

        path_fichier = self._manager.IN_SA_INTERP
        # Snapshot of the listing because entries are renamed inside the loop.
        for file in list(path_fichier.glob("*.tif")):
            if "_h" in file.name:
                new_name = file.stem.split("_h")[0].replace(".bin", "") + ".tif"
                file.rename(file.with_name(new_name))
                renamed_files.append(new_name)
        # deleting the other
        for file in list(path_fichier.glob("*.tif")):
            if "_combl" in file.name or file.name not in renamed_files:
                file.unlink()
    else:
        # Python eikonal model
        from ..eikonal import inpaint_array, inpaint_waterlevel
        dtm = WolfArray(D)
        A = []
        for sim_path in checked_paths:
            parts = sim_path.name.split("sim_")
            A.extend([os.path.join(path_LastSteps, g) for g in os.listdir(path_LastSteps) if g.endswith(f"{parts[1]}.tif")])
        if not A or not C or not D:
            logging.info("Missing files.")
            with wx.MessageDialog(self, "The interpolation cannot go on, as some files are missing (see logs): please check the DEM_FILES or the EXTRACTED_LAST_STEP_WD folders.",
                                  "Missing files.", wx.OK | wx.ICON_INFORMATION) as dlg:
                dlg.ShowModal()
            return
        empty_folder(self._manager.IN_SA_INTERP)
        for a in A:
            a = Path(a)
            in_wa = WolfArray(a)
            dem_wa = WolfArray(a.parent / a.stem / "_dem.tif")
            _t, _wse, _wd = inpaint_waterlevel(in_wa.array, dem_wa.array.data, dtm.array.data, ignore_last_patches=1)
            b = os.path.join(path_Interp, os.path.splitext(os.path.basename(a))[0])
            new_name = Path(b).with_suffix('.tif')
            renamed_files.append(new_name)
            in_wa.array.data[:,:] = _wd
            in_wa.array.mask[:,:] = _wd == 0.
            in_wa.write_all(new_name)

    logging.info("Filling completed.")
    with wx.MessageDialog(self, f"Filling completed. Created files : {renamed_files}",
                          "Redirecting", wx.OK | wx.ICON_INFORMATION) as dlg:
        dlg.ShowModal()
    update_info_header(self.input_dx,self.input_nbxy,self.input_O,self._manager.IN_SA_INTERP)
[docs]
def OnToggle(self,e):
    """Handle the toggle button that takes the 'vuln_' scenarios into account.

    ``self.toggle_state`` ends up True only when the button is pressed and at
    least one '*.tif' file whose name starts with 'vuln_' is found in the
    CHANGE_VULNE folder of the manager; otherwise the button is released.

    :param e: wx event that triggered the handler (unused).
    """
    button = self._but_toggle_scen
    self.toggle_state = False

    # Button released: restore the default colour and leave.
    if not button.GetValue():
        button.SetBackgroundColour(wx.NullColour)
        self.toggle_state = False
        logging.info("Desactivating the scenario button.")
        return

    logging.info("Activating the scenario button.")
    button.SetBackgroundColour(wx.Colour(175, 175, 175))
    self._but_toggle_scen_state = True

    scenario_dir = Path(self._manager.IN_CH_SA_SC)
    has_vuln_raster = any(candidate.name.startswith("vuln_")
                          for candidate in scenario_dir.glob("*.tif"))
    if has_vuln_raster:
        self.toggle_state = True
        return

    # No 'vuln_' raster available: warn the user and release the button.
    wx.MessageBox(
        "The scenario button cannot be activated because there is no change in vulnerability 'vuln_' in CHANGE_VULNE. Please reload the simulations containing 'bath_' files via 'Load new hydraulic scenario' and edit it, or directly introduce your 'vuln_' files.",
        "Information",
        wx.OK | wx.ICON_INFORMATION
    )
    logging.info("Desactivating the scenario button.")
    button.SetValue(False)
    button.SetBackgroundColour(wx.NullColour)
    self._but_toggle_scen_state = False
[docs]
def OnToggleResampling(self,e):
    """Handle the toggle button that activates the acceptability resampling.

    The resampling stays activated only when a resolution is displayed in
    ``self.input_dx`` and the requested resampling size is not smaller than
    that resolution; in every other case the button is released.

    :param e: wx event that triggered the handler (unused).
    """
    button = self._but_toggle_resamp

    def _switch_off(reason):
        # Release the button, restore its colour and log why.
        self.toggle_resamp_state = False
        button.SetValue(False)
        button.SetBackgroundColour(wx.NullColour)
        logging.info(reason)

    self.toggle_resamp_state = False
    if not button.GetValue():
        _switch_off("Resampling disactivated")
        return

    button.SetBackgroundColour(wx.Colour(175, 175, 175))
    self.toggle_resamp_state = True
    logging.info("Resampling activated")

    requested_size = self._but_resampling.GetValue()
    label = self.input_dx.GetLabel()
    if label == '':
        _switch_off("No simulations in INTERP_WD.")
        return

    # The label looks like "(dx,dy)": its first component is the resolution.
    dx = float(label.strip("()").split(",")[0])
    if requested_size < dx:
        wx.MessageBox(
            "The resampling size cannot be inferior to the resolution. Please select another resampling size.",
            "Attention",
            wx.OK | wx.ICON_ERROR
        )
        _switch_off("Resampling disactivated because of a bad resampling size.")
    else:
        logging.info(f"Allowed resampling value : {requested_size}[m].")
[docs]
def OnCheckScenario(self,e):
    """Tell the user whether scenario rasters exist in CHANGE_VULNE.

    The list comes from ``self._manager.get_modifiedrasters()`` and is
    reported in an information message box.

    :param e: wx event that triggered the handler (unused).
    """
    found = self._manager.get_modifiedrasters()
    logging.info("checking the scenarios for vulnerability and acceptability.")

    if found:
        listing = ', '.join(found)
        text = f"There exist vuln_ file(s) in CHANGE_VULNE:\n {listing}"
    else:
        text = "No files 'vuln_' or 'MNTmodifs_' found in CHANGE_VULNE."

    wx.MessageBox(text, "Information", wx.OK | wx.ICON_INFORMATION)
[docs]
def OnVulnerability(self, e):
    """Run the vulnerability computation and display the resulting raster.

    The steps come from the checked items of ``self._steps_vulnerability``
    (the integer after the '-' of each label); when nothing is checked the
    default steps [1, 10, 11, 2, 3] are used. When the scenario toggle is
    active, step 4 is added and, if the baseline raster
    (``self._manager.OUT_VULN``) already exists, only step 4 is computed.

    Nothing is computed when no manager is set, when INTERP_WD holds no file
    (empty resolution) or when the user declines the confirmation dialog.

    :param e: wx event that triggered the handler (unused).
    """
    if self._manager is None:
        return

    manager = self._manager
    path = [manager.OUT_VULN]
    steps = [int(cur.split('-')[1]) for cur in self._steps_vulnerability.GetCheckedStrings()]

    # Only the first returned value (the resolution) is needed; indexing
    # avoids shadowing the gettext alias '_' with throw-away names.
    resolution = update_info_header(self.input_dx, self.input_nbxy, self.input_O, manager.IN_SA_INTERP)[0]
    if resolution == '':
        wx.MessageBox(
            "There are no files in INTERP_WD. Please, use first the buttons at the second line.",
            "Attention",
            wx.OK | wx.ICON_ERROR
        )
        return

    message_supp = "."
    no_selection = len(steps) == 0
    if no_selection:
        steps = [1, 10, 11, 2, 3]

    if self.toggle_state == True:
        steps.append(4)
        message_supp = " AND scenario(s) vuln_ taken into account"
        # 'exists' must be CALLED: the bare bound method is always truthy,
        # which made the baseline never computed in scenario mode.
        if manager.OUT_VULN.exists():
            message_supp = " FOR scenario(s) (vuln_ taken into account)"
            logging.info("Attention - The manager ONLY computes Vulnerability_scenario, as Vulnerability_baseline already computed.")
            steps = [4]
        else:
            logging.info("Attention - The manager computes also Vulnerability_baseline, as Vulnerability_scenario needs it as input.")
        path = [manager.OUT_VULN_Stif]

        # Backslashes are escaped ('\C' and '\.' are invalid escapes) and the
        # dialog is destroyed by the context manager.
        with wx.MessageDialog(None,
                              "Please modify the 'vuln_' files in INPUT\\CHANGE_VULNE\\... as desired. Default value set to one. ",
                              "Confirmation",
                              wx.YES_NO | wx.ICON_QUESTION) as dialog:
            dialog.SetYesNoLabels("Done, continue", "Not done, stop")
            response = dialog.ShowModal()
        if response == wx.ID_NO:
            return

    if no_selection:
        logging.info("No steps selected. By default every steps will be performed" + message_supp)
        Vulnerability(str(manager.main_dir),
                      scenario=str(manager.scenario),
                      Study_area=str(manager.Study_area),
                      resolution=resolution,
                      steps=steps)
        summary = "Vulnerability computed with every steps"
    else:
        Vulnerability(manager.main_dir,
                      scenario=manager.scenario,
                      Study_area=manager.Study_area,
                      resolution=resolution,
                      steps=steps)
        summary = "Vulnerability computed with the selected steps"

    wx.MessageBox(
        summary + message_supp,
        "Information",
        wx.OK | wx.ICON_INFORMATION
    )
    mapviewer_display(path)
[docs]
def OnAcceptability(self, e):
    """Run the acceptability computation and display the resulting raster.

    The steps come from the checked items of ``self._steps_acceptability``
    (the integer after the '-' of each label); when nothing is checked the
    default steps [1, 2, 3, 4] are used. When the scenario toggle is active,
    step 5 is added and step 4 is dropped if the baseline raster
    (``self._manager.OUT_ACCEPT``) already exists. When the resampling toggle
    is active, step 6 is added and the resampling size is read from
    ``self._but_resampling`` (100 otherwise).

    :param e: wx event that triggered the handler (unused).
    """
    if self._manager is None:
        return

    manager = self._manager
    river_trace = manager.wich_river_trace()

    # NOTE(review): both tests below require the vulnerability raster AND the
    # river trace to be missing before stopping; 'or' may have been intended
    # - confirm before changing.
    if self.toggle_state == True and not os.path.isfile(str(manager.OUT_VULN_Stif)) and not os.path.isfile(str(river_trace)):
        wx.MessageBox("Necessary files are missing, please ensure the DataBase or Vulnerability or Updating riverbed steps were performed. ", "Error", wx.OK | wx.ICON_ERROR)
        return
    if self.toggle_state == False and not os.path.isfile(str(manager.OUT_VULN)) and not os.path.isfile(str(river_trace)):
        wx.MessageBox("Necessary files are missing, please ensure the DataBase or Vulnerability or Updating riverbed steps were performed. ", "Error", wx.OK | wx.ICON_ERROR)
        return

    steps = [int(cur.split('-')[1]) for cur in self._steps_acceptability.GetCheckedStrings()]
    message_supp = "."
    resampling = 100
    path = [manager.OUT_ACCEPT]

    if len(steps) == 0:
        steps = [1, 2, 3, 4]
        if self.toggle_state == True:
            steps = [1, 2, 3, 4, 5]
            message_supp = " AND scenario(s) vuln_ taken into account"
            path = [manager.OUT_ACCEPT_Stif]
            # 'exists' must be CALLED: the bare bound method is always truthy,
            # which dropped step 4 even when the baseline was missing.
            if manager.OUT_ACCEPT.exists():
                steps = [x for x in steps if x != 4]
                logging.info('Acceptability_baseline not computed because it already exists.')
                message_supp = " FOR scenario(s) vuln_ taken into account"

        if river_trace == manager.OUT_MASKED_RIVER:
            message_supp += " WITH the _baseline riverbed trace."
        if river_trace == manager.OUT_MASKED_RIVER_S:
            message_supp += " WITH the _scenarios riverbed trace."

        if self.toggle_resamp_state == True:
            steps.append(6)
            resampling = self._but_resampling.GetValue()
            resolution = self.input_dx.GetLabel()
            if resolution != '':
                values = resolution.strip("()").split(",")
                resolution = float(values[0])
            message_supp += f" It has been created for the resolution {resolution}m and the resampling size {resampling}m."

        logging.info("No steps selected. By default every steps will be performed.")
        Acceptability(manager.main_dir,
                      scenario=manager.scenario,
                      Study_area=manager.Study_area,
                      resample_size=resampling,
                      steps=steps)
        wx.MessageBox(
            "Acceptability computed with every steps" + message_supp,
            "Information",
            wx.OK | wx.ICON_INFORMATION
        )
    else:
        if self.toggle_state == True:
            steps.append(5)
            message_supp = " AND scenario(s) vuln_ taken into account"
            if manager.OUT_ACCEPT.exists():
                steps = [x for x in steps if x != 4]
                logging.info('Acceptability_baseline not computed because it already exists.')
                # Leading space added: the text is appended right after
                # "...the selected steps".
                message_supp = " FOR scenario(s) (vuln_taken into account)"
            path = [manager.OUT_ACCEPT_Stif]

        river_trace = manager.wich_river_trace()
        if river_trace == manager.OUT_MASKED_RIVER:
            message_supp += " WITH the _baseline riverbed trace."
        if river_trace == manager.OUT_MASKED_RIVER_S:
            message_supp += " WITH the _scenarios riverbed trace."

        if self.toggle_resamp_state == True:
            resampling = self._but_resampling.GetValue()
            steps.append(6)

        Acceptability(manager.main_dir,
                      scenario=manager.scenario,
                      Study_area=manager.Study_area,
                      resample_size=resampling,
                      steps=steps)
        wx.MessageBox(
            "Acceptability computed with the selected steps" + message_supp,
            "Information",
            wx.OK | wx.ICON_INFORMATION
        )

    mapviewer_display(path)