"""
Author: University of Liege, HECE, Damien Sansen
Date: 2025
Copyright (c) 2025 University of Liege. All rights reserved.
This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""
"""Imports et bons paths"""
import os
import geopandas as gpd
import logging
from pathlib import Path
from tqdm import tqdm
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from osgeo import gdal
from scipy.ndimage import label
from scipy.ndimage import binary_dilation
from ..wolf_vrt import create_vrt_from_diverged_files_first_based, translate_vrt2tif
from ..wolf_array import WolfArray
from ..PyTranslate import _
from ..PyVertexvectors import Zones
from ..picc import Picc_data, bbox_creation
from ..Results2DGPU import wolfres2DGPU
try:
    # PyINBE is assumed to live alongside INBE_Manager in Py_INBE (it is used in computation_dfresults below)
    from wolfdamage.insyde_be.Py_INBE import INBE_Manager, PyINBE
except ImportError:
    raise ImportError("The module 'wolfdamage' is required. Please install it.")
class INBE_functions():
def __init__(self):
pass
    def riverbed_trace_existingdanger(self, wd_tif: Path, iteration_choice: int = 2, rivermask_out: Path = None, threshold = 0.5):
        """Creates a boolean matrix with the largest connected component of a raster after thresholding.

        This function opens a GeoTIFF (float32), applies a threshold to create a mask,
        isolates the largest object (the riverbed) by removing noise,
        and then applies a binary dilation to ensure INSYDE-BE does not use hazard
        values from the river. Returns the boolean mask."""
WA_river = WolfArray(wd_tif)
mask = WA_river.array > threshold
structure = np.array([[0,1,0],
[1,1,1],
[0,1,0]], dtype=int)
labeled, n = label(mask, structure)
if n > 0:
sizes = np.bincount(labeled.ravel())
sizes[0] = 0
largest_label = sizes.argmax()
mask = labeled == largest_label
else:
mask[:] = False
        mask = binary_dilation(mask, iterations=iteration_choice)
        if rivermask_out is not None:
            WA_river.array[:,:] = mask.astype(np.float32)  # explicit cast: the target array is float32
            WA_river.write_all(rivermask_out)
return mask
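    # A minimal usage sketch (the paths are hypothetical placeholders, not files
    # shipped with the package):
    #
    #   fns = INBE_functions()
    #   mask = fns.riverbed_trace_existingdanger(Path("wd_T100.tif"),
    #                                            iteration_choice=2,
    #                                            rivermask_out=Path("river_mask.tif"),
    #                                            threshold=0.5)
    #   # `mask` is a boolean array: True on the dilated largest connected component.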
def riverbed_trace_simu(self, fn_read_simu, fn_output, threshold, type_extraction, id_begin=None, id_end=None, id_step=None):
"""
Recognizes the riverbed trace based on a simulation or existing danger map, where water depth above a given threshold is considered part of the riverbed.
Inputs:
- fn_read_simu: the simulation file to read.
- fn_output: the location to save the riverbed trace as a .tiff file.
- threshold: the water depth threshold above which the areas are considered riverbed.
"""
if type_extraction == "last_step":
wolfres2DGPU_test = wolfres2DGPU(fn_read_simu)
wolfres2DGPU_test.read_oneresult(-1)
wd = wolfres2DGPU_test.get_h_for_block(1)
wd.array[:] = np.where((wd.array > threshold) & (wd.array <= 1000), 1, 0)
wd.nodata=0
wd.write_all(Path(fn_output))
elif type_extraction=="danger_map":
wd = wolfres2DGPU(fn_read_simu).danger_map_only_h(id_begin,id_end,id_step)
wd.array[:] = np.where((wd.array > threshold) & (wd.array <= 1000), 1, 0)
wd.nodata=0
wd.write_all(Path(fn_output))
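    # A minimal usage sketch for the "danger_map" mode (the result folder name and
    # step ids are illustrative assumptions):
    #
    #   fns = INBE_functions()
    #   fns.riverbed_trace_simu(fn_read_simu="path/to/simul_gpu_results",
    #                           fn_output="riverbed_trace.tiff",
    #                           threshold=0.5,
    #                           type_extraction="danger_map",
    #                           id_begin=0, id_end=-1, id_step=1)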
    def dilatation_mask_river(self, manager_inbe):
        """Applies one binary dilation pass to the river mask and writes the result as a GeoTIFF."""
        mask_river = Path(manager_inbe.IN_RIVER_MASK_SCEN) / "Masked_River_extent.tiff"
        WA_mask_river = WolfArray(mask_river)
        array = binary_dilation(WA_mask_river.array)
        WA_mask_river.array[:,:] = array[:,:]
        out = Path(manager_inbe.IN_RIVER_MASK_SCEN_tif)
        WA_mask_river.write_all(out)
        logging.info(_(f"Dilated river mask written in {out}"))
    def apply_bathy_to_rivermask(self, rivermask_out: Path, source_tif: Path, output_tif: Path):
        """Keeps the source raster values inside the river mask and sets everything else to nodata (99999)."""
        WA_mask = WolfArray(rivermask_out)
        WA_src = WolfArray(source_tif)
        nodata = 99999.0
        out_arr = np.where(WA_mask.array, WA_src.array, nodata).astype(np.float32)
        WA_src.array[:,:] = out_arr[:,:]
        WA_src.nodata = nodata
        WA_src.write_all(output_tif)
    # Assembly (FR: agglomération)
    # -----------------------------
    """Essentially the same operations as in the config manager to agglomerate several rasters.
    The class Config_Manager_2D_GPU is called; however, some functions were rewritten to allow
    searching for a more specific prefix ('vuln', and not 'bath', 'mann', or 'inf').
    """
def tree_name_tif(self, folder_path, name):
"""Find all .tiff files starting with 'vuln' in the directory and return paths"""
folder = Path(folder_path)
vuln_tiff_files = {file for file in folder.rglob("*.tiff") if file.name.startswith(name)}
vuln_tif_files = {file for file in folder.rglob("*.tif") if file.name.startswith(name)}
vuln_files = vuln_tiff_files.union(vuln_tif_files)
tiff_trees = []
if len(vuln_files) !=0:
for tiff in vuln_files:
curtree = [tiff]
while tiff.parent != folder:
tiff = tiff.parent
curtree.insert(0, tiff)
tiff_trees.append(curtree)
return tiff_trees
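    # Shape of the returned structure, as a sketch: for a file
    # `folder/scenA/vuln_1.tif`, tree_name_tif(folder, "vuln") yields
    # [[Path("folder/scenA"), Path("folder/scenA/vuln_1.tif")]]: each inner list
    # goes from the first sub-directory below `folder_path` down to the file itself.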
def select_name_tif(self, path_baseline: Path, folder_path: Path, name, filter_type = False) -> tuple[list[Path], list[Path]]:
"""
        Collects all .tiff files starting with `name` from `folder_path` and appends them to a list.
        Checks each file's data type against the baseline raster and renames files with a mismatched type
        (so they are no longer superimposed, since their names no longer begin with "vuln_").
:param path_baseline: Path to the baseline raster file.
:type path_baseline: pathlib.Path
:param folder_path: Path to the folder containing raster files to check.
:type folder_path: pathlib.Path
:param name: Prefix name to filter .tiff files in the folder.
:type name: str
:return: A tuple containing two lists:
- files: List of paths with matching data type.
- unused_files: List of paths renamed due to type mismatch.
:rtype: tuple[list[pathlib.Path], list[pathlib.Path]]
"""
files = []
unused_files = []
        # data type of the assembly baseline
WA_open = WolfArray(path_baseline.as_posix())
type_base = WA_open.array.dtype.name #gdal.GetDataTypeName
files.append(path_baseline.as_posix())
        # any file whose name begins with the given prefix
tiff_trees = self.tree_name_tif(folder_path, name)
for tree in tiff_trees:
file_path = tree[-1]
ds = WolfArray(file_path.as_posix())
file_type = ds.array.dtype.name
ds = None
#filter if not the same type and renaming
if filter_type:
if file_type != type_base:
suffix_to_add = f"_{file_type}"
new_name = file_path.parent / f"{file_path.name}{suffix_to_add}"
if not file_path.stem.endswith(suffix_to_add):
file_path.rename(new_name)
unused_files.append(new_name)
else:
unused_files.append(file_path)
else:
files.append(file_path.as_posix())
else:
files.append(file_path.as_posix())
return files, unused_files
def check_nodata(self, name, path_baseline, fn_scenario):
""" Check nodata in a path """
from ..wolf_array import WOLF_ARRAY_FULL_INTEGER8
list_tif, unused = self.select_name_tif(path_baseline, fn_scenario, name)
for cur_lst in list_tif:
curarray:WolfArray = WolfArray(cur_lst)
if curarray.wolftype != WOLF_ARRAY_FULL_INTEGER8:
if curarray.nullvalue != 99999.:
curarray.nullvalue = 99999.
curarray.set_nullvalue_in_mask()
curarray.write_all()
                    logging.warning(_('nodata value changed to 99999. for file {}'.format(cur_lst)))
else:
logging.info(_('nodata value is {} for file {} !'.format(curarray.nullvalue, cur_lst)))
def create_vrtIfExists(self, fn_baseline, fn_scenario, fn_vrt, name):
""" Create a vrt file from a path """
logging.info(_('Checking nodata values...'))
self.check_nodata(name, fn_baseline, fn_scenario)
list_tif, list_fail = self.select_name_tif(fn_baseline, fn_scenario, name, filter_type = True)
if len(list_fail)>0:
return False, list_fail
        # create the .vrt file (assembly / agglomeration)
if len(list_tif)>1:
logging.info(_('Creating .vrt from files (first based)...'))
if name == "vuln_":
create_vrt_from_diverged_files_first_based(list_tif, fn_vrt, Nodata = 127)
else:
create_vrt_from_diverged_files_first_based(list_tif, fn_vrt)
return True, list_fail
else:
return False, list_fail
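    # Typical assembly flow (variable names are illustrative):
    #
    #   ok, failed = fns.create_vrtIfExists(fn_baseline, fn_scenario, fn_vrt, "vuln_")
    #   if ok:
    #       fns.translate_vrt2tif(fn_vrt, fn_vuln_s)  # flatten the .vrt to a .tif and delete the .vrt
    #   else:
    #       logging.warning(f"Type-mismatched rasters were set aside: {failed}")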
def translate_vrt2tif(self, fn_VRT, fn_vuln_s):
""" Translate vrt from OUTPUT > ... > Scenario to tif saved in the same folder, and delete the vrt file """
        if fn_VRT.exists():
translate_vrt2tif(fn_VRT, fn_vuln_s)
os.remove(fn_VRT)
    def copy_tif_files(self, files: list[Path], destination_dir: Path) -> None:
        """Copies the given raster files into `destination_dir` by reading and rewriting them as WolfArrays."""
destination_dir.mkdir(parents=True, exist_ok=True)
for file in files:
destination_file = destination_dir / file.name
            dataset = WolfArray(file)  # read via WolfArray (instead of gdal.Open)
if dataset is None:
logging.warning(f"Could not open {file} with GDAL.")
continue
dataset.write_all(str(destination_file))
dataset = None
logging.info(_("All the existing .tif files have been copied to the destination directory."))
    # Miscellaneous
    def name_existence(self, Zones1, key, name):
        """Logs whether any zone has the value `name` for the attribute `key`."""
        names = [str(curzone.get_values(key)[0]) for curzone in Zones1.myzones]
        if str(name) in names:
            logging.info(_(f'Objects named {name} exist.'))
        else:
            logging.info(_(f'No objects named {name}.'))
    def PICC_read(self, manager_inbe, name:str = "Habitation"):
        """Reads the PICC building layer clipped to the study area bbox and returns (zones, number of zones, bbox)."""
study_area_path = Path(manager_inbe.IN_STUDY_AREA) / manager_inbe._study_area
bbox = bbox_creation(study_area_path)
_picc_data = Picc_data(data_dir=Path(),bbox=bbox)
path_vector = Path(manager_inbe.IN_DATABASE) / "PICC_Vesdre.shp"
        if not path_vector.exists():
            return None, None, None
zones_hab = _picc_data.create_zone_picc(path_vector = path_vector, name=name, column = "GEOREF_ID", bbox=bbox)
return zones_hab, zones_hab.nbzones, bbox
def create_table_wolfsimu(self, manager_inbe, simu, operator_wd:str, operator_doi:str, hazard = ["wd", "velocity", "flood duration"], ZonesX:Zones = None,
Zones_v_doi:Zones = None, removed_buildings = [], percentil:float=None, percentil_doi:float=None, percentil_v:float=None, title: str = "table_wolfsimu.xlsx"):
"""Creates the minimum inputs needed for INBE based on the simulations, via interpolated files and operator for wd, and via buffer for other (doi, v). One table per simulations."""
where_wd = manager_inbe.IN_SA_INTERP
where_v = manager_inbe.IN_SCEN_DIR_V
where_doi = manager_inbe.IN_SCEN_DIR_D
        out = manager_inbe.IN_CSV_SCEN
        out = Path(out) / title.replace(".xlsx", f"_{simu}.xlsx")
        columns = [
            "code", "he_max", "he_out", "h_gf", "v", "date_arr_base",
            "date_leave_base", "no_s", "no_q", "FA", "NF", "GL", "YY", "Y_renov", "EP"
        ]
        df = pd.DataFrame(columns=columns)
where_v = Path(where_v) / f"v_danger_{simu}_{manager_inbe.scenario}.tif"
where_doi = Path(where_doi) / f"doi_danger_{simu}_{manager_inbe.scenario}.tif"
simu_path = simu + ".tif"
if not (Path(where_wd) / simu_path).exists():
logging.info(_(f"where_wd : {Path(where_wd) / simu_path}"))
return logging.info(_(f"Files of water depths do not exist."))
        # Exclude the dilated river mask from the hazard values (as the trace is not bank-full)
WA_river = WolfArray(manager_inbe.IN_RIVER_MASK_SCEN_tif)
i,j = np.where(~WA_river.array.mask)
WA_wd = WolfArray(where_wd / simu_path)
WA_wd.array[i,j] = WA_wd.nodata
if "velocity" in hazard:
if not Path(where_v).exists():
logging.info(_(f"where_v : {where_v}"))
return logging.error(_(f"Files of velocity danger maps do not exist."))
WA_v = WolfArray(where_v)
WA_v.array[i,j] = WA_v.nodata
if "flood duration" in hazard:
if not Path(where_doi).exists():
logging.info(_(f"where_doi : {where_doi}"))
return logging.error(_(f"Files of flood inundation danger maps do not exist."))
WA_doi = WolfArray(where_doi)
WA_doi.array[i,j] = WA_doi.nodata
i = 0
val_h = 0
logging.info(_("Scanning zone for water depth inside buildings."))
for curzone in tqdm(ZonesX.myzones):
for curvector in curzone.myvectors:
if i >= ZonesX.nbzones:
break
if str(curvector) in removed_buildings:
continue
wd_values = WA_wd.get_values_insidepoly(curvector)
wd_filtered = [x for x in wd_values[0] if x not in (0, None)]
wd_filtered = np.array(wd_filtered)
                if len(wd_filtered) != 0:  # criterion on water depth
if operator_wd == "mean":
val_h = wd_filtered.mean()
elif operator_wd == "max":
val_h = wd_filtered.max()
elif operator_wd == "median":
val_h = np.median(wd_filtered)
elif operator_wd == "percentil":
if percentil != None:
val_h = np.quantile(wd_filtered, percentil / 100)
else:
logging.error(_(f"Bad value for percentil {percentil}"))
return
else:
logging.error(_("To be coded."))
if val_h !=0:
df.loc[i, "code"] = str(curvector)
df.loc[i, "he_out"] = val_h
df.loc[i, "FA"] = curzone.area
curvector.update_lengths()
df.loc[i, "EP"] = curvector.length2D#perimeters[0]
i += 1
if any(c in hazard for c in ["velocity", "flood duration"]):
logging.info(_("Scanning zone for velocity along the buffers."))
for curzone_v in tqdm(Zones_v_doi.myzones):
for curvector_v in curzone_v.myvectors:
if i >= Zones_v_doi.nbzones:
break
if str(curvector_v) not in df['code'].astype(str).values:
continue
if "velocity" in hazard:
v_values = WA_v.get_values_insidepoly(curvector_v, usemask=False)
v_values[0][v_values[0] == -99999] = 0
v_filtered = [x for x in v_values[0] if x not in (0, None)]
v_filtered = np.array(v_filtered)
if len(v_filtered) != 0:
df.loc[df['code'] == str(curvector_v), "v"] = np.quantile(v_filtered, percentil_v / 100)
if "flood duration" in hazard:
doi_values = WA_doi.get_values_insidepoly(curvector_v, usemask=False)
doi_values[0][doi_values[0] == -99999] = 0
doi_filtered = [x for x in doi_values[0] if x not in (0, None)]
doi_filtered = np.array(doi_filtered)
if len(doi_filtered) != 0:
if operator_doi == "mean":
val_doi = doi_filtered.mean()
elif operator_doi == "max":
val_doi = doi_filtered.max() #simplifier max et median avec percentil auto
elif operator_doi == "median":
val_doi = np.median(doi_filtered)
elif operator_doi == "percentil":
if percentil_doi != None:
val_doi = np.quantile(doi_filtered, percentil_doi / 100)
else:
logging.error(_(f"Bad value for percentil doi {percentil_doi}"))
return
if val_doi > 0 :
df.loc[df['code'] == str(curvector_v), "date_arr_base"] = val_doi /3600
df.loc[df['code'] == str(curvector_v), "date_leave_base"] = 0
try:
df.to_excel(out, index=False)
except PermissionError:
logging.error(_(f"Permission denied: please ensure the file '{out.name}' is not open in another program."))
return
def pre_processing_auto(self, name_conditionnel, Ti_list, main_dir, Study_area, scenario, multiple, dx, removed_buildings = [], percentil = None,
percentil_doi = None, percentil_v=None, operator_wd = "median", operator_doi = "median", hazard = ["wd", "velocity", "flood duration"]):
        """Builds, for each return period in Ti_list, the INBE preprocessing table (water depth, velocity, flood duration) per building."""
        manager_inbe = INBE_Manager(main_dir=main_dir, Study_area=Study_area, scenario = scenario)
Zones_reading, unused, unused = self.PICC_read(manager_inbe, name_conditionnel)
"""Buffer creation"""
dx_buffer = multiple #multiple de la resolution topo
dx_buffer = multiple*dx
Zones_v_doi = None
if any(c in hazard for c in ["velocity", "flood duration"]):
Zones_v_doi = Zones_reading.buffer(distance = dx_buffer, resolution= 4, inplace = False)
save_where_output = []
for curT in Ti_list:
self.create_table_wolfsimu(manager_inbe = manager_inbe, simu=curT, operator_wd = operator_wd, operator_doi = operator_doi, hazard=hazard, ZonesX= Zones_reading, Zones_v_doi = Zones_v_doi, removed_buildings = removed_buildings, percentil=percentil, percentil_doi=percentil_doi, percentil_v=percentil_v)
logging.info(_(f"--Preprocessing table created in {manager_inbe.IN_CSV_SCEN}"))
p = Path(manager_inbe.IN_CSV_SCEN)
save_where_output.append(Path(*p.parts[-4:]))
return save_where_output
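    # End-to-end preprocessing sketch (all argument values are illustrative assumptions):
    #
    #   fns = INBE_functions()
    #   outputs = fns.pre_processing_auto("Habitation", ["T25", "T100"],
    #                                     main_dir="path/to/study", Study_area="MyArea",
    #                                     scenario="scenario_base", multiple=4, dx=1.0,
    #                                     percentil_v=90)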
def pre_processing_for_MAX(self, name_conditionnel, main_dir, Study_area, scenario, removed_buildings = []):
        """Builds a preprocessing table with sentinel hazard values (99999), used to compute the maximum possible damage for the aversion models."""
        manager_inbe = INBE_Manager(main_dir=main_dir, Study_area=Study_area, scenario = scenario)
ZonesX, unused, unused = self.PICC_read(manager_inbe, name_conditionnel)
title = "table_wolfsimu_for_max.xlsx"
        out = manager_inbe.IN_CSV_SCEN
        out = Path(out) / title
        columns = [
            "code", "he_max", "he_out", "h_gf", "v", "date_arr_base",
            "date_leave_base", "no_s", "no_q", "FA", "NF", "GL", "YY", "Y_renov", "EP"
        ]
        df = pd.DataFrame(columns=columns)
logging.info(_("Scanning zone for water depth inside buildings."))
i=0
for curzone in tqdm(ZonesX.myzones):
for curvector in curzone.myvectors:
if i >= ZonesX.nbzones:
break
if str(curvector) in removed_buildings:
continue
df.loc[i, "code"] = str(curvector)
df.loc[i, "he_out"] = 99999
df.loc[i, "v"] = 99999
df.loc[i, "FA"] = curzone.area
curvector.update_lengths()
df.loc[i, "EP"] = curvector.length2D#perimeters[0]
i += 1
try:
df.to_excel(out, index=False)
except PermissionError:
logging.error(_(f"Permission denied: please ensure the file '{out.name}' is not open in another program."))
return
logging.info(_(f"--Preprocessing table created in {manager_inbe.IN_CSV_SCEN}"))
    def computation_dfresults(self, input_table, type_computation, inflation):
        """Runs INBE on one input table and returns (rows with d_total > 0, the input table completed with default data)."""
        Pyinsyde = PyINBE(input_table=input_table)
df_results,unused,unused,unused,unused,unused,unused,unused,df_input = Pyinsyde.run_R_insyde(type_computation=type_computation, inflation=inflation)
df_ifvalue = df_results[df_results['d_total'] > 0]
return df_ifvalue, df_input
#--- Aversion models Merz et al. (2009), DOI 10.5194/nhess-9-1033-2009
def get_alpha(self, relative_damage, model_name):
"""
Return the risk aversion factor alpha for a given relative damage and model.
:param relative_damage: Relative damage value (%) for the municipality.
:type relative_damage: float
        :param model_name: Model identifier ("A0", "A1", "A2", or "A3"); "A0" disables aversion (alpha = 1).
:type model_name: str
:returns: Risk aversion factor corresponding to the given relative damage.
:rtype: float
.. note::
Implements the multiplicative aversion model from Merz et al. (2009):
* **A1**: alpha_max=10, D_l=1.0, D_u=20.0
* **A2**: alpha_max=10, D_l=1.0, D_u=10.0
* **A3**: alpha_max=50, D_l=1.0, D_u=10.0
"""
if model_name == "A0":
return 1
alpha_max, D_l, D_u = self.get_aversion_params(model_name)
if relative_damage <= D_l:
return 1.0
elif relative_damage >= D_u:
return alpha_max
else:
return 1.0 + ( (alpha_max - 1.0) / ( D_u - D_l) ) * (relative_damage - D_l)
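    # Worked example: with model "A1" (alpha_max = 10, D_l = 1.0, D_u = 20.0) and a
    # relative damage of 10 %, the linear branch gives
    #   alpha = 1 + (10 - 1) / (20 - 1) * (10 - 1) = 1 + 81/19 ≈ 5.26.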
def plotting_oneslike_alpha(self, D, alpha_max, D_l, D_u):
"""
Compute the multiplicative risk aversion factor alpha(D) as proposed by Merz et al. (2009),
DOI 10.5194/nhess-9-1033-2009.
The function models the increase in risk aversion with the relative damage D,
using a linear transition between $D_l$ and $D_u$, and a constant alpha beyond $D_u$.
:param D: Relative damage values (%) at the intervention zone scale
(ratio of total event damage to total exposed asset value).
:type D: array_like
:param alpha_max: Maximum aversion factor applied for high-damage events.
:type alpha_max: float
:param D_l: Lower threshold of relative damage beyond which aversion starts increasing.
:type D_l: float
:param D_u: Upper threshold of relative damage at which alpha reaches its maximum.
:type D_u: float
:returns: Risk aversion factor corresponding to each relative damage value.
:rtype: ndarray
"""
        alpha = np.ones_like(D, dtype=float)
mask = (D > D_l) & (D < D_u)
alpha[mask] = 1 + (alpha_max-1)*(D[mask]- D_l)/( D_u- D_l)
alpha[D >= D_u] = alpha_max
return alpha
def get_aversion_params(self, model_name):
"""
Return parameter set (alpha_max, D_l, D_u) for the selected aversion model (Merz et al., 2009).
It is used to derive the A1, A2, and A3 aversion models defined as:
* **A1**: alpha_max = 10, D_l = 1.0, D_u = 20.0
* **A2**: alpha_max = 10, D_l = 1.0, D_u = 10.0
* **A3**: alpha_max = 50, D_l = 1.0, D_u = 10.0
:param model_name: Model identifier ("A1", "A2", or "A3").
:type model_name: str
:returns: (alpha_max, D_l, D_u) corresponding to the selected model.
:rtype: tuple
"""
models = {
"A1": (10, 1.0, 20.0),
"A2": (10, 1.0, 10.0),
"A3": (50, 1.0, 10.0),
}
if model_name not in models:
raise ValueError("Model name must be 'A1', 'A2', or 'A3'.")
return models[model_name]
def plot_aversion_curves(self, path_output=None):
"""
Display the A1, A2, A3 aversion curves as a matplotlib figure.
"""
d = np.linspace(0, 25, 500)
alpha_A0 = self.plotting_oneslike_alpha(d, alpha_max=1, D_l=1.0, D_u=20.0)
alpha_A1 = self.plotting_oneslike_alpha(d, alpha_max=10, D_l=1.0, D_u=20.0)
alpha_A2 = self.plotting_oneslike_alpha(d, alpha_max=10, D_l=1.0, D_u=10.0)
alpha_A3 = self.plotting_oneslike_alpha(d, alpha_max=50, D_l=1.0, D_u=10.0)
with plt.rc_context({'text.usetex': True, 'font.size': 12, 'font.family': 'serif'}):
fig, ax = plt.subplots(figsize=(6, 4))
ax.plot(d, alpha_A0, label=r'$\alpha_0$', linestyle='-', color='black')
ax.plot(d, alpha_A1, label=r'$\alpha_1$', linestyle='-.', color='black')
ax.plot(d, alpha_A2, label=r'$\alpha_2$', linestyle='--', color='black')
ax.plot(d, alpha_A3, label=r'$\alpha_3$', linestyle=':', color='black')
ax.set_xlabel(r'Relative damage $[\%]$')
ax.set_ylabel(r'Risk aversion factor $\alpha$ [-]')
ax.legend()
ax.grid(alpha=0.3)
fig.tight_layout()
if path_output is not None:
fig.savefig(path_output)
            fig.show()
return fig, ax
#---
    def computation_combined_damage(self, pond: pd.DataFrame, manager_inbe, aversion_model="A0", total_damage=0) -> tuple[pd.DataFrame, Path]:
"""
Compute the weighted sum of damage results across multiple return‑period DataFrames.
:param df_results_Ti: Dictionary mapping return‑period keys (e.g. "T2", "T5", ...) to pandas DataFrames.
Each DataFrame must have a "code" column identifying each building and one or more numeric columns representing damage categories.
:type df_results_Ti: dict
:param pond: Dictionary mapping the same return‑period keys to their weighting coefficients (e.g. {"T2": 0.65, "T5": 0.216667, ...}).
:type pond: dict
:return: A DataFrame with one row per unique building "code" (coordinate of the center of the polygon), containing the column "code"
plus each damage category column equal to the weighted sum across all Ti.
:rtype: pd.DataFrame
"""
df_results_Ti = manager_inbe.get_individualdamage()
weighted_dfs = []
cols = ["Ti", "relD", "alpha_A0", "D_A0", "alpha_A1", "D_A1", "alpha_A2", "D_A2", "alpha_A3", "D_A3",]
excel_path = manager_inbe.ALPHA
if excel_path.exists():
alpha_df = pd.read_excel(excel_path)
else:
alpha_df = pd.DataFrame(columns=cols)
logging.info(_(f"Ponderation coefficients (computed automatically based on available set of damage files): {pond}"))
for Ti, df in sorted(df_results_Ti.items()):
coeff = pond.loc[int(Ti), "Ponderation"]
df_w = df.copy()
numeric_cols = df_w.select_dtypes(include="number").columns
damage_Ti_sum = df_w['d_total'].sum()
damage_Ti_sum = float(damage_Ti_sum)
total_damage = float(total_damage)
try:
if total_damage != 0:
relative_damage = 100 * (damage_Ti_sum / total_damage)
else:
relative_damage = 0.0
except (TypeError, ValueError):
logging.error("Invalid type for damage values — cannot compute relative damage.")
relative_damage = np.nan
if aversion_model == "A0":
output_path = manager_inbe.OUT_COMB
alpha=1
else:
aversion_model = str(aversion_model)
output_path = manager_inbe.OUT_COMB_DIR / f"combined_damage_aversion_{aversion_model}.xlsx"
alpha=self.get_alpha(relative_damage, aversion_model)
logging.info(f"|For T{Ti}, relative damage = {np.round(relative_damage,3)}%%, aversion {aversion_model} => alpha(Drel) = {np.round(alpha,2)}.") # "%" bugs !!
df_w[numeric_cols] = df_w[numeric_cols].mul(coeff * alpha)
#saving and computing
row_dict = {
"Ti": f"T{Ti}",
"relD": round(relative_damage,3),
"alpha_A0": 1
}
for model in ["A1", "A2", "A3"]:
row_dict[f"alpha_{model}"] = np.round(alpha,3) if aversion_model == model else np.nan
if (alpha_df["Ti"] == f"T{Ti}").any():
idx = alpha_df.index[alpha_df["Ti"] == f"T{Ti}"][0]
alpha_df.loc[idx, "relD"] = round(relative_damage,3)
alpha_df.loc[idx, f"alpha_{aversion_model}"] = np.round(alpha,3)
alpha_df.loc[idx, f"D_{aversion_model}"] = df_w["d_total"].sum()
else:
alpha_df = pd.concat([alpha_df, pd.DataFrame([row_dict])], ignore_index=True)
alpha_df.to_excel(excel_path, index=False)
weighted_dfs.append(df_w)
all_concat = pd.concat(weighted_dfs, ignore_index=True, sort=False)
final_df = all_concat.groupby("code", as_index=False).sum()
        try:
            # probe that the output file is writable (i.e. not open in another program) before exporting
            with open(output_path, 'wb') as f:
                pass
final_df.to_excel(output_path, index=False)
return final_df, output_path
except PermissionError:
logging.error(_(f"Permission denied: please ensure the file '{output_path}' is not open in another program."))
return None, None
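    # In the A0 case this reduces to the classical expected annual damage (EAD):
    #   EAD = sum_i w_i * D_i
    # where w_i is the "Ponderation" weight of return period T_i (e.g. 0.65 for T2 in
    # the docstring example) and D_i its damage table; for A1-A3 each term is further
    # multiplied by alpha(relative damage of T_i).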
def plot_combinaison(self, manager_inbe, pond: dict):
damage_dict = manager_inbe.get_individualdamage()
alpha_path = manager_inbe.ALPHA
if not alpha_path.exists():
return logging.error(f"The file {alpha_path} doesn't exist")
alpha_df = pd.read_excel(alpha_path)
models = ["A0", "A1", "A2", "A3"]
return_periods = sorted([int(k) for k in damage_dict.keys()])
hist_df = pd.DataFrame(index=models, columns=return_periods, dtype=float)
for Ti in return_periods:
df = damage_dict[str(Ti)]
coeff = pond.loc[Ti, "Ponderation"]
row = alpha_df[alpha_df["Ti"] == f"T{Ti}"]
if row.empty:
continue
for model in models:
alpha_val = row[f"alpha_{model}"].values[0]
if pd.isna(alpha_val):
continue
                # Already stored in the alpha file, but recomputed here to cross-check the calculations; otherwise the values are in the D_A<i> columns
weighted_sum = df["d_total"].sum() * coeff * alpha_val
hist_df.loc[model, Ti] = weighted_sum /1000
pastel_colors = [
(198/255, 219/255, 239/255),
(158/255, 202/255, 225/255),
(107/255, 174/255, 214/255),
(253/255, 174/255, 97/255),
(239/255, 128/255, 62/255),
(203/255, 24/255, 29/255),
(128/255, 0/255, 38/255)
]
totals = hist_df.sum(axis=1)
hist_df_pct = hist_df.div(totals, axis=0) * 100
bottom = np.zeros(len(models))
fig, ax = plt.subplots(figsize=(10, 6))
for i, Ti in enumerate(sorted(return_periods)):
vals = hist_df_pct[Ti].fillna(0).values
            ax.bar(models, vals, bottom=bottom, color=pastel_colors[i % len(pastel_colors)], label=f"T{Ti}")
bottom += vals
ax.set_ylabel("Contribution to total weighted damage [%]")
ax.set_xticks(models)
ax.set_xticklabels(["EAD(A0)", "A1", "A2", "A3"])
ax.legend(title="Return period [yrs]", loc='upper center', bbox_to_anchor=(0.5, 1.15), ncol=len(return_periods))
ax.grid(axis='y', linestyle='--', alpha=0.5)
fig.tight_layout()
fig.show()
return fig, ax
def plot_damage(self, df_results, idx=None, cat=None, sorted_cond=None):
"""
Displays a damage bar chart for a specific index (specific building) or for all entries, and for all
damage categories or only one.
:param df_results: DataFrame containing damage computed by INBE.
:type df_results: pd.DataFrame
:param idx: Index of a specific row to plot. Defaults to None (every building plotted).
:type idx: int, optional
:param cat: Specific damage category to plot if idx is None (global mode). Defaults to None (every category plotted).
:type cat: str, optional
"""
labels = ['d_cleaning', 'd_removal', 'd_non_stru', 'd_structural', 'd_finishing', 'd_systems']
labels_latex = [r'$D_{\mathrm{cleaning}}$',r'$D_{\mathrm{removal}}$',r'$D_{\mathrm{non stru}}$',r'$D_{\mathrm{structural}}$',r'$D_{\mathrm{finishing}}$',r'$D_{\mathrm{systems}}$']
if cat is not None:
labels = [cat]
if sorted_cond:
df_sorted = df_results.sort_values('d_total')
df_plot = df_sorted
else:
df_plot = df_results
fig, ax = plt.subplots(figsize=(8,6))
if idx is not None:
values = [df_plot[label][idx] / 1e3 for label in labels]
x = np.arange(len(labels))
colors = plt.cm.tab10.colors[:len(labels)]
bars = ax.bar(x, values, color=colors, alpha=0.6)
ax.set_xticks(x)
ax.set_xticklabels(labels, rotation=45)
ax.tick_params(axis='y', labelsize=13)
ax.set_xlabel("Categories of damage [-]")
ax.set_ylabel("d_total [10³€]", fontsize=13)
ax.grid(True, axis='y', linestyle='--', alpha=0.7)
else:
combined = list(zip(labels, labels_latex))
combined_sorted = sorted(combined, key=lambda x: df_plot[x[0]].sum(), reverse=True)
labels, labels_latex = zip(*combined_sorted)
colors = plt.cm.tab10.colors[:len(labels)]
x = np.arange(len(df_plot.index))
bottom = np.zeros(len(df_plot.index))
bars = []
for i, label in enumerate(labels):
                values = df_plot[label] / 1e3  # convert € to 10³ € (consistent with the y-label)
bar = ax.bar(x, values, bottom=bottom, color=colors[i], label=labels[i], alpha=0.6)
bars.append(bar)
bottom += values
            ax.set_xticks([])  # no x ticks
ax.set_ylabel("Damage [10³€]")
ax.legend(loc='lower center', bbox_to_anchor=(0.5, 1.02), ncol=3, borderaxespad=0.)
ax.grid(True, axis='y', linestyle='--', alpha=0.7)
        # Tooltip setup: shows the building code on hover (a gadget, but handy)
annot = ax.annotate("", xy=(0,0), xytext=(15,15), textcoords="offset points",
bbox=dict(boxstyle="round", fc="w"),
arrowprops=dict(arrowstyle="->"))
annot.set_visible(False)
def update_annot(bar, idx_bar):
x_mid_data = (ax.get_xlim()[0] + ax.get_xlim()[1]) / 2
y_pos = bar.get_height() + bar.get_y()
annot.xy = (bar.get_x() + bar.get_width() / 2, y_pos)
code = df_plot['code'].iloc[idx_bar]
annot.set_text(f"code: {code}")
annot.get_bbox_patch().set_alpha(0.8)
annot.set_fontsize(10)
arrow_disp = ax.transData.transform(annot.xy)
fig_width = fig.bbox.width
x_text_disp = fig_width / 2
y_text_disp = arrow_disp[1] + 15
annot.set_position((x_text_disp - arrow_disp[0], y_text_disp - arrow_disp[1]))
def hover(event):
vis = annot.get_visible()
for bar_group in bars:
for i, bar in enumerate(bar_group):
if bar.contains(event)[0]:
update_annot(bar, i)
annot.set_visible(True)
fig.canvas.draw_idle()
return
if vis:
annot.set_visible(False)
fig.canvas.draw_idle()
fig.canvas.mpl_connect("motion_notify_event", hover)
fig.tight_layout()
fig.show()
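    # Usage sketch (df_results as returned by computation_dfresults):
    #
    #   fns.plot_damage(df_results, idx=0)             # one building, one bar per damage category
    #   fns.plot_damage(df_results, sorted_cond=True)  # all buildings, stacked bars sorted by d_total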
type_computation = "from_wolfsimu"
    def computation_dfesults_forfolder(self, manager_inbe, type_computation, Ti_list, inflation, ifMAX=False):  # TODO: rename "forfolder" --> "for selected Ti"
"""
Process Excel files in the folder INPUT>CSVs matching the pattern 'table_wolfsimu_T<number>.xlsx',
extracts and sorts the T identifiers numerically, computes results (INBE) for each file using
computation_dfresults, and returns a dictionary of results keyed by each T identifier.
:param manager_inbe: Object containing the path to the input CSV scenario folder.
:type manager_inbe: Any
:param type_computation: Parameter specifying the type of computation to perform.
:type type_computation: Any
:param inflation: Inflation parameter used in the computation.
:type inflation: Any
        :param ifMAX: If True, compute the maximum-damage table (used by the aversion models).
:return: Dictionary with keys as T identifiers (e.g., 'T2') and values as computation results.
:rtype: dict
"""
total_damage = None
        df_results = {}  # dictionary keyed by the Ti available in the CSVs folder
df_input = {}
save_output_TEMP = []
save_output_defaultINPUT = []
for curT in Ti_list:
            if ifMAX:
                input_table = manager_inbe.IN_CSV_SCEN / ("table_wolfsimu_for_max.xlsx")
            else:
                input_table = manager_inbe.IN_CSV_SCEN / ("table_wolfsimu_" + curT + ".xlsx")
logging.info(_(f"Computing damage for table_wolfsimu{curT}"))
if not input_table.exists():
logging.error(_(f"Input table table_wolfsimu_{curT}.xlsx does not exist."))
continue
df_results[curT], df_input[curT] = self.computation_dfresults(input_table, type_computation, inflation)
output_path = Path(manager_inbe.OUT_SCEN_DIR) / f"individual_damage_{curT}.xlsx"
if ifMAX:
output_path = Path(manager_inbe.INDIV_MAX)
                # Dmax: needed for (and only if) the aversion models are computed
total_damage = df_results[curT]["d_total"].sum()
            try:
                # probe that the output file is writable (i.e. not open in another program) before exporting
                with open(output_path, 'wb') as f:
                    pass
df_results[curT].to_excel(output_path, index=False)
except PermissionError:
logging.error(_(f"Permission denied: please ensure the file '{output_path}' is not open in another program."))
return
output_path = Path(*output_path.parts[-5:])
save_output_TEMP.append(output_path)
output_path = Path(manager_inbe.OUT_SCEN_DIR_IN) / f"input_with_default_data_{curT}.xlsx"
if ifMAX:
output_path = Path(manager_inbe.OUT_SCEN_DIR_IN) / f"input_with_default_for_max.xlsx"
try:
with open(output_path, 'wb') as f:
pass
df_input[curT].to_excel(output_path, index=False)
except PermissionError:
logging.error(_(f"Permission denied: please ensure the file '{output_path}' is not open in another program."))
return
output_path = Path(*output_path.parts[-5:])
save_output_defaultINPUT.append(output_path)
if ifMAX:
break
return df_results, save_output_TEMP, save_output_defaultINPUT, total_damage
def adding_INBEresults_toZones(self, manager_inbex, Zones_trix, Ti = None):
cols = ["code", "d_cleaning", "d_removal", "d_non_stru", "d_structural", "d_finishing", "d_systems", "d_total"]
cols_d = ["d_cleaning", "d_removal", "d_non_stru", "d_structural", "d_finishing", "d_systems", "d_total"]
        if Ti is not None:
            path_to_indiv_damage = manager_inbex.OUT_SCEN_DIR / ("individual_damage_" + str(Ti) + ".xlsx")
            df_results = pd.read_excel(path_to_indiv_damage, usecols=cols)
        else:
            df_results = pd.read_excel(manager_inbex.OUT_COMB, usecols=cols)
        Zones_torasterize = Zones_trix
Zones_torasterize = Zones_trix
for curzone in tqdm(Zones_torasterize.myzones):
for curvector in curzone.myvectors:
                row_match = df_results[df_results['code'].astype(str) == str(curvector)]  # boolean selection on the building code
if not row_match.empty:
row = row_match.iloc[0]
for key in cols_d:
read_value = []
read_value.append(str(row[key]))
arr = np.array(read_value, dtype=str)
curzone.add_values(key=str(key), values=arr)
return Zones_torasterize
    def label_0_needed(self, Zones1):
        """Sets every damage attribute to 0 for zones without an INBE result, so that they can still be rasterized."""
for curzone in tqdm(Zones1.myzones):
cols_d = ["d_cleaning", "d_removal", "d_non_stru", "d_structural", "d_finishing", "d_systems", "d_total"]
id_vals = [curzone.get_values(col) for col in cols_d]
id_clean = [v[0] if isinstance(v, (list, np.ndarray)) else v for v in id_vals]
if all(v is None for v in id_clean):
for col in cols_d:
curzone.add_values(key=col, values=np.array([0], dtype=int))
def raster_damage_in_zones(self, path_damage_out, path_onedanger, Zones_torasterize, scenario, Ti):
self.label_0_needed(Zones_torasterize)
        WA_raster = WolfArray(path_onedanger)
        WA_raster.unmask()  # assumed to be a method; the original bare attribute access was a no-op
        WA_raster.array[WA_raster.array<=99999] = 99999  # reset all cells to the nodata value before rasterizing the damages
        WA_raster.rasterize_zones_valuebyid(Zones_torasterize, id="d_total", method="nearest")
        if Ti is not None:
            out_title = Path(path_damage_out) / f"inbe_{scenario}_{Ti}.tif"
        else:
            out_title = Path(path_damage_out) / ("raster_" + str(scenario) + ".tif")
        WA_raster.unmask()  # assumed method, see above
        WA_raster.nodata = 99999
WA_raster.write_all(out_title)
return logging.info(_(f"WolfArray written at {out_title}"))
    def select_and_count_in_zones(self, Zones1, name):
        """Returns a Zones object keeping only the zones whose NATUR_DESC equals `name`, together with their count."""
        count = 0
        Zones2 = Zones()
        for curzone in Zones1.myzones:
            if str(curzone.get_values("NATUR_DESC")[0]) == str(name):
                count += 1
                Zones2.add_zone(curzone, forceparent = True)
        #logging.info(_(f"Number of {name}: {count}"))
        return Zones2, count
def raster_auto(self, name_conditionnel, manager_inbe, which_result, Ti_list=None):
        # 1. Read the PICC
if which_result == "individual_damage":
for Ti in Ti_list:
Zones_tri,unused,unused = self.PICC_read(manager_inbe, name_conditionnel)
logging.info(_(f"Raster creation for {Ti}."))
#3. """Calcul damage : individual and combined""" --> computed and saved in excel
Zones_torasterize = self.adding_INBEresults_toZones(manager_inbe, Zones_tri, Ti = Ti)
tif_files = list(manager_inbe.IN_SA_INTERP.glob("T*.tif"))
if not tif_files:
logging.error(_(r"Please provide the water depth input."))
return
one_danger = tif_files[0]
self.raster_damage_in_zones(manager_inbe.OUT_SCEN_DIR, one_danger, Zones_torasterize, manager_inbe.scenario, Ti)
if which_result == "combined" :
Zones_tri,unused,unused = self.PICC_read(manager_inbe, name_conditionnel)
Zones_tri,unused = self.select_and_count_in_zones(Zones_tri, name_conditionnel)
tif_files = list(manager_inbe.IN_SA_INTERP.glob("T*.tif"))
if not tif_files:
logging.error(_(r"Please provide the water depth input."))
return
one_danger = tif_files[0]
Zones_torasterize = self.adding_INBEresults_toZones(manager_inbe, Zones_tri, Ti = None)
self.raster_damage_in_zones(manager_inbe.OUT_SCEN_DIR, one_danger, Zones_torasterize, manager_inbe.scenario, None)
    def load_quant_sum_backup(self, path, quant):
        """Backup variant of load_quant_sum: returns the trimmed sum of 'd_total' only."""
        d_total = pd.read_excel(path)['d_total']
        threshold = d_total.quantile(quant)
        return d_total[d_total <= threshold].sum()
    def load_quant_sum(self, path, quant):
        """Reads a damage table and returns, per damage category, the sum of the values at or below the `quant` quantile (outlier-trimmed total)."""
        df = pd.read_excel(path)
        cols = ["d_total", "d_cleaning", "d_removal", "d_non_stru",
                "d_structural", "d_finishing", "d_systems"]
        results = {}
        for col in cols:
            series = df[col]
            threshold = series.quantile(quant)
            results[col] = series[series <= threshold].sum()
        return results
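    # Usage sketch (hypothetical file name): trim the per-building damage
    # distribution at the 75th percentile before summing, to limit the weight of outliers:
    #
    #   sums = fns.load_quant_sum("individual_damage_T100.xlsx", quant=0.75)
    #   total_trimmed_keur = sums["d_total"] / 1000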
def histo_total_from_list(self, list_path, quant, type=None):
damages = []
labels = []
colors = []
base_color = (212/255, 101/255, 10/255)
alt_colors = [
(119/255, 147/255, 60/255),
(127/255, 127/255, 127/255),
(91/255, 155/255, 213/255),
(255/255, 192/255, 0/255),
(255/255, 0/255, 255/255)
]
        comps = []
        for i, path in enumerate(list_path):
res = self.load_quant_sum(path, quant)
D_tot = res["d_total"] / 1000
damages.append(D_tot)
comp = [res["d_cleaning"]/1000,
res["d_removal"]/1000,
res["d_non_stru"]/1000,
res["d_structural"]/1000,
res["d_finishing"]/1000,
res["d_systems"]/1000]
            comps.append(comp.copy())
if type == "Combined damage":
p = Path(path)
scenario = p.parent.parent.name.split("scenario_")[-1]
filename = p.stem.replace("combined_damage_", "")
labels.append(f"{scenario}_{filename}")
            else:
                labels.append(self.extract_scenario_label(path))
colors.append(base_color if i == 0 else alt_colors[(i - 1) % len(alt_colors)])
base_damage = damages[0]
        rel_percents = [0]
        if base_damage != 0:
            for dmg in damages[1:]:
                rel = round(dmg / base_damage * 100 - 100, 3)
                rel_percents.append(rel)
        else:
            logging.error(_("No % indicated, because the reference is equal to 0."))
labels2 = [str(lab).replace("scenario_", "") for lab in labels]
bottom = [0]*len(damages)
stack_colors = plt.cm.tab10.colors[:6]
d_labels = ["d_cleaning", "d_removal", "d_non_stru", "d_structural", "d_finishing", "d_systems"]
fig, ax = plt.subplots(figsize=(10, 6))
for j in range(len(comps[0])):
vals = [c[j] for c in comps]
ax.bar(labels2, vals, bottom=bottom, color=stack_colors[j])
bottom = [bottom[k] + vals[k] for k in range(len(vals))]
ax.legend(d_labels, loc='upper center', bbox_to_anchor=(0.5, 1.4), ncol=3)
ylim_max = max(damages) * 1.15
for i in range(1, len(damages)):
if base_damage != 0:
rel = rel_percents[i]
label = f"+{rel}\%" if rel > 0 else f"{rel}\%"
color_txt = 'red' if rel > 0 else 'green'
ax.text(i, damages[i] + ylim_max * 0.02, label, ha='center', color=color_txt, fontsize=13)
ax.set_ylabel('Total damage [k€]')
ax.tick_params(axis='x', labelrotation=90)
ax.grid(alpha=0.3)
ax.set_ylim(0, ylim_max)
fig.tight_layout()
fig.show()
return fig, ax
def NO_division_by_zero(self, numerator, denominator):
        if denominator == 0:
            return 0
        else:
            return numerator / denominator
def analysis_table(self, instances, path_analysis, ref = "comb"):
from openpyxl.styles import Font
from openpyxl import load_workbook
all_possible_Ti = sorted(instances[0].get_list_individual_T(instances[0]), key=int)
skeleton = []
for Ti in all_possible_Ti:
skeleton.append([f"T{Ti}", "Sum D"])
#skeleton.append(["", "Sum Di/Si"])
#skeleton.append(["", "SumD/SumS"])
skeleton.append(["", "Nb"])
#skeleton.append(["", "SumS"])
skeleton.append(["EAD", "Sum D"])
analysis = pd.DataFrame(skeleton, columns=["Return periods", "Metric"])
ref_values = []
path_combD = instances[0].OUT_COMB
df0 = pd.read_excel(path_combD)
SumcombD = df0["d_total"].sum()
percentage_ref = []
for i, inst in enumerate(instances):
            j = 0  # row counter: the skeleton structure is fixed, so this tracks our position when computing the percentages
sa = inst.Study_area
sc = inst.scenario
col_name = f"{sa}/{sc}"
logging.info(f"Computing column {i} : {sa}/{sc}.")
values = []
path_combD_i = inst.OUT_COMB
if path_combD_i.exists():
df_comD_i = pd.read_excel(path_combD_i)
SumcombD_i = df_comD_i["d_total"].sum()
else:
logging.error(f"Missing COMB file for inst={sa}/{sc}, leaving cell empty.")
SumcombD_i = None
for Ti in all_possible_Ti:
path_indivD = inst.OUT_SCEN_DIR / f"individual_damage_T{Ti}.xlsx"
path_defaultI = inst.OUT_SCEN_DIR_IN / f"input_with_default_data_T{Ti}.xlsx"
if not path_indivD.exists() or not path_defaultI.exists():
logging.error(f"Missing file for Ti={Ti}, inst={sa}/{sc}, leaving cells empty.")
                    #values.extend([None, None, None, None, None])  # if the extra metrics are re-enabled
values.extend([None, None])
continue
df1 = pd.read_excel(path_indivD)
df2 = pd.read_excel(path_defaultI)
merged = pd.merge(df1, df2, on="code", how="inner")
sumD = merged["d_total"].sum()
nb = len(merged)
#SumS = merged["FA"].sum()#si plus d'arg
#sum_Di_by_Si = (merged["d_total"] / merged["FA"]).sum()#si plus d'arg
#sumD_by_SumS = self.NO_division_by_zero(sumD, SumS)#si plus d'arg
#values.extend([sumD, sum_Di_by_Si, sumD_by_SumS, nb, SumS])#si plus d'arg
values.extend([sumD, nb])
values.append(SumcombD_i)
if len(values) != len(analysis):
logging.warning(f"Length mismatch for {col_name}: {len(values)} vs {len(analysis)}. Filling missing with None.")
while len(values) < len(analysis):
values.append(None)
analysis[col_name] = values
if ref == "comb":
            #ref_values_with_comb = [SumcombD, 0, 0, 0, 0]*len(all_possible_Ti) + [SumcombD]  # if the extra metrics are re-enabled
ref_values_with_comb = [SumcombD, 0]*len(all_possible_Ti) + [SumcombD]
comp_values = []
delta_values = []
for v, r in zip(values, ref_values_with_comb):
                # the percentage
if v is None or r == 0:
comp_values.append(None)
delta_values.append(None)
else:
val_percentage=round((v / r) * 100, 1)
comp_values.append(val_percentage)
                    if i == 0:  # the first instance is the reference
                        percentage_ref.append(val_percentage)
                    # comparison to the reference (% - %)
                    #jbis = int(j/len([SumcombD, 0, 0, 0, 0]))  # if the extra metrics are re-enabled
                    jbis = int(j/len([SumcombD, 0]))
                    delta_values.append(percentage_ref[jbis] - val_percentage)
j+=1
comp_col_name = f"{col_name} % vs ref"
idx = analysis.columns.get_loc(col_name) + 1
analysis.insert(idx, comp_col_name, comp_values)
empty_col_name = f"DELTA {col_name}"
analysis.insert(idx + 1, empty_col_name, delta_values)
else:
if i == 0:
ref_values = values.copy()
logging.error(_("First column taken as reference."))
else:
comp_values = []
for v, r in zip(values, ref_values):
if v is None or r == 0:
comp_values.append(None)
else:
comp_values.append(round((v / r - 1) * 100, 3))
comp_col_name = f"{col_name} % vs ref"
idx = analysis.columns.get_loc(col_name) + 1
analysis.insert(idx, comp_col_name, comp_values)
try:
analysis.to_excel(path_analysis, index=False)
except PermissionError as e:
logging.error("permission denied : please check if the file is closed or used in another program")
return False, str(e)
wb = load_workbook(path_analysis)
ws = wb.active
for col in ws.iter_cols(min_col=3, max_col=ws.max_column):
if "% vs ref" in str(col[0].value):
for cell in col[1:]:
if cell.value is not None:
                        if cell.value > 0:
                            cell.font = Font(color="FF0000")  # red
                        elif cell.value < 0:
                            cell.font = Font(color="008000")  # green
try:
wb.save(path_analysis)
logging.info(f"Table analysis with % comparison and color created in {path_analysis}.")
return True, ""
except PermissionError as e:
logging.error("permission denied : please check if the file is closed or used in another program")
return False, str(e)
    def scatter_INBE_dtotal(self, manager1, manager2, max_val=None, Ti=None, quant=None):
        """Scatter plot of per-building total damage: suffix 2 (x axis, from manager1) is the baseline; suffix 1 (y axis, from manager2) is the scenario compared against it."""
        if Ti is not None:
path2 = Path(manager1.OUT_SCEN_DIR) / f"individual_damage_T{Ti}.xlsx"
path1 = Path(manager2.OUT_SCEN_DIR) / f"individual_damage_T{Ti}.xlsx"
title = rf"{Ti}-year flood"
        else:
path2 = Path(manager1.OUT_COMB)
path1 = Path(manager2.OUT_COMB)
title = r"Combined damage"
df1 = pd.read_excel(path1)
df2 = pd.read_excel(path2)
df1 = df1[['code', 'd_total']].copy()
df2 = df2[['code', 'd_total']].copy()
df1.rename(columns={'d_total': 'd_total_sc1'}, inplace=True)
df2.rename(columns={'d_total': 'd_total_sc2'}, inplace=True)
merged = pd.merge(df1, df2, on='code', how='outer')
        if quant is not None:
percentile_sc1 = merged['d_total_sc1'].quantile(quant)
percentile_sc2 = merged['d_total_sc2'].quantile(quant)
filtered = merged[(merged['d_total_sc1'] <= percentile_sc1) | (merged['d_total_sc2'] <= percentile_sc2)]
merged=filtered
        # SETS: buildings present in both scenarios, or in only one of them
common = merged[ merged['d_total_sc1'].notna() & merged['d_total_sc2'].notna() ]
only_sc1 = merged[ merged['d_total_sc1'].notna() & merged['d_total_sc2'].isna() ]
only_sc2 = merged[ merged['d_total_sc2'].notna() & merged['d_total_sc1'].isna() ]
only_sc1_filled = only_sc1.copy()
only_sc1_filled['d_total_sc2'] = 0
only_sc2_filled = only_sc2.copy()
only_sc2_filled['d_total_sc1'] = 0
fig, ax = plt.subplots(figsize=(8, 8))
ax.scatter(common['d_total_sc2'], common['d_total_sc1'], marker = 'x', color='black')
ax.scatter(only_sc1_filled['d_total_sc2'], only_sc1_filled['d_total_sc1'], marker = 'x', color='black')
ax.scatter(only_sc2_filled['d_total_sc2'], only_sc2_filled['d_total_sc1'], marker = 'x', color='black')
label2 = manager1.scenario
label1 = manager2.scenario
        if max_val is None:
max_val = merged[['d_total_sc1', 'd_total_sc2']].max().max()
xlabel = rf"Total damage [€] for {label2}"
ylabel = rf"Total damage [€] for {label1}"
ax.plot([0, max_val], [0, max_val], '--', color='grey')
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
ax.set_title(title, loc='left')
ax.grid(True)
fig.tight_layout()
fig.show()
return fig, ax
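    # Usage sketch (assuming two INBE_Manager instances, one per scenario):
    #
    #   fig, ax = fns.scatter_INBE_dtotal(manager_baseline, manager_scenario, Ti=100)
    #   # Points off the dashed 1:1 line are buildings whose total damage differs
    #   # between the two scenarios; `quant` trims extreme values before plotting.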