Source code for wolfhece.acceptability.acceptability_gui

"""
Author: University of Liege, HECE
Date: 2025

Copyright (c) 2025 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""
import wx
import numpy as np
import logging
import subprocess
import shutil
import os
import geopandas as gpd
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.figure import Figure
from matplotlib.backends.backend_wxagg import FigureCanvasWxAgg as FigureCanvas
from matplotlib.backends.backend_wxagg import NavigationToolbar2WxAgg as NavigationToolbar2Wx
from pathlib import Path
from scipy.ndimage import label
from gettext import gettext as _
import rasterio
from shapely.geometry import Polygon

from .acceptability import Base_data_creation, Database_to_raster, Vulnerability, Acceptability
from .acceptability import steps_base_data_creation, steps_vulnerability, steps_acceptability
from .func import Accept_Manager
from ..wolf_array import WolfArray, header_wolf
from ..scenario.config_manager import Config_Manager_2D_GPU
from ..PyDraw import WolfMapViewer, draw_type
from ..Results2DGPU import wolfres2DGPU
from ..PyGui import MapManager

[docs] def nullvalue_for_hole(WA): """ Sets the null value for a WolfArray to 0 (as per the convention in the interpolation routine). """ WA.nullvalue = 0. WA.set_nullvalue_in_mask()
[docs] def read_export_z_bin(fn_read, fn_write, fn_laststep): """ Reads the free surface altitude from a GPU simulation and exports it in binary format. Inputs: - fn_read_simu: the simulation file to read. - fn_laststep: the folder EXTRACTED_LAST_STEP defined in acceptability. - fn_write: the path to save the output in binary format. """ fn_temp = os.path.join(fn_laststep, 'temp') os.makedirs(fn_temp, exist_ok=True) wolfres2DGPU_test = wolfres2DGPU(fn_read) wolfres2DGPU_test.read_oneresult(-1) wd = wolfres2DGPU_test.get_h_for_block(1) top = wolfres2DGPU_test.get_top_for_block(1) nullvalue_for_hole(wd) nullvalue_for_hole(top) wd.array = wd.array + top.array fn_write = fn_write.with_suffix('.bin') wd.write_all(fn_write) fn_write = fn_write.with_suffix('.tif') wd.write_all(fn_write) top.write_all(fn_write.parent / fn_write.stem / "_dem.tif") shutil.rmtree(fn_temp)
[docs] def riverbed_trace(fn_read_simu, fn_output, threshold): """ Recognizes the riverbed trace based on a simulation, where water depth above a given threshold is considered part of the riverbed. Inputs: - fn_read_simu: the simulation file to read. - fn_output: the location to save the riverbed trace as a .tiff file. - threshold: the water depth threshold above which the areas are considered riverbed. """ wolfres2DGPU_test = wolfres2DGPU(fn_read_simu) wolfres2DGPU_test.read_oneresult(-1) wd = wolfres2DGPU_test.get_h_for_block(1) wd.array[wd.array > 1000] = 0 wd.array[wd.array > threshold] = 1 wd.array[wd.array < threshold] = 0 wd.as_WolfArray() wd.nodata=0 wd.write_all(Path(fn_output))
[docs] def empty_folder(folder): """ Empties the content of a directory if it exists. """ if os.path.exists(folder): for files in os.listdir(folder): fn = os.path.join(folder, files) try: if os.path.isfile(fn) or os.path.islink(fn): os.unlink(fn) elif os.path.isdir(fn): shutil.rmtree(fn) except Exception as e: print(f"Error when deleting file {fn}: {e}") else: print("The folder does not exist.")
""" This script performs two main operations: 1. Subtraction of two raster TIFF files: - Identifies areas with building traces by subtracting the `bathymetry.tif` from simulations (corresponding to 'MNT_muret_bati') from `MNT` (DEM). 2. For the identified building areas (from step 1): - Replace the values with those from the `MNT` (ground level), ensuring it reflects the terrain, not bathymetry values. Final Output: - The mask should highlight building traces with corresponding DEM (`MNT`) values ("ground") inside, and its name must start with "MNT_" and include "mask" in it. Note: the computations are perfomed with tifs .tif rasters but should be translated to .bin files in the acceptability routine """ #---------------------------------------------------------------------------------------------------------- #1 - Soustraction bathymetry.tif (from simulations) - DEM (MNT, cfr "projet tuilage") ---------------------
[docs] def soustraction(fn_a,fn_b,fn_result): with rasterio.open(fn_a) as src_a, rasterio.open(fn_b) as src_b: if ( src_a.width != src_b.width or src_a.height != src_b.height or src_a.transform != src_b.transform or src_a.crs != src_b.crs ): logging.error(f"{fn_a} and {fn_b} do not have the same properties, please edit them.") data_a = src_a.read(1) data_b = src_b.read(1) #(A - B) data_diff = data_a - data_b nodata_value = src_a.nodata if src_a.nodata == src_b.nodata else None if nodata_value is not None: data_diff[(data_a == nodata_value) | (data_b == nodata_value)] = nodata_value data_diff[data_diff > 5000] = 0 labeled, n = label(data_diff) # Remove small objects threshold = 5 sizes = np.bincount(labeled.ravel()) idx_small = np.where(sizes <= threshold)[0] data_diff[np.isin(labeled, idx_small)] = 0 out_meta = src_a.meta.copy() out_meta.update({ "dtype": "float32", "driver": "GTiff" }) with rasterio.open(fn_result, "w", **out_meta) as dst: dst.write(data_diff, 1)
#2 - DEM (MNT) value in the buildings traces ------------------------------------------------------------------
[docs] def mask_creation_data(mask_file, ground_file, output_file): with rasterio.open(mask_file) as mask_src: mask = mask_src.read(1) mask_meta = mask_src.meta indices = np.where(mask > 0) with rasterio.open(ground_file) as bathy_src: bathy = bathy_src.read(1) mask[indices] = bathy[indices] mask[mask <= 0] = 9999. output_meta = mask_meta.copy() output_meta.update({"dtype": 'float32'}) with rasterio.open(output_file, "w", **output_meta) as dst: dst.write(mask, 1) WA_mask = WolfArray(output_file) WA_mask.write_all(Path(Path(output_file).parent / "MNT_computed_with_mask.bin"))
[docs] def MTN_And_mask_creation_all(fn_bathy, fn_mtn_cropped, fn_where_buildings, fn_mask_final): #couper_raster() soustraction(fn_bathy, fn_mtn_cropped, fn_where_buildings) mask_creation_data(fn_where_buildings, fn_mtn_cropped, fn_mask_final)
#--------------------------------------------------------------------------------------------------------------
[docs] def create_INPUT_TEMP_OUTPUT_forScenario(maindir, study_area, scenario, simu_gpu): """Creates folder for a new study area or/and scenario. The last argument simu_gpu is used when loading simulation (indicates path to the simulation folder), if not used, indicate None to ignore it.""" base_pathwd = Path(maindir) / "INPUT" / "WATER_DEPTH" / study_area / scenario subfolders = ["DEM_FILES", "INTERP_WD", "EXTRACTED_LAST_STEP_WD"] os.makedirs(base_pathwd, exist_ok=True) for folder in subfolders: os.makedirs(os.path.join(base_pathwd, folder), exist_ok=True) base_pathch = Path(maindir) / "INPUT" / "CHANGE_VULNE" / study_area / scenario os.makedirs(base_pathch, exist_ok=True) if simu_gpu != None: path_bat_gpu = Path(simu_gpu) / "bathymetry.tif" if path_bat_gpu.exists(): create_shapefile_from_prop_tif(path_bat_gpu, Path(maindir) / "INPUT" / "STUDY_AREA" / f"{study_area}.shp") logging.info("Study area file created in INPUT/STUDY_AREA.") else : logging.error(f"Error in the study area creation : no bathymetry.tif file in the given simulation folder {simu_gpu}. Please provide it in this folder and try again.") Accept_Manager(main_dir=maindir, Study_area=study_area, scenario=scenario) logging.info(f"Files created in INPUT, TEMP and OUTPUT for the study area named '{study_area}', and the scenario named '{scenario}'") return
[docs] def get_transform_and_crs(tif_file): """ For TIFF file manipulation, reads the CRS and the transform, and returns them. """ with rasterio.open(tif_file) as src: transform = src.transform crs = src.crs return transform, crs
[docs] def create_shapefile_from_prop_tif(fn_tif, shapefile_path): """ Creates a shapefile of the study area based on the extent of an input TIFF file. Inputs: - fn_tif: the path to the input TIFF file. - shapefile_path: the location to save the output shapefile. """ _,_,width,height,_,_ = get_header_info(fn_tif) transform, crs = get_transform_and_crs(fn_tif) top_left = transform * (0, 0) bottom_left = transform * (0, height) top_right = transform * (width, 0) bottom_right = transform * (width, height) rectangle = Polygon([top_left, top_right, bottom_right, bottom_left, top_left]) gdf = gpd.GeoDataFrame({'geometry': [rectangle]}) gdf.set_crs(crs, allow_override=True, inplace=True) gdf.to_file(shapefile_path)
[docs] def get_header_info(fn): """ Reads the headers from the file at path 'fn'. """ class_header = header_wolf() class_header.read_txt_header(fn) dx,dy = class_header.dx, class_header.dy nbx,nby = class_header.nbx, class_header.nby X,Y = class_header.origx, class_header.origy return dx,dy,nbx,nby,X,Y
[docs] def get_header_comparison(list_fn): """ Reads the headers from the files in list_fn and compares them. The result 'comp' is True if the headers are identical, and False otherwise. """ header_infos = [get_header_info(fn) for fn in list_fn] variable_names = ["dx", "dy", "nbx", "nby", "X", "Y"] for idx, name in enumerate(variable_names): values = [header[idx] for header in header_infos] if len(set(values)) > 1: comp = False else: comp = True return comp
[docs] def display_info_header(self_dx, self_nbxy, self_O, fn): """ Displays the header at the path 'fn', and update the values displayed in the acceptability window. """ dx,dy,nbx,nby,X,Y= get_header_info(fn) self_dx.SetLabel(f"({dx},{dy})") self_nbxy.SetLabel(f"({nbx},{nby})") self_O.SetLabel(f"({X},{Y})") return dx,dy,nbx,nby,X,Y
[docs] def vanish_info_header(self_dx, self_nbxy, self_O): self_dx.SetLabel("") self_nbxy.SetLabel("") self_O.SetLabel("")
[docs] def update_info_header(self_dx, self_nbxy, self_O, fn): """ Upate the displayed header values by reading the simulations headers if exist. """ if not os.path.exists(fn): os.makedirs(fn) tif_files = [f for f in os.listdir(fn) if f.lower().endswith('.tif')] tif_list_fn = [os.path.join(fn, tif_file) for tif_file in tif_files] if tif_files: if get_header_comparison(tif_list_fn) : dx,dy,nbx,nby,X,Y = display_info_header(self_dx, self_nbxy, self_O, tif_list_fn[0]) return dx,dy,nbx,nby,X,Y else: logging.error("The interpolated files have different headers. Please fix it.") return False, False, False, False, False, False else : vanish_info_header(self_dx, self_nbxy, self_O) return False, False, False, False, False, False
[docs] def search_for_modif_bath_and_copy(main_gpu, from_path, path_vuln): """ When loading gpu simulations for last step extraction, search for modified bath_ topography file, according to the structure coded in the scenarios manager. If they exist, their extent is copied to CHANGE_VULNE, called vuln_ and MNTmodifs_, to enable the user to modify it later. In addition, returns True if such files exist and False if they do not. """ found_bath = False scen_manager = Config_Manager_2D_GPU(main_gpu, create_ui_if_wx=False) curtree = scen_manager.get_tree(from_path) curdicts = scen_manager.get_dicts(curtree) all_tif_bath = [scen_manager._select_tif_partname(curdict, 'bath_') for curdict in curdicts] all_tif_bath = [curel for curlist in all_tif_bath if len(curlist)>0 for curel in curlist if curel.name.startswith('bath_')] if len(all_tif_bath) : found_bath = True for tif_file in all_tif_bath: found_bath = True with rasterio.open(tif_file) as src: #vuln_ files metadata = src.meta.copy() metadata.update(dtype=rasterio.uint8, count=1, nodata=0) data = np.ones((metadata['height'], metadata['width']), dtype=rasterio.uint8) output_file = path_vuln / tif_file.name.replace('bath_', 'vuln_') with rasterio.open(output_file, 'w', **metadata) as dst: dst.write(data, 1) #MNTmodifs_ files metadata.update(dtype=rasterio.float32, count=1, nodata=0) data = np.ones((metadata['height'], metadata['width']), dtype=rasterio.float32) output_file = path_vuln / tif_file.name.replace('bath_', 'MNTmodifs_') with rasterio.open(output_file, 'w', **metadata) as dst: dst.write(data, 1) return found_bath
[docs] def mapviewer_display(list_path): """ Load the output in the mapviewer on WOLF """ results = " and ".join(Path(path).name for path in list_path) dlg = wx.MessageDialog(None, _(f'Do you want to load {results} in the mapviewer ?'), _('Load file'), wx.YES_NO) ret = dlg.ShowModal() dlg.Destroy() if ret != wx.ID_YES: return mapviewer = WolfMapViewer(title="OUTPUT Acceptability manager") for path in list_path: myarray = WolfArray(path) newid = Path(path).name mapviewer.add_object('array', newobj=myarray, id=newid) logging.info("Press F5 to refresh the mapviewer.") mapviewer.Refresh()
[docs] class AcceptabilityGui(wx.Frame): """ The main frame for the vulnerability/acceptability computation """ def __init__(self, parent=None, width=1024, height=500): super(wx.Frame, self).__init__(parent, title='Acceptability score manager', size=(width, height)) self._manager = None self._mapviewer = None self.InitUI() @property
[docs] def mapviewer(self): return self._mapviewer
@mapviewer.setter def mapviewer(self, value): from ..PyDraw import WolfMapViewer if not isinstance(value, WolfMapViewer): raise TypeError("The mapviewer must be a WolfMapViewer") self._mapviewer = value
[docs] def OnHoverEnter(self, event): """Dynamic colour layout 1""" self._but_creation.SetBackgroundColour(wx.Colour(100,100,100)) self._but_creation.Refresh() event.Skip()
[docs] def OnHoverLeave(self, event): """Dynamic colour layout 2""" self._but_creation.SetBackgroundColour(wx.Colour(150,150,150)) self._but_creation.Refresh() event.Skip()
[docs] def layout(self, self_fct): """Update the layers for the main buttons""" font = self_fct.GetFont() font.SetWeight(wx.FONTWEIGHT_BOLD) self_fct.SetFont(font) self_fct.SetBackgroundColour(wx.Colour(150,150,150)) self_fct.Bind(wx.EVT_ENTER_WINDOW, self.OnHoverEnter) self_fct.Bind(wx.EVT_LEAVE_WINDOW, self.OnHoverLeave)
[docs] def on_button_click(self, event): self.PopupMenu(self.menu)
[docs] def on_menu_click(self, event): """Two options for the 'Update Riverbed' button: either the new riverbed trace file already exists and the user selects it, or it does not exist, and the user points to a no-overflow simulation, allowing the code to create the trace.""" menu_id = event.GetId() if menu_id == 1: logging.info("Option 1 : the file exists, pointing towards it.") dlg = wx.FileDialog(None, "Please select the .tiff file with the NEW trace of the riverbed.", style=wx.FD_OPEN | wx.FD_FILE_MUST_EXIST, wildcard="TIFF files (*.tiff)|*.tiff") if dlg.ShowModal() == wx.ID_OK: selected_file = Path(dlg.GetPath()) copied_file = self._manager.OUT_SCEN_DIR / "copy_file" shutil.copy(selected_file, copied_file) logging.info(f"File copied to: {copied_file}") new_name = self._manager.OUT_MASKED_RIVER_S with wx.MessageDialog(self, f"Modified riverbed imported and called Masked_River_extent_scenarios.tiff.", "File imported.", wx.OK | wx.ICON_INFORMATION) as dlg: dlg.ShowModal() if new_name.exists(): new_name.unlink() copied_file.rename(new_name) logging.info(f"File renamed to: {new_name}") else: logging.info('No file selected. Please try again.') elif menu_id == 2: #No file, so need to create logging.info("Option 2 : pointing to simulation with low discharge (no overflows!).") with wx.DirDialog(self, "Please select a simul_gpu_results folder of a simulation with low discharges (no overflows).", style=wx.DD_DEFAULT_STYLE) as dir_dlg: if dir_dlg.ShowModal() == wx.ID_OK: selected_folder = Path(dir_dlg.GetPath()) if os.path.basename(selected_folder) == "simul_gpu_results" : logging.info(f"Selected folder: {selected_folder}") fn_output = self._manager.OUT_MASKED_RIVER_S dlg = wx.TextEntryDialog(self, "What water depth threshold (in meters) should be used to define the riverbed trace, above which\n" "the water depth is considered part of the riverbed? Use a dot as a decimal separator (e.g 0.3).", "Type a water depth threshold in [m] (e.g 0.3)", "") if dlg.ShowModal() == wx.ID_OK: while True: try: valeur = dlg.GetValue() threshold = float(valeur) if threshold < 1e-5 or threshold > 150: wx.MessageBox( "Error: The value must be positive > 0 and reasonable. Please, try again.", "Error", wx.OK | wx.ICON_ERROR ) break wx.MessageBox( f"Threshold accepted. Considering riverbed where water depth > {threshold}[m]. Please wait.", "Succeed", wx.OK | wx.ICON_INFORMATION ) riverbed_trace(selected_folder, fn_output, threshold) logging.info("File created.") with wx.MessageDialog( self, "Masked_River_extent_scenarios.tiff successfully created.", "File created.", wx.OK | wx.ICON_INFORMATION ) as dlg_success: dlg_success.ShowModal() break except ValueError: wx.MessageBox( "Error: Invalid entry. Please enter a valid number (positive > 0, reasonable, using with DOT as a decimal separator).", "Error", wx.OK | wx.ICON_ERROR ) break else: logging.info("Cancelled.") dlg.Destroy() else: logging.info("No folder (or wrong one) selected. Please try again (must be simul_gpu_results).")
[docs] def layout_listbox(self, self_fct): """Changes the layout for the listbox : light grey.""" self_fct.SetBackgroundColour(wx.Colour(220, 220, 220))
[docs] def InitUI(self): self.gpu_bathy = None self.maindir = None sizer_hor_main = wx.BoxSizer(wx.HORIZONTAL) sizer_vert1 = wx.BoxSizer(wx.VERTICAL) sizer_hor_threads = wx.BoxSizer(wx.HORIZONTAL) sizer_hor1 = wx.BoxSizer(wx.HORIZONTAL) sizer_hor1_1 = wx.BoxSizer(wx.HORIZONTAL) sizer_hor2 = wx.BoxSizer(wx.HORIZONTAL) sizer_hor3 = wx.BoxSizer(wx.HORIZONTAL) sizer_hor4 = wx.BoxSizer(wx.HORIZONTAL) sizer_hor_scen = wx.BoxSizer(wx.HORIZONTAL) # 1st LINE - Loading acceptability folder panel = wx.Panel(self) self._but_maindir = wx.Button(panel, label='Main Directory') self._but_maindir.SetToolTip("To indicate where the main acceptability\n folder is located.") self._but_maindir.SetFont(wx.Font(9, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_BOLD)) self._but_maindir.Bind(wx.EVT_BUTTON, self.OnMainDir) self._listbox_studyarea = wx.ListBox(panel, choices=[], style=wx.LB_SINGLE) self.layout_listbox(self._listbox_studyarea) self._listbox_studyarea.Bind(wx.EVT_LISTBOX, self.OnStudyArea) self._listbox_studyarea.SetToolTip("Choose the study area existed in the folder.") self._listbox_scenario = wx.ListBox(panel, choices=[], style=wx.LB_SINGLE) self.layout_listbox(self._listbox_scenario) self._listbox_scenario.Bind(wx.EVT_LISTBOX, self.OnScenario) self._listbox_scenario.SetToolTip("Choose the acceptability scenario.") sizer_ver_small = wx.BoxSizer(wx.VERTICAL) self._but_checkfiles = wx.Button(panel, label='Check structure') self._but_checkfiles.Bind(wx.EVT_BUTTON, self.OnCheckFiles) self._but_checkfiles.SetToolTip("Checks if the folder is correctly structured\n with INPUT, TEMP, OUTPUT.") self._but_checksim = wx.Button(panel, label='Check simulations') self._but_checksim.SetToolTip("Displays the loaded simulations, interpolated in INTERP_WD.") self._but_checksim.Bind(wx.EVT_BUTTON, self.OnHydrodynInput) self._but_checkpond= wx.Button(panel, label='Check ponderation') self._but_checkpond.Bind(wx.EVT_BUTTON, self.OnCheckPond) self._but_checkpond.SetToolTip("Displays a graph of the computed weighting coefficient\n of the final acceptability computations.") # 2nd LINE - Hydrodynamic part self._but_loadgpu = wx.Button(panel, label='Load new\n hydraulic scenarios') self._but_loadgpu.SetToolTip("To load or change the hydraulic simulations") self._but_loadgpu.SetFont(wx.Font(9, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_BOLD)) self._but_loadgpu.Bind(wx.EVT_BUTTON, self.OnLoadingSimu) sizer_hor1_1.Add(self._but_loadgpu, 1, wx.ALL | wx.EXPAND, 0) self._check_listbox = wx.CheckListBox(panel, choices=[], style=wx.LB_MULTIPLE | wx.CHK_CHECKED) self.layout_listbox(self._check_listbox) self.sims = {} sizer_hor1_1.Add(self._check_listbox, 1, wx.ALL | wx.EXPAND, 0) #ajouter!! sinon s'affiche pas self._but_DEM = wx.Button(panel, label='Check DEM inputs\n for interpolation') self._but_DEM.SetToolTip("To display the existing DEM input for the interpolation of the simulated free surfaces.") self._but_DEM.Bind(wx.EVT_BUTTON, self.OnDEM) sizer_hor1_1.Add(self._but_DEM, 1, wx.ALL | wx.EXPAND, 0) self._but_extrinterp = wx.Button(panel, label='Reading and interpolating\n free surface') self._but_extrinterp.SetToolTip("To read the simulation, and created the hydraulic input for\n acceptability (interpolated simulated free surfaces)") self._but_extrinterp.Bind(wx.EVT_BUTTON, self.OnInterpolation) sizer_hor1_1.Add(self._but_extrinterp, 1, wx.ALL | wx.EXPAND, 0) sizer_hor1.Add(self._but_maindir, 2, wx.ALL | wx.EXPAND, 0) sizer_hor1.Add(self._listbox_studyarea, 1, wx.ALL | wx.EXPAND, 0) sizer_hor1.Add(self._listbox_scenario, 1, wx.ALL | wx.EXPAND, 0) sizer_hor1.Add(sizer_ver_small, 0, wx.ALL | wx.EXPAND, 5) #3rd line sizer_hor_threads = wx.BoxSizer(wx.HORIZONTAL) text_dx = wx.StaticText(panel, label='Resolution (dx,dy) [m]:') self.input_dx = wx.StaticText(panel) self.input_dx.SetMinSize((80, -1)) text_nbxy = wx.StaticText(panel, label='(nbx, nby):') self.input_nbxy = wx.StaticText(panel) self.input_nbxy.SetMinSize((90, -1)) text_O = wx.StaticText(panel, label='Origin (X,Y):') self.input_O = wx.StaticText(panel) self.input_O.SetMinSize((170, -1)) text_threads = wx.StaticText(panel, label='Number of threads:') self._nb_process = wx.SpinCtrl(panel, value=str(os.cpu_count()), min=1, max=os.cpu_count()) self._nb_process.SetToolTip("Number of threads to be used in the computations.") sizer_hor_threads.Add(text_dx, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1) sizer_hor_threads.Add(self.input_dx, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1) sizer_hor_threads.Add(text_nbxy, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1) sizer_hor_threads.Add(self.input_nbxy, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1) sizer_hor_threads.Add(text_O, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1) sizer_hor_threads.Add(self.input_O, 1, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1) sizer_hor_threads.Add(text_threads, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1) sizer_hor_threads.Add(self._nb_process, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1) # 3 last lines + scenarios #-------------------------- self._but_creation = wx.Button(panel, label='DataBase Creation') self.layout(self._but_creation) self._but_creation.Bind(wx.EVT_BUTTON, self.OnCreation) self._steps_db = wx.CheckListBox(panel, choices=steps_base_data_creation.get_list_names(), style=wx.LB_MULTIPLE | wx.CHK_CHECKED) self._but_vulnerability = wx.Button(panel, label='Vulnerability') self.layout(self._but_vulnerability) self._but_vulnerability.Bind(wx.EVT_BUTTON, self.OnVulnerability) step_Vuln_without_withoutscenarios = [item for item in steps_vulnerability.get_list_names() if item != 'APPLY_SCENARIOSVULN - 4'] self._steps_vulnerability = wx.CheckListBox(panel, choices=step_Vuln_without_withoutscenarios, style=wx.LB_MULTIPLE | wx.CHK_CHECKED) # Scenarios specifics -- self._but_checkscenario = wx.Button(panel, label='Check existing scenarios') self._but_checkscenario.SetToolTip("To display the scenario to be taken into account in CHANGE_VULNE.") self._but_checkscenario.Bind(wx.EVT_BUTTON, self.OnCheckScenario) self._but_upriverbed = wx.Button(panel, label="Update riverbed") self._but_upriverbed.SetToolTip("To create the raster of the riverbed trace.") self._but_upriverbed.Bind(wx.EVT_BUTTON, self.on_button_click) self.menu = wx.Menu() self.menu.Append(1, "File of riverbed trace exists.") self.menu.Append(2, "Point to a low discharge simulation and calculate the riverbed trace.") self.menu.Bind(wx.EVT_MENU, self.on_menu_click) self._but_toggle_scen = wx.ToggleButton(panel, label="Accounting for scenarios") self._but_toggle_scen.SetToolTip("To be activated to surimpose the vuln_ files, \n and so to take into account scenarios") self.toggle_state = False self._but_toggle_scen.Bind(wx.EVT_TOGGLEBUTTON, self.OnToggle) sizer_hor_scen.Add(self._but_checkscenario, 1, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1) sizer_hor_scen.Add(self._but_upriverbed, 1, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1) sizer_hor_scen.Add(self._but_toggle_scen, 1, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 1) self._but_toggle_resamp = wx.ToggleButton(panel, label="Resampling [m]:") self._but_toggle_resamp.SetToolTip("To compute the final raster with a coarser resolution than the original one.") self.toggle_resamp_state = False self._but_toggle_resamp.Bind(wx.EVT_TOGGLEBUTTON, self.OnToggleResampling) sizer_hor_scen.Add(self._but_toggle_resamp, flag=wx.ALIGN_CENTER | wx.TOP) self._but_resampling = wx.SpinCtrl(panel, value="100", min=1, max=1000) sizer_hor_scen.Add(self._but_resampling, flag=wx.ALIGN_CENTER | wx.TOP) #-- self._but_acceptability = wx.Button(panel, label='Acceptability') self.layout(self._but_acceptability) self._but_acceptability.Bind(wx.EVT_BUTTON, self.OnAcceptability) step_without_withoutscenarios = [item for item in steps_acceptability.get_list_names() if item != 'COMPUTE_WITH_SCENARIOS - 5'] step_without_withoutscenarios = [item for item in step_without_withoutscenarios if item != 'RESAMPLING - 6'] self._steps_acceptability = wx.CheckListBox(panel, choices=step_without_withoutscenarios, style=wx.LB_MULTIPLE | wx.CHK_CHECKED) sizer_hor2.Add(self._but_creation, 1, wx.ALL | wx.EXPAND, 0) sizer_hor2.Add(self._steps_db, 1, wx.ALL | wx.EXPAND, 0) sizer_hor3.Add(self._but_vulnerability, 1, wx.ALL | wx.EXPAND, 0) sizer_hor3.Add(self._steps_vulnerability, 1, wx.ALL | wx.EXPAND, 0) sizer_hor4.Add(self._but_acceptability, 1, wx.ALL | wx.EXPAND, 0) sizer_hor4.Add(self._steps_acceptability, 1, wx.ALL | wx.EXPAND, 0) #Lines order sizer_ver_small.Add(self._but_checkfiles, 0, wx.ALL | wx.EXPAND, 1) sizer_ver_small.Add(self._but_checksim, 0, wx.ALL | wx.EXPAND, 1) sizer_ver_small.Add(self._but_checkpond, 0, wx.ALL | wx.EXPAND, 1) sizer_vert1.Add(sizer_hor1, 1, wx.EXPAND, 0) sizer_vert1.Add(sizer_hor1_1, 1, wx.EXPAND, 0) sizer_vert1.Add(sizer_hor_threads, 0, wx.EXPAND, 0) sizer_vert1.Add(sizer_hor2, 1, wx.EXPAND, 0) sizer_vert1.Add(sizer_hor_scen, 1, wx.EXPAND, 0) sizer_vert1.Add(sizer_hor3, 1, wx.EXPAND, 0) sizer_vert1.Add(sizer_hor4, 1, wx.EXPAND, 0) sizer_hor_main.Add(sizer_vert1, proportion=1, flag=wx.EXPAND, border=0) #Disabled if Main Directory + SA + Scenario not selected self._but_acceptability.Enable(False) self._but_vulnerability.Enable(False) self._but_creation.Enable(False) self._but_checkfiles.Enable(False) self._but_DEM.Enable(False) self._but_extrinterp.Enable(False) self._but_toggle_scen.Enable(False) self._but_toggle_resamp.Enable(False) self._but_upriverbed.Enable(False) self._but_checkpond.Enable(False) self._but_checkscenario.Enable(False) self._but_checksim.Enable(False) self._but_creation.Enable(False) self._but_loadgpu.Enable(False) panel.SetSizer(sizer_hor_main) panel.Layout()
[docs] def OnSims(self, e:wx.ListEvent): """ Load sim into the mapviewer """ pass
[docs] def OnSimsDBLClick(self, e:wx.ListEvent): """ Load sim into the mapviewer """ if self.mapviewer is None: return from ..PyDraw import draw_type idx_sim = e.GetSelection() tmppath = self._manager.get_filepath_for_return_period(self._manager.get_return_periods()[idx_sim]) if tmppath.stem not in self.mapviewer.get_list_keys(drawing_type=draw_type.ARRAYS): self.mapviewer.add_object('array', filename=str(tmppath), id=tmppath.stem) self.mapviewer.Refresh()
[docs] def OnCheckFiles(self, e): """ Check the files """ if self._manager is None: logging.error("No main directory selected -- Nothing to check") return i=self._manager.check_inputs() if i == False : logging.error(f"Missing files in INPUT. Please provide them by following the right structure.") with wx.MessageDialog(self, f"Missing files in INPUT. Inputs can not be created automatically : you must provide them.\n Please read the logs and terminal to see the missing ones.", "Error", wx.OK | wx.ICON_ERROR) as dlg: dlg.ShowModal() return else : if (self._manager._study_area is None) or (self._manager._scenario is None): logging.error(f"No study area and/or scenario selected, no check of TEMP and OUTPUT.") with wx.MessageDialog(self, f"INPUT is well structured, but TEMP and OUTPUT have not been checked because there is no study area and scenario selected.", "Checking", wx.OK | wx.ICON_INFORMATION) as dlg: dlg.ShowModal() else: logging.info(f"The folder is well structured.") t=self._manager.check_temporary() o=self._manager.check_outputs() with wx.MessageDialog(self, f"Main directory is checked.\nINPUT is well structured, and TEMP and OUTPUT have been checked. If folders were missing, they have been created\nMain directory at {self.maindir}", "Checking", wx.OK | wx.ICON_INFORMATION) as dlg: dlg.ShowModal()
[docs] def OnHydrodynInput(self,e): """ A test to check if the FILLED water depths files exist. -If YES : the code can go on -If NO : either need to be computed, either the code will use the baseline ones """ if self._manager is None: logging.error("No main directory selected -- Nothing to check") return paths_FilledWD = self._manager.get_sims_files_for_scenario() if len(paths_FilledWD) == 0 : logging.info("There are no interpolated free surface files.") dialog = wx.MessageDialog(None, "There are no interpolated free surface files. Please choose an action.", "Checking- Choose an option", wx.YES_NO | wx.CANCEL | wx.ICON_QUESTION) dialog.SetYesNoLabels("Use the ones in the scenario_baseline (assumption)", "Load other simulations") response = dialog.ShowModal() if response == wx.ID_YES: logging.info("Decision of using baseline simulations.") paths_FilledWD_base = self._manager.get_sims_files_for_baseline() if len(paths_FilledWD_base) == 0 : logging.info("Cannot select files in the _baseline folder (no files or no folder!).") else: self._manager.copy_tif_files(paths_FilledWD_base, self._manager.IN_SA_INTERP) elif response == wx.ID_NO: logging.info("Decision of loading simulations.") with wx.MessageDialog(self, f"Please use the 'Load gpu simulations folder' button of the manager and follow the instructions.", "Redirecting", wx.OK | wx.ICON_INFORMATION) as dlg: dlg.ShowModal() else: logging.info("Cancelled") dialog.Destroy() else: name_paths_FilledWD = [] for names in paths_FilledWD: logging.info(f"Interpolated free surface file(s) found: {names.name}. \n Reminder : the names of the simulations MUST be 'T.' or 'Q.' with '.' the return period.") name_paths_FilledWD.append(names.name) with wx.MessageDialog(self, f"{len(paths_FilledWD)} file(s) of interpolated free surface found in the folder : {name_paths_FilledWD}.", "Information", style=wx.OK | wx.ICON_INFORMATION) as dlg: dlg.ShowModal() update_info_header(self.input_dx,self.input_nbxy,self.input_O,self._manager.IN_SA_INTERP)
[docs] def OnCheckPond(self,e): ponds = self._manager.get_ponderations() if isinstance(ponds, pd.DataFrame): logging.info(f"Plotting the coefficients graph.") ponds.plot(kind='bar', color='gray', edgecolor='black') plt.ylabel("Weighting coefficients [-]") plt.xlabel("Return period [years]") plt.grid(axis='y', linestyle='--', alpha=0.7) plt.show() else: with wx.MessageDialog(self, "No coefficients computed, because no return period found in the interpolated simulation folder. Try after loading gpu simulations", "Checking", style=wx.OK | wx.ICON_INFORMATION) as dlg: dlg.ShowModal()
[docs] def OnMainDir(self, e): """Selects the main directory to be read.""" vanish_info_header(self.input_dx,self.input_nbxy,self.input_O) with wx.DirDialog(self, "Choose the main directory containing the data (folders INPUT, TEMP and OUTPUT):", style=wx.DD_DEFAULT_STYLE ) as dlg: if dlg.ShowModal() == wx.ID_OK: self._manager = Accept_Manager(dlg.GetPath()) self.maindir=dlg.GetPath() folders = ["INPUT", "TEMP", "OUTPUT"] if all(os.path.isdir(os.path.join(self.maindir, folder)) for folder in folders) == False: logging.info("Folder not loaded (incorrect structure).") wx.MessageBox( f"Missing folders among INPUT, TEMP and OUTPUT. Please organize correctly this folder.", "Error", wx.OK | wx.ICON_ERROR ) return self._but_acceptability.Enable(True) self._but_vulnerability.Enable(True) self._but_creation.Enable(True) self._but_checkfiles.Enable(True) self._but_toggle_scen.Enable(True) self._but_toggle_resamp.Enable(True) self._but_upriverbed.Enable(True) self._but_checkscenario.Enable(True) self._but_checkpond.Enable(True) self._but_checksim.Enable(True) self._but_loadgpu.Enable(True) self._listbox_scenario.Clear() studyareas = self._manager.get_list_studyareas() if len(studyareas) == 0 : logging.info("Folder loaded but no study areas found in the folder (INPUT/STUDY_AREA). Please use the button to load hydraulic simulations in the manager.") return self._listbox_studyarea.Clear() self._listbox_studyarea.InsertItems(studyareas, 0) logging.info("All the files are present") else: return
[docs] def OnStudyArea(self, e): """ Change the study area """ if self._manager is None: return vanish_info_header(self.input_dx,self.input_nbxy,self.input_O) self._listbox_scenario.Clear() study_area:str = self._manager.get_list_studyareas(with_suffix=True)[e.GetSelection()] self._manager.change_studyarea(study_area) sc = self._manager.get_list_scenarios() if len(sc)!=0: self._listbox_scenario.InsertItems(sc, 0) else : logging.error("No scenario available associated with this study area.") if self.mapviewer is not None: tmp_path = self._manager.IN_STUDY_AREA / study_area from ..PyDraw import draw_type if not tmp_path.stem in self.mapviewer.get_list_keys(drawing_type=draw_type.VECTORS): self.mapviewer.add_object('vector', filename=str(tmp_path), id=tmp_path.stem) self.mapviewer.Refresh()
[docs] def OnScenario(self, e): """ Change the scenario """ if self._manager is None: return scenario = self._manager.get_list_scenarios()[e.GetSelection()] self._manager.change_scenario(scenario) create_INPUT_TEMP_OUTPUT_forScenario(self.maindir, self._manager.Study_area, self._manager.scenario, None) update_info_header(self.input_dx,self.input_nbxy,self.input_O,self._manager.IN_SA_INTERP)
[docs] def OnCreation(self, e): """ Create the database """ if self._manager is None: return dx,_,_,_,_,_ = update_info_header(self.input_dx,self.input_nbxy,self.input_O,self._manager.IN_SA_INTERP) resolution = dx if resolution == '': wx.MessageBox( f"There are no files in INTERP_WD lease, use first the buttons at the second line.", "Attention", wx.OK | wx.ICON_ERROR ) else : steps = list(self._steps_db.GetCheckedStrings()) steps = [int(cur.split('-')[1]) for cur in steps] if len(steps) != 0: wx.MessageBox( f"The database will now be created, with a resolution of {dx}. This process may take some time, and the window may temporarily stop responding.", "Information", wx.OK | wx.ICON_INFORMATION ) Base_data_creation(self._manager.main_dir, Study_area=self._manager.Study_area, number_procs=self._nb_process.GetValue(), resolution=dx, steps=steps) wx.MessageBox( "The database is created with the selected steps.", "Information", wx.OK | wx.ICON_INFORMATION ) else : wx.MessageBox( f"No steps selected. The code for the DataBase creation will consider all steps by default, , with a resolution of {dx}. This process may take some time, and the window may temporarily stop responding.", "Information", wx.OK | wx.ICON_INFORMATION ) Base_data_creation(self._manager.main_dir, Study_area=self._manager.Study_area, number_procs=self._nb_process.GetValue(), resolution=dx) wx.MessageBox( "The database is created for every steps.", "Information", wx.OK | wx.ICON_INFORMATION )
[docs] def OnLoadingSimu(self,e): """ Link between acceptability and simulations -Load a hydraulic scenarios from the scenario manager -Create scenario and study area if needed. """ dlg = wx.DirDialog(None, "Please select the main scenario manager folder (containing the scenarios, the folder discharge, the scripts.py...), named after the STUDY AREA.", style=wx.DD_DEFAULT_STYLE) if dlg.ShowModal() == wx.ID_OK: main_gpu = Path(dlg.GetPath()) study_area = main_gpu.name logging.info(f"Selected folder for GPU result such as the STUDY AREA is {study_area}") dlg = wx.DirDialog(None, "Please select the scenarios folder (containing the 'simulations' folder) of the specific HYDRAULIC SCENARIO.", defaultPath=str(main_gpu), style=wx.DD_DEFAULT_STYLE) if dlg.ShowModal() == wx.ID_OK: scenario = Path(dlg.GetPath()) hydraulic_scen=scenario.joinpath("simulations") scenario=scenario.name logging.info(f"Selected hydraulic scenario : {scenario}") create_INPUT_TEMP_OUTPUT_forScenario(self.maindir, study_area, scenario, main_gpu) self._manager.change_studyarea(study_area+'.shp') self._manager.change_scenario(scenario) self._listbox_studyarea.Clear() self._listbox_studyarea.InsertItems(self._manager.get_list_studyareas(), 0) self._listbox_scenario.Clear() self._listbox_scenario.InsertItems(self._manager.get_list_scenarios(), 0) #Blue color of selection even if not directly clicked : index_to_select = self._listbox_scenario.FindString(scenario) if index_to_select != wx.NOT_FOUND: self._listbox_scenario.SetSelection(index_to_select) self._listbox_scenario.SetItemBackgroundColour(index_to_select, wx.Colour(0, 120, 215)) index_to_select = self._listbox_studyarea.FindString(study_area) if index_to_select != wx.NOT_FOUND: self._listbox_studyarea.SetSelection(index_to_select) self._listbox_studyarea.SetItemBackgroundColour(index_to_select, wx.Colour(0, 120, 215)) self._listbox_studyarea.Refresh() self._listbox_scenario.Refresh() else: logging.error('No hydraulic scenario selected.') else: logging.error('No folder found / selected. Please try again.') return self._check_listbox.Clear() self.sims = {} for subdir in hydraulic_scen.iterdir(): if subdir.is_dir() and subdir.name.startswith("sim_"): self.sims[subdir.name] = subdir else: logging.info('No folder sim_ found / selected. Please try again.') self.datadir_simulations = hydraulic_scen self.file_paths = {Path(sim).name: Path(sim) for sim in sorted(self.sims.keys())} self._check_listbox.Set(sorted(sim for sim in self.sims.keys())) logging.info(f"GPU simulations loaded in the checkbox.\n\nPlease select the ones you want to interpolate and use the button 'Reading and interpolating free surface'.") message = "GPU simulations loaded in the checkbox\n\nPlease select the ones you want to interpolate and use the button 'Reading and interpolating free surface'." found_bath = search_for_modif_bath_and_copy(Path(main_gpu), Path(hydraulic_scen.parent), self._manager.IN_CH_SA_SC) if found_bath : message+= "\nIn addition, modification files for bathymetry (bath_) have been found in the gpu simulations, a copy has been made for a change in the vulnerability and DEM (see vuln_ and MNTmodifs_ in CHANGE_VULNE). Please edit them." logging.info(f"Modification files for bathymetry (bath_) have been found in the gpu simulations, a copy has been made for a change in the vulnerability and DEM (see vuln_ and MNTmodifs_ in CHANGE_VULNE). Please edit them.") self.gpu_bathy = hydraulic_scen.parent / "__bathymetry.tif" self._but_extrinterp.Enable(True) self._but_DEM.Enable(True) with wx.MessageDialog(self, message, "Information", style=wx.OK | wx.ICON_INFORMATION) as dlg: dlg.ShowModal()
[docs] def OnDEM(self,e): """Import and create the inputs for the interpolation routine holes.exe (name including 'MNT_...' and 'MNT_..._with_mask'. See function MTN_And_mask_creation_all""" if not hasattr(self, 'file_paths'): with wx.MessageDialog(self, f"Please, first load gpu simulations via the previous button.", "Attention", style=wx.OK | wx.ICON_ERROR) as dlg: dlg.ShowModal() return path = self._manager.IN_SA_DEM names_inDEM = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))] if len(names_inDEM) != 0 : dialog = wx.MessageDialog(None, f"The DEM_FILES folder is not empty and contains the files {names_inDEM}. ", "Confirmation", wx.YES_NO | wx.ICON_QUESTION) dialog.SetYesNoLabels("Delete and reload", "Keep and leave") response = dialog.ShowModal() if response == wx.ID_YES: for file_name in names_inDEM: file_path = os.path.join(path, file_name) os.remove(file_path) logging.info("Files in DEM_FILES deleted.") else : logging.info("No update of DEM_FILES.") return with wx.FileDialog(self, "Please select the DEM file in .tif format (without modifications).", wildcard="TIFF files (*.tif)|*.tif", style=wx.FD_OPEN | wx.FD_FILE_MUST_EXIST) as dlg: result = dlg.ShowModal() if result != wx.ID_OK: return path_DEM_base = dlg.GetPath() logging.info("DEM file selected.") #DEM and masked DEM creation path = self._manager.IN_CH_SA_SC names_inCHVUL_MNTmodifs = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f)) and f.startswith("MNTmodifs_")] #path_MNT_computed is the path to the DEM with the MNTmodifs if exist, or the given true DEM if not path_MNT_computed = Path(path_DEM_base) if len(names_inCHVUL_MNTmodifs) !=0: path_MNT_computed = Path(self._manager.IN_CH_SA_SC_MNT_tif.with_suffix('.tif')) dialog = wx.MessageDialog(None, f"Please modify the 'MNTmodifs_' files in INPUT\CHANGE_VULNE\... as in the hydraulic scenario you want to study. They are: {names_inCHVUL_MNTmodifs}", "Confirmation", wx.YES_NO | wx.ICON_QUESTION) dialog.SetYesNoLabels("Done, continue", "Not done, stop") response = dialog.ShowModal() if response == wx.ID_NO: logging.info("No modifications done in MNTmodifs_ files, process stopped.") return #else : if os.path.exists(self._manager.IN_CH_SA_SC): existence=False existence = self._manager.create_vrtIfExists(Path(path_DEM_base), self._manager.IN_CH_SA_SC, self._manager.IN_CH_SA_SC_MNT_VRT, name="MNTmodifs_") if existence : self._manager.translate_vrt2tif(self._manager.IN_CH_SA_SC_MNT_VRT, self._manager.IN_CH_SA_SC_MNT_tif) logging.info(f"Scenarios have been applied to DEM see {self._manager.IN_CH_SA_SC_MNT_tif}.tif.") WA_mask = WolfArray(self._manager.IN_CH_SA_SC_MNT_tif.with_suffix('.tif')) WA_mask.write_all(Path(self._manager.IN_SA_DEM / "MNT_loaded.bin")) else : logging.info(f"No MNTmodifs_ files in {self._manager.IN_CH_SA_SC}. The given file {path_DEM_base} has not been modified") WA_mask = WolfArray(path_DEM_base) WA_mask.write_all(Path(self._manager.IN_SA_DEM / "MNT_loaded.bin")) else: logging.error(f"Path {self._manager.IN_CH_SA_SC} does not exist.") #self._manager.IN_CH_SA_SC_MNT_tif ou fn_mnt_cropped : ground + riverbed fn_wherebuildings_buffer = self._manager.IN_CH_SA_SC / "buffer_wherebuilding.tif" fn_mask = self._manager.IN_SA_DEM / "MNT_computed_with_mask.tif" MTN_And_mask_creation_all(self.gpu_bathy, path_MNT_computed, fn_wherebuildings_buffer, fn_mask) if fn_wherebuildings_buffer.exists(): fn_wherebuildings_buffer.unlink() if fn_mask.exists(): fn_mask.unlink() dlg = wx.MessageDialog(self, "DEM files created in INPUT\WATER_DEPTH\...\DEM_FILES.", "Success.", wx.OK | wx.ICON_INFORMATION) dlg.ShowModal() dlg.Destroy() return
[docs] def OnInterpolation(self,e): """Interpolates the last extracted time steps present in LAST_STEP_EXTRACTED using the fast marching interpolation routine holes.exe, by creating a batch file while performing multiple checks on the required input files.""" if not hasattr(self, 'file_paths'): with wx.MessageDialog(self, f"Please, first load gpu simulations via the previous button.", "Attention", style=wx.OK | wx.ICON_ERROR) as dlg: dlg.ShowModal() return checked_indices = self._check_listbox.GetCheckedItems() checked_items = [self._check_listbox.GetString(index) for index in checked_indices] selected_paths = [self.file_paths[item] for item in checked_items] path_simulations = self.datadir_simulations path_LastSteps = Path(self._manager.IN_SA_EXTRACTED) fn_write = None dx,dy,nbx,nby,X,Y = False, False, False, False, False, False for sim_ in selected_paths: if sim_.name.startswith("sim_"): self.sims[sim_.name] = sim_ fn_read = Path(path_simulations/ sim_ / "simul_gpu_results") logging.info(f"Found simulation folder: {sim_}") parts = sim_.name.split("sim_") if len(parts) > 1: name = parts[1] fn_write = Path(path_LastSteps / name ) dx,dy,nbx,nby,X,Y = display_info_header(self.input_dx, self.input_nbxy, self.input_O, fn_write.with_suffix(".bin")) read_export_z_bin(fn_read, fn_write, path_LastSteps) else: logging.info(f"Please, ensure your simulations are named with the return period, e.g sim_T4") else: logging.info('No folder found / selected. Please try again...') else: logging.error('No simulation selected! Please select some in the checkbox.') C = None D = None for file in os.listdir(Path(self._manager.IN_SA_DEM)): file_path = Path(self._manager.IN_SA_DEM) / file if file_path.is_file() and file.startswith("MNT_") and file_path.suffix == ".bin": if "mask" not in file: D = file_path else: C = file_path if D is None: logging.info("DEM (.bin) not found in DEM_FILES. The file must begins by 'MNT_' and CANNOT include the word 'mask'") with wx.MessageDialog(self, f"DEM (.bin) not found in DEM_FILES. The file must begins by 'MNT_' and CANNOT include the word 'mask'", "Missing file", style=wx.OK | wx.ICON_INFORMATION) as dlg: dlg.ShowModal() return if C is None: logging.info("DEM mask (.bin) not found in DEM_FILES. The file must begins by 'MNT_' and MUST include the word 'mask'") with wx.MessageDialog(self, f"DEM mask (.bin) not found in DEM_FILES. The file must begins by 'MNT_' and MUST include the word 'mask'", "Missing file", style=wx.OK | wx.ICON_INFORMATION) as dlg: dlg.ShowModal() return if not get_header_comparison([fn_write.with_suffix(".bin"), C, D]): logging.info("Files in DEM_FILES do not have the same properties as the simulations files. Please, fix them.") with wx.MessageDialog(self, f"Files in DEM_FILES do not have the same properties as the simulations files. Please, fix them.", "Error in DEM_FILES files", style=wx.OK | wx.ICON_INFORMATION) as dlg: dlg.ShowModal() return checked_names = self._check_listbox.GetCheckedStrings() if not checked_names: logging.info("No items selected. Adding all paths.") checked_paths = list(self.file_paths.values()) message_info = "No simulations were checked in the box; so the computations will consider all of them. The interpolation of the given free surface will begin when you press OK, please wait." else: logging.info("Adding only the selected simulations.") checked_paths = [self.file_paths[name] for name in checked_names] message_info = "The interpolation of the given free surface will begin (for the selected simulation(s)), please wait." if len(self.file_paths) == 0 : with wx.MessageDialog(self, f"No files in EXTRACTED_LAST_STEP_WD. Please provide some or use the 'Load gpu simulation' button.", "OK", wx.OK | wx.ICON_INFORMATION) as dlg: dlg.ShowModal() else : path_Interp = Path(self._manager.IN_SA_INTERP) path_bat_file = os.path.join(self._manager.IN_SCEN_DIR, "process_files.bat") if os.path.exists(path_bat_file): logging.info(f"The file {path_bat_file} already exists and will be replaced.") os.remove(path_bat_file) path_code = os.path.join(self._manager.IN_WATER_DEPTH, "holes.exe") renamed_files = [] if True: A=[] for path in checked_paths: parts = path.name.split("sim_") A.extend([os.path.join(path_LastSteps, g) for g in os.listdir(path_LastSteps) if g.endswith(f"{parts[1]}.bin")]) B = [os.path.join(path_Interp, os.path.splitext(os.path.basename(f))[0]) for f in A] if not A or not B or not C or not D: logging.info("Missing files.") with wx.MessageDialog(self, f"The interpolation cannot go on, as some files are missing (see logs): please check the DEM_FILES or the EXTRACTED_LAST_STEP_WD folders.", "Missing files.", wx.OK | wx.ICON_INFORMATION) as dlg: dlg.ShowModal() with open(path_bat_file, "w") as bat_file: for a, b in zip(A, B): line = f'"{path_code}" filling in="{a}" out="{b}" mask="{C}" dem="{D} avoid_last=1"\n' bat_file.write(line) logging.info(message_info) empty_folder(self._manager.IN_SA_INTERP) path_bat_file = os.path.join(self._manager.IN_SCEN_DIR, "process_files.bat") # FORTRAN HOLES.EXE subprocess.run([path_bat_file], check=True) path_fichier=self._manager.IN_SA_INTERP for file in path_fichier.glob("*.tif"): if "_h" in file.name: new_name = file.stem.split("_h")[0].replace(".bin", "") + ".tif" file.rename(file.with_name(new_name)) renamed_files.append(new_name) #deleting the other for file in path_fichier.glob("*.tif"): if "_combl" in file.name or file.name not in renamed_files: file.unlink() else: #Python eikonal model from ..eikonal import inpaint_array, inpaint_waterlevel dtm = WolfArray(D) A=[] for path in checked_paths: parts = path.name.split("sim_") A.extend([os.path.join(path_LastSteps, g) for g in os.listdir(path_LastSteps) if g.endswith(f"{parts[1]}.tif")]) if not A or not C or not D: logging.info("Missing files.") with wx.MessageDialog(self, f"The interpolation cannot go on, as some files are missing (see logs): please check the DEM_FILES or the EXTRACTED_LAST_STEP_WD folders.", "Missing files.", wx.OK | wx.ICON_INFORMATION) as dlg: dlg.ShowModal() empty_folder(self._manager.IN_SA_INTERP) for a in A: a = Path(a) in_wa = WolfArray(a) dem_wa = WolfArray(a.parent / a.stem / "_dem.tif") _t, _wse, _wd = inpaint_waterlevel(in_wa.array, dem_wa.array.data, dtm.array.data, ignore_last_patches=1) b = os.path.join(path_Interp, os.path.splitext(os.path.basename(a))[0]) new_name = Path(b).with_suffix('.tif') renamed_files.append(new_name) in_wa.array.data[:,:] = _wd in_wa.array.mask[:,:] = _wd == 0. in_wa.write_all(new_name) logging.info("Filling completed.") with wx.MessageDialog(self, f"Filling completed. Created files : {renamed_files}", "Redirecting", wx.OK | wx.ICON_INFORMATION) as dlg: dlg.ShowModal() update_info_header(self.input_dx,self.input_nbxy,self.input_O,self._manager.IN_SA_INTERP)
[docs] def OnToggle(self,e): """Creates a toggle button to be activated if the scenarios vuln_ have to be taken into account.""" self.toggle_state = False if self._but_toggle_scen.GetValue(): logging.info("Activating the scenario button.") self._but_toggle_scen.SetBackgroundColour(wx.Colour(175, 175, 175)) self._but_toggle_scen_state = True tif_files = [file for file in Path(self._manager.IN_CH_SA_SC).glob("*.tif") if file.name.startswith("vuln_")] if not tif_files: wx.MessageBox( "The scenario button cannot be activated because there is no change in vulnerability 'vuln_' in CHANGE_VULNE. Please reload the simulations containing 'bath_' files via 'Load new hydraulic scenario' and edit it, or directly introduce your 'vuln_' files.", "Information", wx.OK | wx.ICON_INFORMATION ) logging.info("Desactivating the scenario button.") self._but_toggle_scen.SetValue(False) self._but_toggle_scen.SetBackgroundColour(wx.NullColour) self._but_toggle_scen_state = False else : self.toggle_state = True else: self._but_toggle_scen.SetBackgroundColour(wx.NullColour) self.toggle_state = False logging.info("Desactivating the scenario button.")
[docs] def OnToggleResampling(self,e): """Creates a toggle button for the acceptability resampling to be activated.""" self.toggle_resamp_state = False toggle = self._but_toggle_resamp if toggle.GetValue(): self._but_toggle_resamp.SetBackgroundColour(wx.Colour(175, 175, 175)) self.toggle_resamp_state = True logging.info("Resampling activated") current_res = self._but_resampling.GetValue() resolution = self.input_dx.GetLabel() if resolution != '': values = resolution.strip("()").split(",") dx = float(values[0]) #selection of size if current_res < dx: wx.MessageBox( "The resampling size cannot be inferior to the resolution. Please select another resampling size.", "Attention", wx.OK | wx.ICON_ERROR ) self.toggle_resamp_state = False self._but_toggle_resamp.SetValue(False) self._but_toggle_resamp.SetBackgroundColour(wx.NullColour) logging.info("Resampling disactivated because of a bad resampling size.") else : logging.info(f"Allowed resampling value : {current_res}[m].") else: self.toggle_resamp_state = False self._but_toggle_resamp.SetValue(False) self._but_toggle_resamp.SetBackgroundColour(wx.NullColour) logging.info("No simulations in INTERP_WD.") else: self.toggle_resamp_state = False self._but_toggle_resamp.SetValue(False) self._but_toggle_resamp.SetBackgroundColour(wx.NullColour) logging.info("Resampling disactivated")
[docs] def OnCheckScenario(self,e): """Checks if scenarios exist in CHANGE_VULNE.""" tif_files = self._manager.get_modifiedrasters() logging.info("checking the scenarios for vulnerability and acceptability.") if not tif_files: wx.MessageBox( "No files 'vuln_' or 'MNTmodifs_' found in CHANGE_VULNE.", "Information", wx.OK | wx.ICON_INFORMATION ) else : wx.MessageBox( f"There exist vuln_ file(s) in CHANGE_VULNE:\n {', '.join([tif_file for tif_file in tif_files])}", "Information", wx.OK | wx.ICON_INFORMATION)
[docs] def OnVulnerability(self, e): """ Run the vulnerability """ if self._manager is None: return path = [self._manager.OUT_VULN] steps = list(self._steps_vulnerability.GetCheckedStrings()) steps = [int(cur.split('-')[1]) for cur in steps] resolution,_,_,_,_,_ =update_info_header(self.input_dx,self.input_nbxy,self.input_O,self._manager.IN_SA_INTERP) if resolution == '': wx.MessageBox( f"There are no files in INTERP_WD lease, use first the buttons at the second line.", "Attention", wx.OK | wx.ICON_ERROR ) else : message_supp = "." if len(steps) == 0: steps = [1,10,11,2,3] if self.toggle_state == True : message_supp = " AND scenario(s) vuln_ taken into account" steps = [1,10,11,2,3,4] if self._manager.OUT_VULN.exists: message_supp = " FOR scenario(s) (vuln_ taken into account)" logging.info("Attention - The manager ONLY computes Vulnerability_scenario, as Vulnerability_baseline already computed.") steps=[4] else : logging.info("Attention - The manager computes also Vulnerability_baseline, as Vulnerability_scenario needs it as input.") path = [self._manager.OUT_VULN_Stif] dialog = wx.MessageDialog(None, f"Please modify the 'vuln_' files in INPUT\CHANGE_VULNE\... as desired. Default value set to one. ", "Confirmation", wx.YES_NO | wx.ICON_QUESTION) dialog.SetYesNoLabels("Done, continue", "Not done, stop") response = dialog.ShowModal() if response == wx.ID_NO: return logging.info("No steps selected. By default every steps will be performed" + message_supp) Vulnerability(str(self._manager.main_dir), scenario=str(self._manager.scenario), Study_area=str(self._manager.Study_area), resolution=resolution, steps=steps) wx.MessageBox( "Vulnerability computed with every steps" + message_supp, "Information", wx.OK | wx.ICON_INFORMATION ) else : if self.toggle_state == True : steps.append(4) message_supp = " AND scenario(s) vuln_ taken into account" if self._manager.OUT_VULN.exists: message_supp = " FOR scenario(s) (vuln_ taken into account)" logging.info("Attention - The manager ONLY computes Vulnerability_scenario, as Vulnerability_baseline already computed.") steps=[4] else : logging.info("Attention - The manager computes also Vulnerability_baseline, as Vulnerability_scenario needs it as input.") path = [self._manager.OUT_VULN_Stif] dialog = wx.MessageDialog(None, f"Please modify the 'vuln_' files in INPUT\CHANGE_VULNE\... as desired. Default value set to one. ", "Confirmation", wx.YES_NO | wx.ICON_QUESTION) dialog.SetYesNoLabels("Done, continue", "Not done, stop") response = dialog.ShowModal() if response == wx.ID_NO: return Vulnerability(self._manager.main_dir, scenario=self._manager.scenario, Study_area=self._manager.Study_area, resolution=resolution, steps=steps) wx.MessageBox( "Vulnerability computed with the selected steps" + message_supp, "Information", wx.OK | wx.ICON_INFORMATION ) mapviewer_display(path)
[docs] def OnAcceptability(self, e): """ Run the acceptability """ if self._manager is None: return river_trace = self._manager.wich_river_trace() if self.toggle_state == True and not os.path.isfile(str(self._manager.OUT_VULN_Stif)) and not os.path.isfile(str(river_trace)) : wx.MessageBox("Necessary files are missing, please ensure the DataBase or Vulnerability or Updating riverbed steps were performed. ","Error", wx.OK | wx.ICON_ERROR ) return if self.toggle_state == False and not os.path.isfile(str(self._manager.OUT_VULN)) and not os.path.isfile(str(river_trace)) : wx.MessageBox("Necessary files are missing, please ensure the DataBase or Vulnerability or Updating riverbed steps were performed. ","Error", wx.OK | wx.ICON_ERROR ) return steps = list(self._steps_acceptability.GetCheckedStrings()) steps = [int(cur.split('-')[1]) for cur in steps] message_supp = "." resampling=100 path = [self._manager.OUT_ACCEPT] if len(steps) == 0: steps = [1,2,3,4] if self.toggle_state == True : steps = [1,2,3,4,5] message_supp = " AND scenario(s) vuln_ taken into account" path = [self._manager.OUT_ACCEPT_Stif] if self._manager.OUT_ACCEPT.exists: steps = [x for x in steps if x != 4] logging.info('Acceptability_baseline not computed because it already exists.') message_supp = " FOR scenario(s) vuln_ taken into account" if river_trace == self._manager.OUT_MASKED_RIVER : message_supp=message_supp +" WITH the _baseline riverbed trace." if river_trace == self._manager.OUT_MASKED_RIVER_S :message_supp+= " WITH the _scenarios riverbed trace." if self.toggle_resamp_state == True : steps.append(6) resampling = self._but_resampling.GetValue() resolution = self.input_dx.GetLabel() if resolution != '': values = resolution.strip("()").split(",") resolution = float(values[0]) message_supp+= f" It has been created for the resolution {resolution}m and the resampling size {resampling}m." logging.info("No steps selected. By default every steps will be performed.") Acceptability(self._manager.main_dir, scenario=self._manager.scenario, Study_area=self._manager.Study_area, resample_size=resampling, steps=steps) wx.MessageBox( "Acceptability computed with every steps" + message_supp, "Information", wx.OK | wx.ICON_INFORMATION ) else : if self.toggle_state == True : steps.append(5) message_supp = " AND scenario(s) vuln_ taken into account" if self._manager.OUT_ACCEPT.exists: steps = [x for x in steps if x != 4] logging.info('Acceptability_baseline not computed because it already exists.') message_supp = "FOR scenario(s) (vuln_taken into account)" path = [self._manager.OUT_ACCEPT_Stif] river_trace = self._manager.wich_river_trace() if river_trace == self._manager.OUT_MASKED_RIVER : message_supp =message_supp + " WITH the _baseline riverbed trace." if river_trace == self._manager.OUT_MASKED_RIVER_S : message_supp =message_supp + " WITH the _scenarios riverbed trace." if self.toggle_resamp_state == True : resampling = self._but_resampling.GetValue() steps.append(6) Acceptability(self._manager.main_dir, scenario=self._manager.scenario, Study_area=self._manager.Study_area, resample_size=resampling, steps=steps) wx.MessageBox( "Acceptability computed with the selected steps" + message_supp, "Information", wx.OK | wx.ICON_INFORMATION ) mapviewer_display(path)