# Source code for wolfhece.insyde_be.INBE_func

"""
Author: University of Liege, HECE, Damien Sansen
Date: 2025

Copyright (c) 2025 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

"""Imports et bons paths"""
import os
import geopandas as gpd
import logging
from pathlib import Path
from tqdm import tqdm
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from osgeo import gdal
from scipy.ndimage import label
from scipy.ndimage import binary_dilation

from ..wolf_vrt import create_vrt_from_diverged_files_first_based, translate_vrt2tif
from ..wolf_array import WolfArray
from ..PyTranslate import _
from ..PyVertexvectors import Zones
from ..picc import Picc_data, bbox_creation
from ..Results2DGPU import wolfres2DGPU

try:
    from wolfdamage.insyde_be.Py_INBE import INBE_Manager
except ImportError:
    raise ImportError("The module 'wolfdamage' is required. Please, install it.")

class INBE_functions():
    """Helper toolbox for the INSYDE-BE (INBE) damage-assessment workflow:
    riverbed masking, raster assembly/agglomeration, pre-processing tables,
    damage computation and plotting utilities."""

    def __init__(self):
        # Stateless helper collection: nothing to initialize.
        pass
[docs] def riverbed_trace_existingdanger(self, wd_tif: Path, interation_choice: int = 2, rivermask_out: Path = None, threshold = 0.5): """Creates a bool matrix with the largest connected component of a raster after thresholding. This function opens a GeoTIFF (float32), applies a threshold to create a mask, isolates the largest object (the riverbed) by removing noise, and then applies a binary dilation to ensure INSYDE-BE does not use hazards values from the river. This returns a boolean matrix mask.""" WA_river = WolfArray(wd_tif) mask = WA_river.array > threshold structure = np.array([[0,1,0], [1,1,1], [0,1,0]], dtype=int) labeled, n = label(mask, structure) if n > 0: sizes = np.bincount(labeled.ravel()) sizes[0] = 0 largest_label = sizes.argmax() mask = labeled == largest_label else: mask[:] = False mask = binary_dilation(mask, iterations=interation_choice) if rivermask_out is not None: WA_river.array[:,:] = mask[:,:] WA_river.array[:,:] = mask.astype(np.float32) #! conversion implicite Numpy is no astype WA_river.write_all(rivermask_out) return mask
[docs] def riverbed_trace_simu(self, fn_read_simu, fn_output, threshold, type_extraction, id_begin=None, id_end=None, id_step=None): """ Recognizes the riverbed trace based on a simulation or existing danger map, where water depth above a given threshold is considered part of the riverbed. Inputs: - fn_read_simu: the simulation file to read. - fn_output: the location to save the riverbed trace as a .tiff file. - threshold: the water depth threshold above which the areas are considered riverbed. """ if type_extraction == "last_step": wolfres2DGPU_test = wolfres2DGPU(fn_read_simu) wolfres2DGPU_test.read_oneresult(-1) wd = wolfres2DGPU_test.get_h_for_block(1) #wd.array[wd.array > 1000] = 0 #wd.array[wd.array > threshold] = 1 #wd.array[wd.array < threshold] = 0 wd.array[:] = np.where((wd.array > threshold) & (wd.array <= 1000), 1, 0) wd.nodata=0 wd.write_all(Path(fn_output)) elif type_extraction=="danger_map": wd = wolfres2DGPU(fn_read_simu).danger_map_only_h(id_begin,id_end,id_step) wd.array[:] = np.where((wd.array > threshold) & (wd.array <= 1000), 1, 0) wd.nodata=0 wd.write_all(Path(fn_output))
[docs] def dilatation_mask_river(self, manager_inbe): mask_river = Path(manager_inbe.IN_RIVER_MASK_SCEN) / "Masked_River_extent.tiff" WA_mask_river = WolfArray(mask_river) array = binary_dilation(WA_mask_river.array) WA_mask_river.array[:,:] = array[:,:] out = Path(manager_inbe.IN_RIVER_MASK_SCEN_tif) WA_mask_river.write_all(out) return print(f"Dilated mask river written in {Path(manager_inbe.IN_RIVER_MASK_SCEN_tif)}")
[docs] def apply_bathy_to_rivermask(self, rivermask_out: Path, source_tif: Path, output_tif: Path): WA_mask = WolfArray(rivermask_out) WA_src = WolfArray(source_tif) nodata = WA_mask.nodata if nodata != 99999.0: nodata = 99999.0 out_arr = np.where(WA_mask.array, WA_src.array, nodata).astype(np.float32) WA_src.array[:,:] = out_arr[:,:] WA_mask.nodata = nodata WA_src.write_all(output_tif)
    # Assembly (FR: agglomération)
    # -----------------------------
    # Essentially the same operations as in the configuration manager, used to
    # agglomerate several rasters. The class Config_Manager_2D_GPU is called;
    # however, some functions were rewritten to allow searching for a more
    # specific prefix ('vuln', rather than 'bath', 'mann', or 'inf').
[docs] def tree_name_tif(self, folder_path, name): """Find all .tiff files starting with 'vuln' in the directory and return paths""" folder = Path(folder_path) vuln_tiff_files = {file for file in folder.rglob("*.tiff") if file.name.startswith(name)} vuln_tif_files = {file for file in folder.rglob("*.tif") if file.name.startswith(name)} vuln_files = vuln_tiff_files.union(vuln_tif_files) tiff_trees = [] if len(vuln_files) !=0: for tiff in vuln_files: curtree = [tiff] while tiff.parent != folder: tiff = tiff.parent curtree.insert(0, tiff) tiff_trees.append(curtree) return tiff_trees
    def select_name_tif(self, path_baseline: Path, folder_path: Path, name, filter_type = False) -> tuple[list[Path], list[Path]]:
        """
        Collects all .tiff files starting with `name` from `folder_path` and appends
        them to a list. Checks each file's data type against the baseline raster and
        renames files with a mismatched type (so they no longer match the prefix/extension
        pattern and will not be superimposed in the assembly).

        :param path_baseline: Path to the baseline raster file.
        :type path_baseline: pathlib.Path
        :param folder_path: Path to the folder containing raster files to check.
        :type folder_path: pathlib.Path
        :param name: Prefix name to filter .tiff files in the folder.
        :type name: str
        :param filter_type: when True, exclude (and rename) files whose dtype differs
            from the baseline's dtype; when False, keep every match.
        :type filter_type: bool
        :return: A tuple containing two lists:
            - files: List of paths with matching data type (baseline first).
            - unused_files: List of paths renamed due to type mismatch.
        :rtype: tuple[list[pathlib.Path], list[pathlib.Path]]
        """
        files = []
        unused_files = []
        # Data type of the assembly baseline raster.
        WA_open = WolfArray(path_baseline.as_posix())
        type_base = WA_open.array.dtype.name  # gdal.GetDataTypeName
        files.append(path_baseline.as_posix())
        # Every file under folder_path whose name begins with `name`.
        tiff_trees = self.tree_name_tif(folder_path, name)
        for tree in tiff_trees:
            file_path = tree[-1]  # the file itself (last element of the chain)
            ds = WolfArray(file_path.as_posix())
            file_type = ds.array.dtype.name
            ds = None  # release the array before a possible rename
            # Filter out (and rename) files whose dtype differs from the baseline.
            if filter_type:
                if file_type != type_base:
                    suffix_to_add = f"_{file_type}"
                    # NOTE(review): the suffix is appended AFTER the extension
                    # (e.g. "x.tif_float32"), which removes the file from future
                    # *.tif globs — presumably intentional; confirm.
                    new_name = file_path.parent / f"{file_path.name}{suffix_to_add}"
                    if not file_path.stem.endswith(suffix_to_add):
                        file_path.rename(new_name)
                        unused_files.append(new_name)
                    else:
                        # Already renamed by a previous run: keep it excluded.
                        unused_files.append(file_path)
                else:
                    files.append(file_path.as_posix())
            else:
                files.append(file_path.as_posix())
        return files, unused_files
    def check_nodata(self, name, path_baseline, fn_scenario):
        """Force the 99999. nodata convention on every non-INT8 raster matching `name`.

        Scans `fn_scenario` for rasters whose filename starts with `name` (plus the
        baseline itself), and rewrites any float raster whose nullvalue differs
        from 99999. INT8 rasters are only reported.

        :param name: filename prefix to select rasters.
        :param path_baseline: baseline raster (always included).
        :param fn_scenario: folder to scan for scenario rasters.
        """
        # Local import to avoid a circular import at module load time.
        from ..wolf_array import WOLF_ARRAY_FULL_INTEGER8
        list_tif, unused = self.select_name_tif(path_baseline, fn_scenario, name)
        for cur_lst in list_tif:
            curarray: WolfArray = WolfArray(cur_lst)
            if curarray.wolftype != WOLF_ARRAY_FULL_INTEGER8:
                if curarray.nullvalue != 99999.:
                    # Rewrite the raster in place with the normalized nodata.
                    curarray.nullvalue = 99999.
                    curarray.set_nullvalue_in_mask()
                    curarray.write_all()
                    logging.warning(_('nodata changed in favor of 99999. value for file {} !'.format(cur_lst)))
            else:
                # INT8 rasters keep their own nodata; just report it.
                logging.info(_('nodata value is {} for file {} !'.format(curarray.nullvalue, cur_lst)))
[docs] def create_vrtIfExists(self, fn_baseline, fn_scenario, fn_vrt, name): """ Create a vrt file from a path """ logging.info(_('Checking nodata values...')) self.check_nodata(name, fn_baseline, fn_scenario) list_tif, list_fail = self.select_name_tif(fn_baseline, fn_scenario, name, filter_type = True) if len(list_fail)>0: return False, list_fail #création du fichier vrt - assembly/agglomération if len(list_tif)>1: logging.info(_('Creating .vrt from files (first based)...')) if name == "vuln_": create_vrt_from_diverged_files_first_based(list_tif, fn_vrt, Nodata = 127) else: create_vrt_from_diverged_files_first_based(list_tif, fn_vrt) return True, list_fail else: return False, list_fail
[docs] def translate_vrt2tif(self, fn_VRT, fn_vuln_s): """ Translate vrt from OUTPUT > ... > Scenario to tif saved in the same folder, and delete the vrt file """ if (fn_VRT).exists(): translate_vrt2tif(fn_VRT, fn_vuln_s) os.remove(fn_VRT)
[docs] def copy_tif_files(self, files: list[Path], destination_dir: Path) -> None: destination_dir.mkdir(parents=True, exist_ok=True) for file in files: destination_file = destination_dir / file.name dataset = WolfArray(file) #gdal.open if dataset is None: logging.warning(f"Could not open {file} with GDAL.") continue dataset.write_all(str(destination_file)) dataset = None logging.info(_("All the existing .tif files have been copied to the destination directory."))
    # Miscellaneous
[docs] def name_existence(self, Zones1, key, name): names = [str(curzone.get_values(key)[0]) for curzone in Zones1.myzones] if (str(name) in names): logging.info(_(f'There exists objects with name {name}.')) else : logging.info(_(f'No objects with name {name}'))
[docs] def PICC_read(self, manager_inbe, name:str = "Habitation"): study_area_path = Path(manager_inbe.IN_STUDY_AREA) / manager_inbe._study_area bbox = bbox_creation(study_area_path) _picc_data = Picc_data(data_dir=Path(),bbox=bbox) path_vector = Path(manager_inbe.IN_DATABASE) / "PICC_Vesdre.shp" if not path_vector.exists(): return None, None zones_hab = _picc_data.create_zone_picc(path_vector = path_vector, name=name, column = "GEOREF_ID", bbox=bbox) return zones_hab, zones_hab.nbzones, bbox
    def create_table_wolfsimu(self, manager_inbe, simu, operator_wd: str, operator_doi: str,
                              hazard = ["wd", "velocity", "flood duration"],
                              ZonesX: Zones = None, Zones_v_doi: Zones = None,
                              removed_buildings = [], percentil: float = None,
                              percentil_doi: float = None, percentil_v: float = None,
                              title: str = "table_wolfsimu.xlsx"):
        """Create the minimum INBE input table for one simulation.

        Water depth is sampled inside each building footprint (interpolated
        rasters + `operator_wd` statistic); velocity and flood duration are
        sampled along building buffers (`Zones_v_doi`). One Excel table is
        produced per simulation in `manager_inbe.IN_CSV_SCEN`.

        :param manager_inbe: INBE_Manager providing all scenario paths.
        :param simu: simulation identifier (also used in file names).
        :param operator_wd: statistic for water depth: "mean" | "max" | "median" | "percentil".
        :param operator_doi: same choices, applied to flood duration.
        :param hazard: subset of ["wd", "velocity", "flood duration"] to process.
        :param ZonesX: building footprints (PICC zones).
        :param Zones_v_doi: buffered footprints used for velocity/duration sampling.
        :param removed_buildings: building codes to skip.
        :param percentil, percentil_doi, percentil_v: percentile values (0-100)
            used when the corresponding operator is "percentil" (velocity always
            uses percentil_v).
        :param title: base name of the output workbook; "_<simu>" is inserted.
        """
        where_wd = manager_inbe.IN_SA_INTERP
        where_v = manager_inbe.IN_SCEN_DIR_V
        where_doi = manager_inbe.IN_SCEN_DIR_D
        out = manager_inbe.IN_CSV_SCEN
        out = Path(out) / title.replace(".xlsx", f"_{simu}.xlsx")
        # INBE input schema (one row per building).
        colonnes = [
            "code", "he_max", "he_out", "h_gf", "v",
            "date_arr_base", "date_leave_base",
            "no_s", "no_q", "FA", "NF", "GL", "YY", "Y_renov", "EP"
        ]
        df = pd.DataFrame(columns=colonnes)
        where_v = Path(where_v) / f"v_danger_{simu}_{manager_inbe.scenario}.tif"
        where_doi = Path(where_doi) / f"doi_danger_{simu}_{manager_inbe.scenario}.tif"
        simu_path = simu + ".tif"
        if not (Path(where_wd) / simu_path).exists():
            logging.info(_(f"where_wd : {Path(where_wd) / simu_path}"))
            return logging.info(_(f"Files of water depths do not exist."))
        # Exclude the dilated river mask (not bank-full) from the hazard rasters:
        # masked-out cells of the river raster mark where hazards must be ignored.
        WA_river = WolfArray(manager_inbe.IN_RIVER_MASK_SCEN_tif)
        i, j = np.where(~WA_river.array.mask)
        WA_wd = WolfArray(where_wd / simu_path)
        WA_wd.array[i, j] = WA_wd.nodata
        if "velocity" in hazard:
            if not Path(where_v).exists():
                logging.info(_(f"where_v : {where_v}"))
                return logging.error(_(f"Files of velocity danger maps do not exist."))
            WA_v = WolfArray(where_v)
            WA_v.array[i, j] = WA_v.nodata
        if "flood duration" in hazard:
            if not Path(where_doi).exists():
                logging.info(_(f"where_doi : {where_doi}"))
                return logging.error(_(f"Files of flood inundation danger maps do not exist."))
            WA_doi = WolfArray(where_doi)
            WA_doi.array[i, j] = WA_doi.nodata
        i = 0  # row counter (re-used; the np.where indices above are consumed)
        val_h = 0
        logging.info(_("Scanning zone for water depth inside buildings."))
        for curzone in tqdm(ZonesX.myzones):
            for curvector in curzone.myvectors:
                if i >= ZonesX.nbzones:
                    break
                if str(curvector) in removed_buildings:
                    continue
                wd_values = WA_wd.get_values_insidepoly(curvector)
                # Drop empty cells (0) and missing values before the statistics.
                wd_filtered = [x for x in wd_values[0] if x not in (0, None)]
                wd_filtered = np.array(wd_filtered)
                if len(wd_filtered) != 0:  # criterion on WD
                    # Statistic over the sampled water depths.
                    if operator_wd == "mean":
                        val_h = wd_filtered.mean()
                    elif operator_wd == "max":
                        val_h = wd_filtered.max()
                    elif operator_wd == "median":
                        val_h = np.median(wd_filtered)
                    elif operator_wd == "percentil":
                        if percentil != None:
                            val_h = np.quantile(wd_filtered, percentil / 100)
                        else:
                            logging.error(_(f"Bad value for percentil {percentil}"))
                            return
                    else:
                        logging.error(_("To be coded."))
                    if val_h != 0:
                        df.loc[i, "code"] = str(curvector)
                        df.loc[i, "he_out"] = val_h
                        df.loc[i, "FA"] = curzone.area
                        curvector.update_lengths()
                        df.loc[i, "EP"] = curvector.length2D  # perimeters[0]
                        # NOTE(review): the row counter is only advanced for
                        # buildings actually written — confirm against original.
                        i += 1
        if any(c in hazard for c in ["velocity", "flood duration"]):
            logging.info(_("Scanning zone for velocity along the buffers."))
            for curzone_v in tqdm(Zones_v_doi.myzones):
                for curvector_v in curzone_v.myvectors:
                    if i >= Zones_v_doi.nbzones:
                        break
                    # Only enrich buildings already present in the table.
                    if str(curvector_v) not in df['code'].astype(str).values:
                        continue
                    if "velocity" in hazard:
                        v_values = WA_v.get_values_insidepoly(curvector_v, usemask=False)
                        v_values[0][v_values[0] == -99999] = 0  # nodata -> 0, then filtered out
                        v_filtered = [x for x in v_values[0] if x not in (0, None)]
                        v_filtered = np.array(v_filtered)
                        if len(v_filtered) != 0:
                            df.loc[df['code'] == str(curvector_v), "v"] = np.quantile(v_filtered, percentil_v / 100)
                    if "flood duration" in hazard:
                        doi_values = WA_doi.get_values_insidepoly(curvector_v, usemask=False)
                        doi_values[0][doi_values[0] == -99999] = 0
                        doi_filtered = [x for x in doi_values[0] if x not in (0, None)]
                        doi_filtered = np.array(doi_filtered)
                        if len(doi_filtered) != 0:
                            # NOTE(review): val_doi stays unbound if operator_doi
                            # is invalid — the comparison below would then raise.
                            if operator_doi == "mean":
                                val_doi = doi_filtered.mean()
                            elif operator_doi == "max":
                                val_doi = doi_filtered.max()  # TODO simplify max/median via automatic percentile
                            elif operator_doi == "median":
                                val_doi = np.median(doi_filtered)
                            elif operator_doi == "percentil":
                                if percentil_doi != None:
                                    val_doi = np.quantile(doi_filtered, percentil_doi / 100)
                                else:
                                    logging.error(_(f"Bad value for percentil doi {percentil_doi}"))
                                    return
                            if val_doi > 0:
                                # Flood duration stored in hours; leave time fixed at 0.
                                df.loc[df['code'] == str(curvector_v), "date_arr_base"] = val_doi / 3600
                                df.loc[df['code'] == str(curvector_v), "date_leave_base"] = 0
        try:
            df.to_excel(out, index=False)
        except PermissionError:
            logging.error(_(f"Permission denied: please ensure the file '{out.name}' is not open in another program."))
            return
[docs] def pre_processing_auto(self, name_conditionnel, Ti_list, main_dir, Study_area, scenario, multiple, dx, removed_buildings = [], percentil = None, percentil_doi = None, percentil_v=None, operator_wd = "median", operator_doi = "median", hazard = ["wd", "velocity", "flood duration"]): manager_inbe = INBE_Manager(main_dir=main_dir, Study_area=Study_area, scenario = scenario) Zones_reading, unused, unused = self.PICC_read(manager_inbe, name_conditionnel) """Buffer creation""" dx_buffer = multiple #multiple de la resolution topo dx_buffer = multiple*dx Zones_v_doi = None if any(c in hazard for c in ["velocity", "flood duration"]): Zones_v_doi = Zones_reading.buffer(distance = dx_buffer, resolution= 4, inplace = False) save_where_output = [] for curT in Ti_list: #create_table_wolfsimu_BUFFERWD(manager_inbe = manager_inbe, simu=curT, ZonesX= Zones_reading, Zones_v_doi = Zones_reading2) self.create_table_wolfsimu(manager_inbe = manager_inbe, simu=curT, operator_wd = operator_wd, operator_doi = operator_doi, hazard=hazard, ZonesX= Zones_reading, Zones_v_doi = Zones_v_doi, removed_buildings = removed_buildings, percentil=percentil, percentil_doi=percentil_doi, percentil_v=percentil_v) logging.info(_(f"--Preprocessing table created in {manager_inbe.IN_CSV_SCEN}")) p = Path(manager_inbe.IN_CSV_SCEN) save_where_output.append(Path(*p.parts[-4:])) return save_where_output
    def pre_processing_for_MAX(self, name_conditionnel, main_dir, Study_area, scenario, removed_buildings = []):
        """Create the "maximum damage" INBE input table.

        Every building is listed with sentinel hazard values (he_out = v = 99999)
        so that INBE computes the maximum possible damage — used later as the
        denominator of the relative damage in the aversion models.

        :param name_conditionnel: PICC category to extract.
        :param main_dir, Study_area, scenario: arguments for INBE_Manager.
        :param removed_buildings: building codes to skip.
        """
        manager_inbe = INBE_Manager(main_dir=main_dir, Study_area=Study_area, scenario=scenario)
        ZonesX, unused, unused = self.PICC_read(manager_inbe, name_conditionnel)
        title = "table_wolfsimu_for_max.xlsx"
        out = manager_inbe.IN_CSV_SCEN
        out = Path(out) / title
        # INBE input schema (one row per building).
        colonnes = [
            "code", "he_max", "he_out", "h_gf", "v",
            "date_arr_base", "date_leave_base",
            "no_s", "no_q", "FA", "NF", "GL", "YY", "Y_renov", "EP"
        ]
        df = pd.DataFrame(columns=colonnes)
        logging.info(_("Scanning zone for water depth inside buildings."))
        i = 0
        for curzone in tqdm(ZonesX.myzones):
            for curvector in curzone.myvectors:
                if i >= ZonesX.nbzones:
                    break
                if str(curvector) in removed_buildings:
                    continue
                df.loc[i, "code"] = str(curvector)
                # Sentinel hazard values: force the damage model to its maximum.
                df.loc[i, "he_out"] = 99999
                df.loc[i, "v"] = 99999
                df.loc[i, "FA"] = curzone.area
                curvector.update_lengths()
                df.loc[i, "EP"] = curvector.length2D  # perimeters[0]
                i += 1
        try:
            df.to_excel(out, index=False)
        except PermissionError:
            logging.error(_(f"Permission denied: please ensure the file '{out.name}' is not open in another program."))
            return
        logging.info(_(f"--Preprocessing table created in {manager_inbe.IN_CSV_SCEN}"))
    def computation_dfresults(self, input_table, type_computation, inflation):
        """Run the INSYDE-BE damage computation for one input table.

        :param input_table: path to the pre-processed Excel input table.
        :param type_computation: computation mode forwarded to run_R_insyde.
        :param inflation: inflation factor forwarded to run_R_insyde.
        :return: (df_ifvalue, df_input) — rows with strictly positive total
            damage, and the input table completed with default data.
        """
        # NOTE(review): `PyINBE` is not imported in this module (only
        # INBE_Manager is imported from wolfdamage.insyde_be.Py_INBE);
        # presumably it comes from the same package — confirm, otherwise
        # this raises NameError at runtime.
        Pyinsyde = PyINBE(input_table=input_table)
        # run_R_insyde returns 9 values; only the first and last are used here.
        df_results, unused, unused, unused, unused, unused, unused, unused, df_input = Pyinsyde.run_R_insyde(type_computation=type_computation, inflation=inflation)
        # Keep only buildings with an actual (strictly positive) total damage.
        df_ifvalue = df_results[df_results['d_total'] > 0]
        return df_ifvalue, df_input
#--- Aversion models Merz et al. (2009), DOI 10.5194/nhess-9-1033-2009
[docs] def get_alpha(self, relative_damage, model_name): """ Return the risk aversion factor alpha for a given relative damage and model. :param relative_damage: Relative damage value (%) for the municipality. :type relative_damage: float :param model_name: Model identifier ("A1", "A2", "A3"). :type model_name: str :returns: Risk aversion factor corresponding to the given relative damage. :rtype: float .. note:: Implements the multiplicative aversion model from Merz et al. (2009): * **A1**: alpha_max=10, D_l=1.0, D_u=20.0 * **A2**: alpha_max=10, D_l=1.0, D_u=10.0 * **A3**: alpha_max=50, D_l=1.0, D_u=10.0 """ if model_name == "A0": return 1 alpha_max, D_l, D_u = self.get_aversion_params(model_name) if relative_damage <= D_l: return 1.0 elif relative_damage >= D_u: return alpha_max else: return 1.0 + ( (alpha_max - 1.0) / ( D_u - D_l) ) * (relative_damage - D_l)
[docs] def plotting_oneslike_alpha(self, D, alpha_max, D_l, D_u): """ Compute the multiplicative risk aversion factor alpha(D) as proposed by Merz et al. (2009), DOI 10.5194/nhess-9-1033-2009. The function models the increase in risk aversion with the relative damage D, using a linear transition between $D_l$ and $D_u$, and a constant alpha beyond $D_u$. :param D: Relative damage values (%) at the intervention zone scale (ratio of total event damage to total exposed asset value). :type D: array_like :param alpha_max: Maximum aversion factor applied for high-damage events. :type alpha_max: float :param D_l: Lower threshold of relative damage beyond which aversion starts increasing. :type D_l: float :param D_u: Upper threshold of relative damage at which alpha reaches its maximum. :type D_u: float :returns: Risk aversion factor corresponding to each relative damage value. :rtype: ndarray """ alpha = np.ones_like(D) mask = (D > D_l) & (D < D_u) alpha[mask] = 1 + (alpha_max-1)*(D[mask]- D_l)/( D_u- D_l) alpha[D >= D_u] = alpha_max return alpha
[docs] def get_aversion_params(self, model_name): """ Return parameter set (alpha_max, D_l, D_u) for the selected aversion model (Merz et al., 2009). It is used to derive the A1, A2, and A3 aversion models defined as: * **A1**: alpha_max = 10, D_l = 1.0, D_u = 20.0 * **A2**: alpha_max = 10, D_l = 1.0, D_u = 10.0 * **A3**: alpha_max = 50, D_l = 1.0, D_u = 10.0 :param model_name: Model identifier ("A1", "A2", or "A3"). :type model_name: str :returns: (alpha_max, D_l, D_u) corresponding to the selected model. :rtype: tuple """ models = { "A1": (10, 1.0, 20.0), "A2": (10, 1.0, 10.0), "A3": (50, 1.0, 10.0), } if model_name not in models: raise ValueError("Model name must be 'A1', 'A2', or 'A3'.") return models[model_name]
[docs] def plot_aversion_curves(self, path_output=None): """ Display the A1, A2, A3 aversion curves as a matplotlib figure. """ d = np.linspace(0, 25, 500) alpha_A0 = self.plotting_oneslike_alpha(d, alpha_max=1, D_l=1.0, D_u=20.0) alpha_A1 = self.plotting_oneslike_alpha(d, alpha_max=10, D_l=1.0, D_u=20.0) alpha_A2 = self.plotting_oneslike_alpha(d, alpha_max=10, D_l=1.0, D_u=10.0) alpha_A3 = self.plotting_oneslike_alpha(d, alpha_max=50, D_l=1.0, D_u=10.0) with plt.rc_context({'text.usetex': True, 'font.size': 12, 'font.family': 'serif'}): fig, ax = plt.subplots(figsize=(6, 4)) ax.plot(d, alpha_A0, label=r'$\alpha_0$', linestyle='-', color='black') ax.plot(d, alpha_A1, label=r'$\alpha_1$', linestyle='-.', color='black') ax.plot(d, alpha_A2, label=r'$\alpha_2$', linestyle='--', color='black') ax.plot(d, alpha_A3, label=r'$\alpha_3$', linestyle=':', color='black') ax.set_xlabel(r'Relative damage $[\%]$') ax.set_ylabel(r'Risk aversion factor $\alpha$ [-]') ax.legend() ax.grid(alpha=0.3) fig.tight_layout() if path_output is not None: fig.savefig(path_output) fig.show(block=False) return fig, ax
#---
    def computation_combined_damage(self, pond: dict, manager_inbe, aversion_model="A0", total_damage=0) -> pd.DataFrame:
        """
        Compute the weighted sum of damage results across multiple return-period
        DataFrames, optionally amplified by a risk-aversion factor.

        :param pond: table mapping return periods to their weighting
            coefficients; accessed as ``pond.loc[int(Ti), "Ponderation"]``
            (i.e. a DataFrame indexed by return period, despite the type hint).
        :type pond: dict
        :param manager_inbe: INBE_Manager giving the individual damage tables
            (get_individualdamage) and the output paths (ALPHA, OUT_COMB,
            OUT_COMB_DIR).
        :param aversion_model: "A0" (neutral) or "A1"/"A2"/"A3" (Merz et al., 2009).
        :param total_damage: maximum possible damage used as denominator of the
            relative damage; 0 disables the relative-damage computation.
        :return: (final_df, output_path) — one row per unique building "code"
            with each damage column equal to the weighted sum across all Ti —
            or (None, None) when the output file is locked.
        :rtype: pd.DataFrame
        """
        df_results_Ti = manager_inbe.get_individualdamage()
        weighted_dfs = []
        # Schema of the alpha bookkeeping workbook (one row per return period).
        cols = ["Ti", "relD", "alpha_A0", "D_A0", "alpha_A1", "D_A1", "alpha_A2", "D_A2", "alpha_A3", "D_A3",]
        excel_path = manager_inbe.ALPHA
        if excel_path.exists():
            alpha_df = pd.read_excel(excel_path)
        else:
            alpha_df = pd.DataFrame(columns=cols)
        logging.info(_(f"Ponderation coefficients (computed automatically based on available set of damage files): {pond}"))
        for Ti, df in sorted(df_results_Ti.items()):
            coeff = pond.loc[int(Ti), "Ponderation"]
            df_w = df.copy()
            numeric_cols = df_w.select_dtypes(include="number").columns
            damage_Ti_sum = df_w['d_total'].sum()
            damage_Ti_sum = float(damage_Ti_sum)
            total_damage = float(total_damage)
            try:
                if total_damage != 0:
                    # Relative damage in % of the maximum possible damage.
                    relative_damage = 100 * (damage_Ti_sum / total_damage)
                else:
                    relative_damage = 0.0
            except (TypeError, ValueError):
                logging.error("Invalid type for damage values — cannot compute relative damage.")
                relative_damage = np.nan
            if aversion_model == "A0":
                # Risk-neutral: plain weighted sum, written to the default file.
                output_path = manager_inbe.OUT_COMB
                alpha = 1
            else:
                aversion_model = str(aversion_model)
                output_path = manager_inbe.OUT_COMB_DIR / f"combined_damage_aversion_{aversion_model}.xlsx"
                alpha = self.get_alpha(relative_damage, aversion_model)
                # NOTE: "%%" kept on purpose — a single "%" broke the message.
                logging.info(f"|For T{Ti}, relative damage = {np.round(relative_damage,3)}%%, aversion {aversion_model} => alpha(Drel) = {np.round(alpha,2)}.")
            # Weight every numeric column by ponderation x aversion factor.
            df_w[numeric_cols] = df_w[numeric_cols].mul(coeff * alpha)
            # Bookkeeping: record alpha and relative damage for this Ti.
            row_dict = {
                "Ti": f"T{Ti}",
                "relD": round(relative_damage, 3),
                "alpha_A0": 1
            }
            for model in ["A1", "A2", "A3"]:
                row_dict[f"alpha_{model}"] = np.round(alpha, 3) if aversion_model == model else np.nan
            if (alpha_df["Ti"] == f"T{Ti}").any():
                # Update the existing row for this return period.
                idx = alpha_df.index[alpha_df["Ti"] == f"T{Ti}"][0]
                alpha_df.loc[idx, "relD"] = round(relative_damage, 3)
                alpha_df.loc[idx, f"alpha_{aversion_model}"] = np.round(alpha, 3)
                alpha_df.loc[idx, f"D_{aversion_model}"] = df_w["d_total"].sum()
            else:
                alpha_df = pd.concat([alpha_df, pd.DataFrame([row_dict])], ignore_index=True)
            alpha_df.to_excel(excel_path, index=False)
            weighted_dfs.append(df_w)
        all_concat = pd.concat(weighted_dfs, ignore_index=True, sort=False)
        # One row per building: sum of the weighted damages across all Ti.
        final_df = all_concat.groupby("code", as_index=False).sum()
        try:
            # Probe for a PermissionError (file open elsewhere) before writing.
            with open(output_path, 'wb') as f:
                pass
            final_df.to_excel(output_path, index=False)
            return final_df, output_path
        except PermissionError:
            logging.error(_(f"Permission denied: please ensure the file '{output_path}' is not open in another program."))
            return None, None
    def plot_combinaison(self, manager_inbe, pond: dict):
        """Plot, for each aversion model, the relative contribution (stacked %)
        of every return period to the total weighted damage.

        :param manager_inbe: INBE_Manager giving the individual damage tables
            and the alpha bookkeeping workbook (ALPHA).
        :param pond: ponderation table, accessed as ``pond.loc[Ti, "Ponderation"]``.
        :return: (fig, ax) of the created figure, or None when ALPHA is missing.
        """
        damage_dict = manager_inbe.get_individualdamage()
        alpha_path = manager_inbe.ALPHA
        if not alpha_path.exists():
            return logging.error(f"The file {alpha_path} doesn't exist")
        alpha_df = pd.read_excel(alpha_path)
        models = ["A0", "A1", "A2", "A3"]
        return_periods = sorted([int(k) for k in damage_dict.keys()])
        hist_df = pd.DataFrame(index=models, columns=return_periods, dtype=float)
        for Ti in return_periods:
            df = damage_dict[str(Ti)]
            coeff = pond.loc[Ti, "Ponderation"]
            row = alpha_df[alpha_df["Ti"] == f"T{Ti}"]
            if row.empty:
                continue
            for model in models:
                alpha_val = row[f"alpha_{model}"].values[0]
                if pd.isna(alpha_val):
                    continue
                # Recomputed here (although stored in the alpha file as D_A*)
                # so the plot can double-check the combination computation.
                weighted_sum = df["d_total"].sum() * coeff * alpha_val
                hist_df.loc[model, Ti] = weighted_sum / 1000  # in 10^3 EUR
        bottom = np.zeros(len(models))
        pastel_colors = [
            (198/255, 219/255, 239/255),
            (158/255, 202/255, 225/255),
            (107/255, 174/255, 214/255),
            (253/255, 174/255, 97/255),
            (239/255, 128/255, 62/255),
            (203/255, 24/255, 29/255),
            (128/255, 0/255, 38/255)
        ]
        # Normalize each model row to percentages of its own total.
        totals = hist_df.sum(axis=1)
        hist_df_pct = hist_df.div(totals, axis=0) * 100
        bottom = np.zeros(len(models))  # reset for the stacked bars
        fig, ax = plt.subplots(figsize=(10, 6))
        for i, Ti in enumerate(sorted(return_periods)):
            vals = hist_df_pct[Ti].fillna(0).values
            plt.bar(models, vals, bottom=bottom, color=pastel_colors[i % len(pastel_colors)], label=f"T{Ti}")
            bottom += vals
        ax.set_ylabel("Contribution to total weighted damage [%]")
        ax.set_xticks(models)
        ax.set_xticklabels(["EAD(A0)", "A1", "A2", "A3"])
        ax.legend(title="Return period [yrs]", loc='upper center', bbox_to_anchor=(0.5, 1.15), ncol=len(return_periods))
        ax.grid(axis='y', linestyle='--', alpha=0.5)
        fig.tight_layout()
        fig.show()
        return fig, ax
    def plot_damage(self, df_results, idx=None, cat=None, sorted_cond=None):
        """
        Displays a damage bar chart for a specific index (specific building) or
        for all entries, and for all damage categories or only one.

        :param df_results: DataFrame containing damage computed by INBE.
        :type df_results: pd.DataFrame
        :param idx: Index of a specific row to plot. Defaults to None (every building plotted).
        :type idx: int, optional
        :param cat: Specific damage category to plot if idx is None (global mode).
            Defaults to None (every category plotted).
        :type cat: str, optional
        :param sorted_cond: when truthy, buildings are sorted by total damage.
        """
        labels = ['d_cleaning', 'd_removal', 'd_non_stru', 'd_structural', 'd_finishing', 'd_systems']
        labels_latex = [r'$D_{\mathrm{cleaning}}$', r'$D_{\mathrm{removal}}$', r'$D_{\mathrm{non stru}}$', r'$D_{\mathrm{structural}}$', r'$D_{\mathrm{finishing}}$', r'$D_{\mathrm{systems}}$']
        if cat is not None:
            labels = [cat]
        if sorted_cond:
            df_sorted = df_results.sort_values('d_total')
            df_plot = df_sorted
        else:
            df_plot = df_results
        fig, ax = plt.subplots(figsize=(8, 6))
        if idx is not None:
            # Single-building mode: one bar per damage category, in 10^3 EUR.
            values = [df_plot[label][idx] / 1e3 for label in labels]
            x = np.arange(len(labels))
            colors = plt.cm.tab10.colors[:len(labels)]
            bars = ax.bar(x, values, color=colors, alpha=0.6)
            ax.set_xticks(x)
            ax.set_xticklabels(labels, rotation=45)
            ax.tick_params(axis='y', labelsize=13)
            ax.set_xlabel("Categories of damage [-]")
            ax.set_ylabel("d_total [10³€]", fontsize=13)
            ax.grid(True, axis='y', linestyle='--', alpha=0.7)
        else:
            # Global mode: stacked bars, categories ordered by total contribution.
            combined = list(zip(labels, labels_latex))
            combined_sorted = sorted(combined, key=lambda x: df_plot[x[0]].sum(), reverse=True)
            labels, labels_latex = zip(*combined_sorted)
            colors = plt.cm.tab10.colors[:len(labels)]
            x = np.arange(len(df_plot.index))
            bottom = np.zeros(len(df_plot.index))
            bars = []
            for i, label in enumerate(labels):
                # NOTE(review): divides by 100 while the axis label says 10³€
                # (the idx branch divides by 1e3) — confirm the intended scale.
                values = df_plot[label] / 100
                bar = ax.bar(x, values, bottom=bottom, color=colors[i], label=labels[i], alpha=0.6)
                bars.append(bar)
                bottom += values
            ax.set_xticks([])  # no ticks on x
            ax.set_ylabel("Damage [10³€]")
            ax.legend(loc='lower center', bbox_to_anchor=(0.5, 1.02), ncol=3, borderaxespad=0.)
            ax.grid(True, axis='y', linestyle='--', alpha=0.7)
            # Tooltip setup — a gadget, but handy: shows the building code on hover.
            annot = ax.annotate("", xy=(0, 0), xytext=(15, 15), textcoords="offset points",
                                bbox=dict(boxstyle="round", fc="w"),
                                arrowprops=dict(arrowstyle="->"))
            annot.set_visible(False)

            def update_annot(bar, idx_bar):
                # Place the annotation above the hovered bar, text centered on the figure.
                x_mid_data = (ax.get_xlim()[0] + ax.get_xlim()[1]) / 2
                y_pos = bar.get_height() + bar.get_y()
                annot.xy = (bar.get_x() + bar.get_width() / 2, y_pos)
                code = df_plot['code'].iloc[idx_bar]
                annot.set_text(f"code: {code}")
                annot.get_bbox_patch().set_alpha(0.8)
                annot.set_fontsize(10)
                arrow_disp = ax.transData.transform(annot.xy)
                fig_width = fig.bbox.width
                x_text_disp = fig_width / 2
                y_text_disp = arrow_disp[1] + 15
                annot.set_position((x_text_disp - arrow_disp[0], y_text_disp - arrow_disp[1]))

            def hover(event):
                # Show the annotation for the bar under the cursor; hide otherwise.
                vis = annot.get_visible()
                for bar_group in bars:
                    for i, bar in enumerate(bar_group):
                        if bar.contains(event)[0]:
                            update_annot(bar, i)
                            annot.set_visible(True)
                            fig.canvas.draw_idle()
                            return
                if vis:
                    annot.set_visible(False)
                    fig.canvas.draw_idle()

            fig.canvas.mpl_connect("motion_notify_event", hover)
        fig.tight_layout()
        fig.show()
    # Class-level default for the INBE computation mode (see computation_dfresults).
    type_computation = "from_wolfsimu"
    def computation_dfesults_forfolder(self, manager_inbe, type_computation, Ti_list, inflation, ifMAX = False):
        # TODO rename: "forfolder" --> "for selected Ti" (method name kept for compatibility).
        """
        Process Excel files in the folder INPUT>CSVs matching the pattern
        'table_wolfsimu_T<number>.xlsx', computes results (INBE) for each file
        using computation_dfresults, and returns a dictionary of results keyed
        by each T identifier, plus the (shortened) output paths.

        :param manager_inbe: Object containing the path to the input CSV scenario folder.
        :type manager_inbe: Any
        :param type_computation: Parameter specifying the type of computation to perform.
        :type type_computation: Any
        :param Ti_list: return-period identifiers to process.
        :param inflation: Inflation parameter used in the computation.
        :type inflation: Any
        :param ifMAX: when True, process only the "maximum damage" table (for aversion).
        :return: (df_results, save_output_TEMP, save_output_defaultINPUT, total_damage).
        :rtype: tuple
        """
        total_damage = None
        df_results = {}  # dictionary keyed by the Ti available in CSVs
        df_input = {}
        save_output_TEMP = []
        save_output_defaultINPUT = []
        for curT in Ti_list:
            if ifMAX:
                input_table = manager_inbe.IN_CSV_SCEN / ("table_wolfsimu_for_max.xlsx")
            else:
                input_table = manager_inbe.IN_CSV_SCEN / ("table_wolfsimu_" + curT + ".xlsx")
            logging.info(_(f"Computing damage for table_wolfsimu{curT}"))
            if not input_table.exists():
                logging.error(_(f"Input table table_wolfsimu_{curT}.xlsx does not exist."))
                continue
            df_results[curT], df_input[curT] = self.computation_dfresults(input_table, type_computation, inflation)
            output_path = Path(manager_inbe.OUT_SCEN_DIR) / f"individual_damage_{curT}.xlsx"
            if ifMAX:
                output_path = Path(manager_inbe.INDIV_MAX)
                # Dmax — needed if/when aversion must be computed.
                total_damage = df_results[curT]["d_total"].sum()
            try:
                # Probe for a PermissionError (file open elsewhere) before writing.
                with open(output_path, 'wb') as f:
                    pass
                df_results[curT].to_excel(output_path, index=False)
            except PermissionError:
                logging.error(_(f"Permission denied: please ensure the file '{output_path}' is not open in another program."))
                return
            output_path = Path(*output_path.parts[-5:])
            save_output_TEMP.append(output_path)
            output_path = Path(manager_inbe.OUT_SCEN_DIR_IN) / f"input_with_default_data_{curT}.xlsx"
            if ifMAX:
                output_path = Path(manager_inbe.OUT_SCEN_DIR_IN) / f"input_with_default_for_max.xlsx"
            try:
                with open(output_path, 'wb') as f:
                    pass
                df_input[curT].to_excel(output_path, index=False)
            except PermissionError:
                logging.error(_(f"Permission denied: please ensure the file '{output_path}' is not open in another program."))
                return
            output_path = Path(*output_path.parts[-5:])
            save_output_defaultINPUT.append(output_path)
            if ifMAX:
                # Max-damage mode uses a single table: stop after the first Ti.
                break
        return df_results, save_output_TEMP, save_output_defaultINPUT, total_damage
def adding_INBEresults_toZones(self, manager_inbex, Zones_trix, Ti=None):
    """Attach the INSYDE-BE damage results to the zones of a Zones instance.

    Reads the per-building damages from ``individual_damage_<Ti>.xlsx`` when
    ``Ti`` is given, otherwise from the combined damage file, then stores every
    damage component on the zone whose vector matches the building 'code'
    (string comparison).

    :param manager_inbex: manager exposing ``OUT_SCEN_DIR`` and ``OUT_COMB``.
    :param Zones_trix: Zones instance whose vectors carry the building codes.
    :param Ti: return-period identifier, or None for the combined damages.
    :return: the same Zones instance, enriched with the damage values.
    """
    cols = ["code", "d_cleaning", "d_removal", "d_non_stru", "d_structural", "d_finishing", "d_systems", "d_total"]
    cols_d = cols[1:]  # damage components only (without the building code)

    # idiom fix: 'is (not) None' instead of '!= None' / '== None', and the dead
    # 'Ti = None' re-assignment in the combined branch is removed
    if Ti is not None:
        path_to_indiv_damage = manager_inbex.OUT_SCEN_DIR / ("individual_damage_" + str(Ti) + ".xlsx")
        df_results = pd.read_excel(path_to_indiv_damage, usecols=cols)
    else:
        df_results = pd.read_excel(manager_inbex.OUT_COMB, usecols=cols)

    Zones_torasterize = Zones_trix
    for curzone in tqdm(Zones_torasterize.myzones):
        for curvector in curzone.myvectors:
            # boolean mask: rows whose 'code' matches this vector's string form
            row_match = df_results[df_results['code'].astype(str) == str(curvector)]
            if not row_match.empty:
                row = row_match.iloc[0]
                for key in cols_d:
                    # values are stored as one-element string arrays
                    curzone.add_values(key=str(key), values=np.array([str(row[key])], dtype=str))
    return Zones_torasterize
def label_0_needed(self, Zones1):
    """Assign a zero value to every damage component of zones that carry none.

    A zone whose damage components are all missing is explicitly labelled with
    0 for each component, so it is not left without a value at rasterization.

    :param Zones1: Zones instance completed in place.
    """
    # loop-invariant list hoisted out of the zone loop
    cols_d = ["d_cleaning", "d_removal", "d_non_stru", "d_structural", "d_finishing", "d_systems", "d_total"]
    for curzone in tqdm(Zones1.myzones):
        id_vals = [curzone.get_values(col) for col in cols_d]
        # get_values may return a sequence: keep only its first element
        id_clean = [v[0] if isinstance(v, (list, np.ndarray)) else v for v in id_vals]
        if all(v is None for v in id_clean):
            for col in cols_d:
                curzone.add_values(key=col, values=np.array([0], dtype=int))
def raster_damage_in_zones(self, path_damage_out, path_onedanger, Zones_torasterize, scenario, Ti):
    """Rasterize the zones' total damage ('d_total') into a GeoTIFF.

    ``path_onedanger`` is used only as a grid template (extent/resolution);
    its cell values are overwritten before rasterization.

    :param path_damage_out: output folder for the damage raster.
    :param path_onedanger: existing hazard raster used as grid template.
    :param Zones_torasterize: Zones carrying the damage values (zones without
        values are first zero-labelled via label_0_needed).
    :param scenario: scenario name used in the output file name.
    :param Ti: return-period identifier, or None for the combined raster.
    :return: result of the final logging call (always None).
    """
    self.label_0_needed(Zones_torasterize)
    WA_raster = WolfArray(path_onedanger)
    # NOTE(review): bare attribute access - probably meant WA_raster.unmask()
    # unless 'unmask' is a property on WolfArray; confirm, otherwise this is a no-op.
    WA_raster.unmask
    # reset the template: cells <= 99999 are forced to the nodata value 99999
    # (cells above that threshold are left untouched - TODO confirm intent)
    WA_raster.array[WA_raster.array<=99999] = 99999
    WA_raster.rasterize_zones_valuebyid(Zones_torasterize, id="d_total", method="nearest")
    if Ti != None:
        out_title = Path(path_damage_out) / f"inbe_{scenario}_{Ti}.tif"
    else :
        out_title = Path(path_damage_out) / ("raster_" + str(scenario) + ".tif")
    # NOTE(review): same remark as above - likely a missing call: WA_raster.unmask()
    WA_raster.unmask
    WA_raster.nodata=99999
    WA_raster.write_all(out_title)
    return logging.info(_(f"WolfArray written at {out_title}"))
def select_and_count_in_zones(self, Zones1, name):
    """Select every zone whose 'NATUR_DESC' first value equals ``name``.

    :param Zones1: Zones instance to filter.
    :param name: nature description to match (compared as strings).
    :return: tuple (new Zones holding the matching zones, number of matches).
    """
    target = str(name)
    selection = Zones()
    count = 0
    for zone in Zones1.myzones:
        if str(zone.get_values("NATUR_DESC")[0]) == target:
            count += 1
            selection.add_zone(zone, forceparent=True)
    return selection, count
def raster_auto(self, name_conditionnel, manager_inbe, which_result, Ti_list=None):
    """Create the damage raster(s): one per return period, or the combined one.

    :param name_conditionnel: 'NATUR_DESC' name forwarded to the PICC reading/selection.
    :param manager_inbe: manager exposing the scenario folders and the
        interpolated hazard rasters (``IN_SA_INTERP``).
    :param which_result: 'individual_damage' (one raster per Ti of ``Ti_list``)
        or 'combined' (single raster from the combined damages).
    :param Ti_list: return periods to process in 'individual_damage' mode.
    """
    if which_result == "individual_damage":
        for Ti in Ti_list:
            # the PICC is re-read at each iteration since the zones are
            # mutated when the damage results are injected
            zones_picc, _unused1, _unused2 = self.PICC_read(manager_inbe, name_conditionnel)
            logging.info(_(f"Raster creation for {Ti}."))
            # damage computed and saved beforehand in Excel; here we only attach it
            zones_ready = self.adding_INBEresults_toZones(manager_inbe, zones_picc, Ti = Ti)
            tif_files = list(manager_inbe.IN_SA_INTERP.glob("T*.tif"))
            if not tif_files:
                logging.error(_(r"Please provide the water depth input."))
                return
            # the first hazard raster only serves as a grid template
            self.raster_damage_in_zones(manager_inbe.OUT_SCEN_DIR, tif_files[0], zones_ready, manager_inbe.scenario, Ti)

    if which_result == "combined":
        zones_picc, _unused1, _unused2 = self.PICC_read(manager_inbe, name_conditionnel)
        zones_picc, _count = self.select_and_count_in_zones(zones_picc, name_conditionnel)
        tif_files = list(manager_inbe.IN_SA_INTERP.glob("T*.tif"))
        if not tif_files:
            logging.error(_(r"Please provide the water depth input."))
            return
        zones_ready = self.adding_INBEresults_toZones(manager_inbe, zones_picc, Ti = None)
        self.raster_damage_in_zones(manager_inbe.OUT_SCEN_DIR, tif_files[0], zones_ready, manager_inbe.scenario, None)
def load_quant_sum_backup(self, path, quant):
    """Sum the 'd_total' values lying at or below the given quantile.

    :param path: Excel file containing a 'd_total' column.
    :param quant: quantile in [0, 1] used as upper cut-off.
    :return: sum of the retained 'd_total' values.
    """
    totals = pd.read_excel(path)['d_total']
    threshold = totals.quantile(quant)
    return totals[totals <= threshold].sum()
def load_quant_sum(self, path, quant):
    """Sum, for each damage component, the values at or below its quantile.

    Each column is truncated at its own quantile threshold before summing.

    :param path: Excel file holding the damage component columns.
    :param quant: quantile in [0, 1] used as upper cut-off per column.
    :return: dict mapping each damage column name to its truncated sum.
    """
    df = pd.read_excel(path)
    columns = ["d_total", "d_cleaning", "d_removal", "d_non_stru",
               "d_structural", "d_finishing", "d_systems"]

    def truncated_sum(series):
        # drop the values above this column's quantile before summing
        return series[series <= series.quantile(quant)].sum()

    return {col: truncated_sum(df[col]) for col in columns}
def extract_scenario_label(self, path_str):
    """Extract the scenario label from a result path.

    The label is the folder located two components after the first 'TEMP' or
    'OUTPUT' part of the path (layout: TEMP/<study_area>/<scenario>/...).

    :param path_str: path (str or Path) to inspect.
    :return: the scenario label, or None when the path has no 'TEMP'/'OUTPUT'
        component or not enough components after it.
    """
    parts = Path(path_str).parts
    label = None
    for i, part in enumerate(parts):
        if part == "TEMP" or part == "OUTPUT":
            # bugfix: the guard must cover index i + 2 (it checked i + 1,
            # which raised IndexError on paths like 'TEMP/<study_area>')
            if i + 2 < len(parts):
                label = parts[i + 2]
            break
    return label
def histo_total_from_list(self, list_path, quant, type=None):
    """Plot a stacked bar chart of quantile-truncated total damages per scenario.

    The first path of ``list_path`` is the reference scenario: it gets the base
    color and the other bars are annotated with their relative difference (in %)
    to it. Damages are read via load_quant_sum and expressed in k-euros.
    Assumes ``list_path`` is non-empty (``damages[0]`` is read unconditionally).

    :param list_path: list of Excel result paths; first entry = reference.
    :param quant: quantile in [0, 1] forwarded to load_quant_sum.
    :param type: when equal to "Combined damage", labels are derived from the
        combined-damage file layout; otherwise from extract_scenario_label.
    :return: tuple (fig, ax) of the matplotlib figure and axes.
    """
    damages = []
    labels = []
    colors = []
    base_color = (212/255, 101/255, 10/255)
    alt_colors = [ (119/255, 147/255, 60/255), (127/255, 127/255, 127/255), (91/255, 155/255, 213/255), (255/255, 192/255, 0/255), (255/255, 0/255, 255/255) ]
    for i, path in enumerate(list_path):
        res = self.load_quant_sum(path, quant)
        D_tot = res["d_total"] / 1000  # euros -> k-euros
        damages.append(D_tot)
        # per-component stack values, same unit conversion
        comp = [res["d_cleaning"]/1000, res["d_removal"]/1000, res["d_non_stru"]/1000, res["d_structural"]/1000, res["d_finishing"]/1000, res["d_systems"]/1000]
        if i == 0:
            comps = [comp.copy()]
        else:
            comps.append(comp.copy())
        if type == "Combined damage":
            p = Path(path)
            scenario = p.parent.parent.name.split("scenario_")[-1]
            # NOTE(review): 'filename' is computed but never used; the label ends
            # with the literal "(unknown)" - looks like a leftover placeholder, confirm.
            filename = p.stem.replace("combined_damage_", "")
            labels.append(f"{scenario}_(unknown)")
        else :
            labels.append(self.extract_scenario_label(path))
        colors.append(base_color if i == 0 else alt_colors[(i - 1) % len(alt_colors)])
    base_damage = damages[0]
    rel_percents = [0]
    if base_damage != 0 :
        for dmg in damages[1:]:
            # relative difference to the reference scenario, in percent
            rel = round(dmg / base_damage * 100 - 100,3)
            rel_percents.append(rel)
    else:
        logging.error(_(f"No % indicated, because the reference is equal to 0."))
    labels2 = [str(lab).replace("scenario_", "") for lab in labels]
    bottom = [0]*len(damages)
    stack_colors = plt.cm.tab10.colors[:6]
    d_labels = ["d_cleaning", "d_removal", "d_non_stru", "d_structural", "d_finishing", "d_systems"]
    fig, ax = plt.subplots(figsize=(10, 6))
    # stacked bars: one layer per damage component across all scenarios
    for j in range(len(comps[0])):
        vals = [c[j] for c in comps]
        ax.bar(labels2, vals, bottom=bottom, color=stack_colors[j])
        bottom = [bottom[k] + vals[k] for k in range(len(vals))]
    ax.legend(d_labels, loc='upper center', bbox_to_anchor=(0.5, 1.4), ncol=3)
    ylim_max = max(damages) * 1.15
    # annotate each non-reference bar with its % difference to the reference
    for i in range(1, len(damages)):
        if base_damage != 0:
            rel = rel_percents[i]
            # NOTE(review): '\%' in a plain f-string is a literal backslash+percent;
            # it only renders as '%' under LaTeX/mathtext text rendering - confirm.
            label = f"+{rel}\%" if rel > 0 else f"{rel}\%"
            color_txt = 'red' if rel > 0 else 'green'
            ax.text(i, damages[i] + ylim_max * 0.02, label, ha='center', color=color_txt, fontsize=13)
    ax.set_ylabel('Total damage [k€]')
    ax.tick_params(axis='x', labelrotation=90)
    ax.grid(alpha=0.3)
    ax.set_ylim(0, ylim_max)
    fig.tight_layout()
    fig.show()
    return fig, ax
def NO_division_by_zero(self, numerator, denominator):
    """Return numerator / denominator, or 0 when the denominator is zero."""
    return 0 if denominator == 0 else numerator / denominator
def analysis_table(self, instances, path_analysis, ref = "comb"):
    """Build a comparison table (Excel) of damages across several INBE instances.

    One column per instance (``<study_area>/<scenario>``) with, for each return
    period, the sum of damages and the number of buildings, plus the combined
    damage ("EAD" row). When ``ref == "comb"`` each value is also expressed as a
    percentage of the first instance's combined damage, and a DELTA column gives
    the difference to the reference instance's percentage. Percentage cells are
    colored in the saved workbook (positive = red, negative = green).

    :param instances: list of INBE manager instances; the first one is the reference.
    :param path_analysis: output Excel path.
    :param ref: "comb" for percentage-of-combined comparison, otherwise a plain
        column-vs-first-column percentage.
    :return: tuple (True, "") on success, (False, error message) on PermissionError.
    """
    from openpyxl.styles import Font
    from openpyxl import load_workbook
    all_possible_Ti = sorted(instances[0].get_list_individual_T(instances[0]), key=int)
    # two rows per return period (Sum D, Nb) + a final combined-damage row
    skeleton = []
    for Ti in all_possible_Ti:
        skeleton.append([f"T{Ti}", "Sum D"])
        #skeleton.append(["", "Sum Di/Si"])
        #skeleton.append(["", "SumD/SumS"])
        skeleton.append(["", "Nb"])
        #skeleton.append(["", "SumS"])
    skeleton.append(["EAD", "Sum D"])
    analysis = pd.DataFrame(skeleton, columns=["Return periods", "Metric"])
    ref_values = []
    # reference = combined damage of the FIRST instance
    path_combD = instances[0].OUT_COMB
    df0 = pd.read_excel(path_combD)
    SumcombD = df0["d_total"].sum()
    percentage_ref = []
    for i, inst in enumerate(instances):
        # j: row counter (the imposed skeleton structure makes it hard to know the
        # current row while computing the percentages; this counter tracks it)
        j=0
        sa = inst.Study_area
        sc = inst.scenario
        col_name = f"{sa}/{sc}"
        logging.info(f"Computing column {i} : {sa}/{sc}.")
        values = []
        path_combD_i = inst.OUT_COMB
        if path_combD_i.exists():
            df_comD_i = pd.read_excel(path_combD_i)
            SumcombD_i = df_comD_i["d_total"].sum()
        else:
            logging.error(f"Missing COMB file for inst={sa}/{sc}, leaving cell empty.")
            SumcombD_i = None
        for Ti in all_possible_Ti:
            path_indivD = inst.OUT_SCEN_DIR / f"individual_damage_T{Ti}.xlsx"
            path_defaultI = inst.OUT_SCEN_DIR_IN / f"input_with_default_data_T{Ti}.xlsx"
            if not path_indivD.exists() or not path_defaultI.exists():
                logging.error(f"Missing file for Ti={Ti}, inst={sa}/{sc}, leaving cells empty.")
                #values.extend([None, None, None, None, None])#if more metrics are enabled
                values.extend([None, None])
                continue
            df1 = pd.read_excel(path_indivD)
            df2 = pd.read_excel(path_defaultI)
            # join damages with the completed inputs on the building code
            merged = pd.merge(df1, df2, on="code", how="inner")
            sumD = merged["d_total"].sum()
            nb = len(merged)
            #SumS = merged["FA"].sum()#if more metrics are enabled
            #sum_Di_by_Si = (merged["d_total"] / merged["FA"]).sum()#if more metrics are enabled
            #sumD_by_SumS = self.NO_division_by_zero(sumD, SumS)#if more metrics are enabled
            #values.extend([sumD, sum_Di_by_Si, sumD_by_SumS, nb, SumS])#if more metrics are enabled
            values.extend([sumD, nb])
        values.append(SumcombD_i)
        if len(values) != len(analysis):
            logging.warning(f"Length mismatch for {col_name}: {len(values)} vs {len(analysis)}. Filling missing with None.")
            while len(values) < len(analysis):
                values.append(None)
        analysis[col_name] = values
        if ref == "comb":
            # reference vector: combined damage on 'Sum D' rows, 0 on 'Nb' rows
            #ref_values_with_comb = [SumcombD, 0, 0, 0, 0]*len(all_possible_Ti) + [SumcombD] #if more metrics are enabled
            ref_values_with_comb = [SumcombD, 0]*len(all_possible_Ti) + [SumcombD]
            comp_values = []
            delta_values = []
            for v, r in zip(values, ref_values_with_comb):
                # the percentage vs the reference combined damage
                if v is None or r == 0:
                    comp_values.append(None)
                    delta_values.append(None)
                else:
                    val_percentage=round((v / r) * 100, 1)
                    comp_values.append(val_percentage)
                    if i == 0 :
                        # first instance = the reference percentages
                        percentage_ref.append(val_percentage)
                    # comparison to the reference, as a difference of percentages
                    #jbis = int(j/len([SumcombD, 0, 0, 0, 0])) #if more metrics are enabled
                    jbis = int(j/len([SumcombD, 0]))
                    delta_values.append(percentage_ref[jbis] - val_percentage)
                j+=1
            comp_col_name = f"{col_name} % vs ref"
            idx = analysis.columns.get_loc(col_name) + 1
            analysis.insert(idx, comp_col_name, comp_values)
            empty_col_name = f"DELTA {col_name}"
            analysis.insert(idx + 1, empty_col_name, delta_values)
        else:
            if i == 0:
                ref_values = values.copy()
                logging.error(_("First column taken as reference."))
            else:
                comp_values = []
                for v, r in zip(values, ref_values):
                    if v is None or r == 0:
                        comp_values.append(None)
                    else:
                        comp_values.append(round((v / r - 1) * 100, 3))
                comp_col_name = f"{col_name} % vs ref"
                idx = analysis.columns.get_loc(col_name) + 1
                analysis.insert(idx, comp_col_name, comp_values)
    try:
        analysis.to_excel(path_analysis, index=False)
    except PermissionError as e:
        logging.error("permission denied : please check if the file is closed or used in another program")
        return False, str(e)
    # re-open with openpyxl to color the percentage cells
    wb = load_workbook(path_analysis)
    ws = wb.active
    for col in ws.iter_cols(min_col=3, max_col=ws.max_column):
        if "% vs ref" in str(col[0].value):
            for cell in col[1:]:
                if cell.value is not None:
                    if cell.value > 0:
                        cell.font = Font(color="FF0000") # red (above reference)
                    elif cell.value < 0:
                        cell.font = Font(color="008000") # green (below reference)
    try:
        wb.save(path_analysis)
        logging.info(f"Table analysis with % comparison and color created in {path_analysis}.")
        return True, ""
    except PermissionError as e:
        logging.error("permission denied : please check if the file is closed or used in another program")
        return False, str(e)
def scatter_INBE_dtotal(out, manager1, manager2, max_val=None, Ti=None, quant=None):
    """Scatter plot comparing the per-building total damages of two scenarios.

    X-axis: damages of ``manager1``'s scenario; Y-axis: damages of ``manager2``'s
    scenario (manager2 is the baseline we compare manager1 to). Buildings present
    in only one scenario are plotted with 0 on the other axis.

    NOTE(review): the first parameter is named ``out`` but receives the bound
    instance when called as a method (it plays the role of ``self`` and is never
    used) - looks like a refactor leftover, confirm before renaming.

    :param manager1: manager of the compared scenario (x-axis data).
    :param manager2: manager of the baseline scenario (y-axis data).
    :param max_val: axis maximum; defaults to the largest damage found.
    :param Ti: return period to compare; None compares the combined damages.
    :param quant: optional quantile in [0, 1] used to trim extreme values.
    :return: tuple (fig, ax) of the matplotlib figure and axes.
    """
    if Ti != None:
        path2 = Path(manager1.OUT_SCEN_DIR) / f"individual_damage_T{Ti}.xlsx"
        path1 = Path(manager2.OUT_SCEN_DIR) / f"individual_damage_T{Ti}.xlsx"
        title = rf"{Ti}-year flood"
    else :
        path2 = Path(manager1.OUT_COMB)
        path1 = Path(manager2.OUT_COMB)
        title = r"Combined damage"
    df1 = pd.read_excel(path1)
    df2 = pd.read_excel(path2)
    df1 = df1[['code', 'd_total']].copy()
    df2 = df2[['code', 'd_total']].copy()
    df1.rename(columns={'d_total': 'd_total_sc1'}, inplace=True)
    df2.rename(columns={'d_total': 'd_total_sc2'}, inplace=True)
    # outer join on the building code keeps buildings present in only one scenario
    merged = pd.merge(df1, df2, on='code', how='outer')
    if quant != None :
        percentile_sc1 = merged['d_total_sc1'].quantile(quant)
        percentile_sc2 = merged['d_total_sc2'].quantile(quant)
        # NOTE(review): '|' keeps rows below EITHER scenario's percentile;
        # if the intent is to trim rows above both thresholds, '&' would be needed - confirm.
        filtered = merged[(merged['d_total_sc1'] <= percentile_sc1) | (merged['d_total_sc2'] <= percentile_sc2)]
        merged=filtered
    # SETS: buildings damaged in both scenarios / in only one of them
    common = merged[ merged['d_total_sc1'].notna() & merged['d_total_sc2'].notna() ]
    only_sc1 = merged[ merged['d_total_sc1'].notna() & merged['d_total_sc2'].isna() ]
    only_sc2 = merged[ merged['d_total_sc2'].notna() & merged['d_total_sc1'].isna() ]
    # missing side plotted at 0 so single-scenario buildings stay visible
    only_sc1_filled = only_sc1.copy()
    only_sc1_filled['d_total_sc2'] = 0
    only_sc2_filled = only_sc2.copy()
    only_sc2_filled['d_total_sc1'] = 0
    fig, ax = plt.subplots(figsize=(8, 8))
    ax.scatter(common['d_total_sc2'], common['d_total_sc1'], marker = 'x', color='black')
    ax.scatter(only_sc1_filled['d_total_sc2'], only_sc1_filled['d_total_sc1'], marker = 'x', color='black')
    ax.scatter(only_sc2_filled['d_total_sc2'], only_sc2_filled['d_total_sc1'], marker = 'x', color='black')
    label2 = manager1.scenario
    label1 = manager2.scenario
    if max_val ==None :
        max_val = merged[['d_total_sc1', 'd_total_sc2']].max().max()
    xlabel = rf"Total damage [€] for {label2}"
    ylabel = rf"Total damage [€] for {label1}"
    # identity line: points above it are worse in the y-axis scenario
    ax.plot([0, max_val], [0, max_val], '--', color='grey')
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    ax.set_title(title, loc='left')
    ax.grid(True)
    fig.tight_layout()
    fig.show()
    return fig, ax