Source code for wolfhece.mockup_spw_xlsx

"""
Author: HECE - University of Liege, Pierre Archambeau
Date: 2026

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

"""
This module provides a mockup of the SPW XLSX file for testing purposes.
"""

import json
from pathlib import Path

import numpy as np
import pandas as pd
from enum import Enum

from dataclasses import dataclass, field

import logging

import pint

from .ins import Localities
from .irm_qdf import Qdf_IRM, MontanaIRM
from .PyTranslate import _
from . import UNITS

@dataclass
[docs] class LandUseType:
[docs] name: str
[docs] runoff_coefficient: float
[docs] description: str = field(default="")
[docs] class LandUse(Enum):
[docs] FORESTS = LandUseType(name=_("Forest"), description="Forêts et bois", runoff_coefficient=0.05)
[docs] GRASSLANDS = LandUseType(name=_("Grassland"), description="Prairies, jardins et zones enherbées", runoff_coefficient=0.15)
[docs] CULTIVATED_FIELDS = LandUseType(name=_("Fields"), description="Champs cultivés, landes et broussailles", runoff_coefficient=0.25)
[docs] GREEN_ROOFS = LandUseType(name=_("Green Roofs"), description="Dalles de gazon et toitures vertes", runoff_coefficient=0.4)
[docs] BARE_SOIL = LandUseType(name=_("Bare Soil"), description="Terres battues et toitures vertes", runoff_coefficient=0.5)
[docs] DRAINED_PAVING = LandUseType(name=_("Drained Paving"), description="Pavés à joints écartés et pavés drainants", runoff_coefficient=0.7)
[docs] IMPERVIOUS_SURFACES = LandUseType(name=_("Impervious Surfaces"), description="Allées pavées, trottoirs pavés et parkings", runoff_coefficient=0.9)
[docs] WATER_AND_ROOFS = LandUseType(name=_("Water and Roofs"), description="Toitures, routes et plans d'eau", runoff_coefficient=1)
@classmethod
[docs] def print_summary(cls): """Print a formatted table of all land-use types with their runoff coefficients.""" header = f"{'Name':<25} {'Description':<55} {'C [-]':>6}" print(header) print("-" * len(header)) for member in cls: lu = member.value print(f"{lu.name:<25} {lu.description:<55} {lu.runoff_coefficient:>6.2f}")
[docs] class MockupSPWXLSX: """Simplified stormwater retention basin sizing tool following the SPW methodology. This class computes the required retention capacity for a given site by combining IRM rainfall data (QDF / Montana curves), land-use characteristics and infiltration parameters. Main features ------------- - Management of sub-areas by land-use type (``LandUse``), each weighted by a runoff coefficient. - Chicago hyetograph computation from IRM curves for a given return period. - Retention capacity estimation accounting for infiltration (Darcy's law) and the admissible outflow to the drainage network. - Design rainfall duration and basin emptying time, computed alongside the retention capacity and exposed as read-only properties (``design_rainfall_duration``, ``emptying_time``). - Full import/export to JSON (``to_json`` / ``from_json``). Architecture and performance ---------------------------- - QDF data is cached at the class level (shared across instances); hyetographs are cached per return period at the instance level. The ``total_area`` and ``total_area_ponderated`` properties use a dirty-flag mechanism to avoid unnecessary recomputation during sensitivity analyses. - All physical quantities are handled via ``pint.Quantity``, so unit conversions are automatic and verified. Usage example ------------- >>> from wolfhece.mockup_spw_xlsx import MockupSPWXLSX, UNITS, LandUse >>> m = MockupSPWXLSX() >>> m.set_locality("Liège") >>> m.infiltration_area = 100 * UNITS.meter**2 >>> m.infiltration_darcy = 1e-5 * UNITS.meter / UNITS.second >>> m.admissible_outflow = 5.0 # L/s/ha by default >>> m.add_area("parking", LandUse.IMPERVIOUS_SURFACES, 500) >>> m.add_area("garden", LandUse.GRASSLANDS, 0.1 * UNITS.hectare) >>> vol = m.get_retention_capacity(return_period=25) # -> pint.Quantity in m³ >>> m.design_rainfall_duration.to('hour') # design storm duration >>> m.emptying_time.to('hour') # basin emptying time >>> m.to_json("my_project.json") # save >>> m2 = MockupSPWXLSX.from_json("my_project.json") # reload """
[docs] _localities = Localities('2018')
[docs] _qdf_cache: dict[int, 'Qdf_IRM'] = {} # Class-level cache: NSI -> Qdf_IRM
def __init__(self):
[docs] self._locality_nsi: int = 0 # NSI of the locality
[docs] self._qdf_irm = None # QDF IRM data for the locality
[docs] self.areas: dict[str, tuple[LandUse, pint.Quantity]] = {}
[docs] self.reference_area: pint.Quantity = 0 * UNITS.meter**2 # Reference area
[docs] self._infiltration_area: pint.Quantity = 0 * UNITS.meter**2 # Infiltration area
[docs] self._infiltration_darcy: pint.Quantity = 0 * UNITS.meter / UNITS.second # Infiltration Darcy velocity
[docs] self._infiltration_security_factor: float = 2.0 # Infiltration security factor (dimensionless)
[docs] self._admissible_outflow: pint.Quantity = 5 * UNITS.liter / UNITS.second / UNITS.hectare # Relative outflow in L/s/ha
[docs] self._climate_delta_t: float = 0.0 # Climate change: temperature increase [°C]
[docs] self._climate_rate: float = 0.07 # Rainfall increase rate per °C (7 %/°C)
[docs] self._areas_dirty: bool = True # Flag to indicate if area caches need recomputation
[docs] self._cached_total_area: pint.Quantity | None = None
[docs] self._cached_total_area_ponderated: pint.Quantity | None = None
[docs] self._rainfall_cache: dict[int, tuple[np.ndarray, np.ndarray]] = {} # return_period -> (time, intensity)
[docs] self._design_rainfall_duration: pint.Quantity | None = None # Duration of the design rainfall event
[docs] self._emptying_time: pint.Quantity | None = None # Time to empty the retention basin
# Last computation results (populated by get_retention_capacity)
[docs] self._last_return_period: int | None = None
[docs] self._last_retention_capacity: pint.Quantity | None = None
[docs] self._last_time: np.ndarray | None = None # time array [min]
[docs] self._last_rainfall: np.ndarray | None = None # rainfall intensity [mm/h]
[docs] self._last_net_rainfall: pint.Quantity | None = None # net inflow [m³/s]
[docs] self._last_total_outflow: pint.Quantity | None = None # total outflow [m³/s]
[docs] self._last_stored: pint.Quantity | None = None # cumulative stored volume [m³]
[docs] def _invalidate_area_cache(self): """ Mark area-derived caches as stale. """ self._areas_dirty = True self._cached_total_area = None self._cached_total_area_ponderated = None
[docs] def add_area(self, key:str, land_use: LandUseType | str, area: float | pint.Quantity): """ Add an area with its land use type and area in square meters. :param key: A unique identifier for the area (e.g., "Area1", "Area2", etc.) :param land_use: The land use type of the area (e.g., LandUse.FORESTS.value) :param area: The area in square meters or a pint.Quantity representing the area. """ if isinstance(area, (int, float)): area = area * UNITS.meter**2 if isinstance(land_use, str): try: land_use = LandUse[land_use.upper()].value except KeyError: raise ValueError(f"Invalid land use type: {land_use}") # check if the key already exists if key in self.areas: raise ValueError(f"Area with key '{key}' already exists. Please use a unique key for each area.") self.areas[key] = (land_use, area) self._invalidate_area_cache()
[docs] def update_area(self, key:str, land_use: LandUseType | str = None, area: float | pint.Quantity = None): """ Update an existing area with a new land use type and/or area in square meters. :param key: The unique identifier for the area to update (e.g., "Area1", "Area2", etc.) :param land_use: The new land use type of the area (e.g., LandUse.FORESTS.value). If None, the land use will not be updated. :param area: The new area in square meters or a pint.Quantity representing the area. If None, the area will not be updated. """ if key not in self.areas: raise ValueError(f"Area with key '{key}' does not exist. Please add the area before updating it.") current_land_use, current_area = self.areas[key] if land_use is not None: if isinstance(land_use, str): try: land_use = LandUse[land_use.upper()].value except KeyError: raise ValueError(f"Invalid land use type: {land_use}") current_land_use = land_use if area is not None: if isinstance(area, (int, float)): area = area * UNITS.meter**2 current_area = area.to(UNITS.meter**2) self.areas[key] = (current_land_use, current_area) self._invalidate_area_cache()
@property
[docs] def total_area(self) -> pint.Quantity: """ Calculate the total area by summing up all areas in the mockup. Result is cached and invalidated when areas are added or updated. """ if self._cached_total_area is None: total = sum(area for __, area in self.areas.values()) self._cached_total_area = total.to(UNITS.meter**2) return self._cached_total_area
[docs] def area_by_land_use(self, land_use: LandUseType) -> pint.Quantity: """ Calculate the total area for a specific land use type. """ total = sum(area for lu, area in self.areas.values() if lu == land_use) return total.to(UNITS.meter**2)
@property
[docs] def total_area_ponderated(self) -> pint.Quantity: """ Calculate the total area weighted by the runoff coefficients. Result is cached and invalidated when areas are added or updated. """ if self._cached_total_area_ponderated is None: total = sum(area * lu.value.runoff_coefficient for lu, area in self.areas.values()) self._cached_total_area_ponderated = total.to(UNITS.meter**2) return self._cached_total_area_ponderated
[docs] def set_reference_area(self, area: float | pint.Quantity): """ Set the reference area for the mockup. """ if isinstance(area, (int, float)): area = area * UNITS.meter**2 self.reference_area = area.to(UNITS.meter**2)
[docs] def set_locality(self, nsi_or_name: int | str): """ Set the locality NSI for the mockup. """ if isinstance(nsi_or_name, str): nsi = self._localities.get_INSfromname(nsi_or_name) else: nsi = nsi_or_name if nsi not in self._localities.inscode2name: raise ValueError(f"NSI {nsi} is not valid. Available NSIs: {list(self._localities.inscode2name.keys())}") self._locality_nsi = nsi self._qdf_irm = None # Reset QDF data for the new locality self._rainfall_cache.clear() # Clear cached rainfall data
@property
[docs] def locality_name(self) -> str: """ Get the name of the locality based on the NSI. """ return self._localities.get_namefromINS(self._locality_nsi)
@property
[docs] def qdf_irm(self): """ Get the QDF IRM data for the locality. Uses a class-level cache keyed by NSI to avoid reloading the same data across multiple instances. """ if self._qdf_irm is None: nsi = self._locality_nsi if nsi not in MockupSPWXLSX._qdf_cache: MockupSPWXLSX._qdf_cache[nsi] = Qdf_IRM.make(nsi) self._qdf_irm = MockupSPWXLSX._qdf_cache[nsi] return self._qdf_irm
[docs] def get_rainfall(self, return_period: int = 25) -> tuple[np.ndarray, np.ndarray]: """ Get the rainfall depth for a given return period based on the QDF IRM data. Results are cached per return period for repeated calls. :param return_period: The return period in years (e.g., 10, 25, 30, 100). """ if self.qdf_irm is None: raise ValueError("QDF IRM data is not available for the locality.") if return_period not in self._rainfall_cache: time, intensity = self.qdf_irm.montana.get_hyetogram_Chicago(48*60, return_period) self._rainfall_cache[return_period] = (time, intensity) return self._rainfall_cache[return_period]
@property
[docs] def infiltration_area(self) -> pint.Quantity: """ Get the infiltration area in m^2. """ return self._infiltration_area.to(UNITS.meter**2)
@infiltration_area.setter def infiltration_area(self, value: float | pint.Quantity): """ Set the infiltration area in m^2. """ if isinstance(value, (int, float)): value = value * UNITS.meter**2 self._infiltration_area = value.to(UNITS.meter**2) @property
[docs] def infiltration_darcy(self) -> pint.Quantity: """ Get the infiltration Darcy velocity in m/s. """ return self._infiltration_darcy.to(UNITS.meter / UNITS.second)
@infiltration_darcy.setter def infiltration_darcy(self, value: float | pint.Quantity): """ Set the infiltration Darcy velocity in m/s. """ if isinstance(value, (int, float)): value = value * UNITS.meter / UNITS.second self._infiltration_darcy = value.to(UNITS.meter / UNITS.second) @property
[docs] def climate_delta_t(self) -> float: """ Temperature increase for climate change scenario [°C]. """ return self._climate_delta_t
@climate_delta_t.setter def climate_delta_t(self, value: float): """ Set the temperature increase [°C]. """ self._climate_delta_t = float(value) self._rainfall_cache.clear() # cached hyetographs no longer valid @property
[docs] def climate_factor(self) -> float: """ Multiplicative climate change factor on rainfall intensity. Computed as ``1 + climate_rate * climate_delta_t`` where *climate_rate* defaults to 0.07 (7 %/°C, Belgian convention). """ return 1.0 + self._climate_rate * self._climate_delta_t
@property
[docs] def admissible_outflow(self) -> pint.Quantity: """ Get the admissible outflow in L/s/ha. """ return self._admissible_outflow.to(UNITS.liter / UNITS.second / UNITS.hectare)
@admissible_outflow.setter def admissible_outflow(self, value: float | pint.Quantity): """ Set the admissible outflow in L/s/ha. """ if isinstance(value, (int, float)): value = value * UNITS.liter / UNITS.second / UNITS.hectare self._admissible_outflow = value.to(UNITS.liter / UNITS.second / UNITS.hectare)
[docs] def get_retention_capacity(self, return_period: int = 25) -> pint.Quantity: """ Calculate the retention capacity based on the infiltration area and Darcy velocity. Also computes and stores the design rainfall duration and the emptying time, accessible via the ``design_rainfall_duration`` and ``emptying_time`` properties. :param return_period: The return period in years (e.g., 10, 25, 30, 100).""" time, rainfall = self.get_rainfall(return_period) dt = 300 * UNITS.second # time step of the hyetogram outflow_infil = self._infiltration_area * self._infiltration_darcy / self._infiltration_security_factor outfflow_network = self._admissible_outflow * self.total_area total_outflow = (outflow_infil + outfflow_network).to(UNITS.meter**3 / UNITS.second) # rainfall is in mm/h, apply climate factor, convert to m/s rainfall_m_s = (rainfall * self.climate_factor * UNITS.mm / UNITS.hour).to(UNITS.meter / UNITS.second) # Total net rainfall, convert to m^3/s net_rainfall = (rainfall_m_s * self.total_area_ponderated).to(UNITS.meter**3 / UNITS.second) # Retention capacity is the difference between outflow and net rainfall stored_rainfall = net_rainfall - total_outflow # Indices where water must be stored positive_mask = stored_rainfall.magnitude > 0 # Sum the positive values of stored rainfall to get the total retention capacity retention_capacity = stored_rainfall[positive_mask].sum() * dt # Design rainfall duration: time span during which storage occurs if np.any(positive_mask): filling_indices = np.where(positive_mask)[0] n_steps = filling_indices[-1] - filling_indices[0] + 1 self._design_rainfall_duration = (n_steps * dt).to(UNITS.hour) else: self._design_rainfall_duration = 0 * UNITS.hour # Emptying time: volume / total outflow if total_outflow.magnitude > 0: self._emptying_time = (retention_capacity / total_outflow).to(UNITS.hour) else: self._emptying_time = float('inf') * UNITS.hour # Store intermediate results for report / plots self._last_return_period = return_period self._last_retention_capacity = retention_capacity.to(UNITS.meter**3) self._last_time = time self._last_rainfall = rainfall * self.climate_factor self._last_net_rainfall = net_rainfall self._last_total_outflow = total_outflow # Basin volume over time (filling AND emptying, clamped to >= 0) dv = stored_rainfall.magnitude * dt.magnitude # volume increment per step [m³] volume_curve = np.empty_like(dv) v = 0.0 for i in range(len(dv)): v = max(0.0, v + dv[i]) volume_curve[i] = v self._last_stored = volume_curve * UNITS.meter**3 return self._last_retention_capacity
@property
[docs] def design_rainfall_duration(self) -> pint.Quantity: """ Duration of the design rainfall event (filling period). Available after calling ``get_retention_capacity()``. """ if self._design_rainfall_duration is None: raise RuntimeError("Call get_retention_capacity() first.") return self._design_rainfall_duration
@property
[docs] def emptying_time(self) -> pint.Quantity: """ Time required to fully empty the retention basin via infiltration and network outflow. Available after calling ``get_retention_capacity()``. """ if self._emptying_time is None: raise RuntimeError("Call get_retention_capacity() first.") return self._emptying_time
# ------------------------------------------------------------------ # Report # ------------------------------------------------------------------
[docs] def generate_report(self) -> str: """Return a multi-line text report summarising the sizing calculation. The report includes: - Locality information - Land-use area breakdown (table) - Infiltration / outflow parameters - Retention capacity, design rainfall duration, emptying time :raises RuntimeError: if ``get_retention_capacity()`` has not been called yet. """ if self._last_retention_capacity is None: raise RuntimeError("Call get_retention_capacity() first.") sep = "=" * 70 lines: list[str] = [] lines.append(sep) lines.append(" STORMWATER RETENTION BASIN — SIZING REPORT (SPW methodology)") lines.append(sep) lines.append("") # --- Locality --- lines.append(f"Locality : {self.locality_name} (NSI {self._locality_nsi})") lines.append(f"Return period : {self._last_return_period} years") lines.append("") # --- Land-use table --- lines.append("-" * 70) lines.append(f"{'Key':<15} {'Land use':<22} {'C [-]':>6} {'Area [m²]':>12}") lines.append("-" * 70) for key, (lu, area) in self.areas.items(): lu_type = lu.value if isinstance(lu, LandUse) else lu lines.append( f"{key:<15} {lu_type.name:<22} {lu_type.runoff_coefficient:>6.2f} " f"{area.to(UNITS.meter**2).magnitude:>12.1f}" ) lines.append("-" * 70) total_a = self.total_area.to(UNITS.meter**2).magnitude total_ap = self.total_area_ponderated.to(UNITS.meter**2).magnitude c_mean = total_ap / total_a if total_a > 0 else 0.0 lines.append(f"{'Total':<15} {'':22} {c_mean:>6.2f} {total_a:>12.1f}") lines.append(f"{'Weighted area':<15} {'':22} {'':>6} {total_ap:>12.1f}") lines.append("") # --- Infiltration / outflow parameters --- lines.append("-" * 70) lines.append("Infiltration & outflow parameters") lines.append("-" * 70) infil_a = self._infiltration_area.to(UNITS.meter**2).magnitude infil_k = self._infiltration_darcy.to(UNITS.meter / UNITS.second).magnitude infil_k_mmh = self._infiltration_darcy.to(UNITS.mm / UNITS.hour).magnitude q_infil = (self._infiltration_area * self._infiltration_darcy / self._infiltration_security_factor).to(UNITS.liter / UNITS.second).magnitude q_net = self._admissible_outflow.to(UNITS.liter / UNITS.second / UNITS.hectare).magnitude q_net_abs = (self._admissible_outflow * self.total_area).to(UNITS.liter / UNITS.second).magnitude lines.append(f" Infiltration area : {infil_a:.1f} m²") lines.append(f" Darcy velocity : {infil_k:.2e} m/s ({infil_k_mmh:.2f} mm/h)") lines.append(f" Security factor : {self._infiltration_security_factor:.1f}") lines.append(f" Infiltration outflow : {q_infil:.4f} L/s") lines.append(f" Admissible outflow : {q_net:.2f} L/s/ha ({q_net_abs:.4f} L/s)") lines.append(f" Total outflow : {q_infil + q_net_abs:.4f} L/s") lines.append("") if self._climate_delta_t != 0.0: lines.append(f" Climate change ΔT : +{self._climate_delta_t:.1f} °C") lines.append(f" Rainfall increase rate : {self._climate_rate*100:.0f} %/°C") lines.append(f" Climate factor : {self.climate_factor:.2f}{self.climate_factor:.2f} on rainfall)") lines.append("") # --- Results --- lines.append(sep) lines.append(" RESULTS") lines.append(sep) vol = self._last_retention_capacity.to(UNITS.meter**3).magnitude dur = self._design_rainfall_duration.to(UNITS.hour).magnitude emp = self._emptying_time.to(UNITS.hour).magnitude lines.append(f" Retention capacity : {vol:.2f} m³") lines.append(f" Design rainfall duration : {dur:.2f} h") lines.append(f" Emptying time : {emp:.2f} h") if total_a > 0: lines.append(f" Specific volume : {vol / total_a * 1000:.1f} L/m²") lines.append(sep) lines.append("") return "\n".join(lines)
# ------------------------------------------------------------------ # Plots (matplotlib) # ------------------------------------------------------------------
[docs] def plot_hyetograph(self, ax=None, show: bool = True): """Plot the Chicago hyetograph used for the last sizing. :param ax: Optional matplotlib Axes. Created if *None*. :param show: Call ``plt.show()`` at the end (ignored when *ax* is provided). :returns: The matplotlib Axes object. """ import matplotlib.pyplot as plt if self._last_time is None: raise RuntimeError("Call get_retention_capacity() first.") created = ax is None if created: fig, ax = plt.subplots(figsize=(10, 4)) from matplotlib.ticker import MultipleLocator time_h = self._last_time / 60.0 # min -> h ax.bar(time_h, self._last_rainfall, width=(time_h[1] - time_h[0]) * 0.9, color="steelblue", edgecolor="navy", linewidth=0.3, label="Rainfall") ax.set_xlabel("Time [h]") ax.set_ylabel("Intensity [mm/h]") ax.set_title(f"Chicago hyetograph — T = {self._last_return_period} years") ax.xaxis.set_major_locator(MultipleLocator(6)) ax.xaxis.set_minor_locator(MultipleLocator(1)) ax.legend(loc="upper right") ax.grid(True, which="major", linestyle="--", alpha=0.5) ax.grid(True, which="minor", linestyle=":", alpha=0.3) if created and show: plt.tight_layout() plt.show() return ax
[docs] def plot_volume_balance(self, ax=None, show: bool = True): """Plot inflow, outflow and cumulative stored volume over time. Two y-axes are used: left for flow rates [L/s], right for cumulative stored volume [m³]. :param ax: Optional matplotlib Axes. Created if *None*. :param show: Call ``plt.show()`` at the end (ignored when *ax* is provided). :returns: The matplotlib Axes object. """ import matplotlib.pyplot as plt if self._last_time is None: raise RuntimeError("Call get_retention_capacity() first.") created = ax is None if created: fig, ax = plt.subplots(figsize=(10, 5)) from matplotlib.ticker import MultipleLocator time_h = self._last_time / 60.0 # min -> h inflow_ls = self._last_net_rainfall.to(UNITS.liter / UNITS.second).magnitude outflow_ls = np.full_like(time_h, self._last_total_outflow.to(UNITS.liter / UNITS.second).magnitude) stored_m3 = self._last_stored.to(UNITS.meter**3).magnitude ax.plot(time_h, inflow_ls, color="royalblue", linewidth=1.2, label="Net inflow") ax.plot(time_h, outflow_ls, color="orangered", linewidth=1.2, linestyle="--", label="Total outflow") ax.set_xlabel("Time [h]") ax.set_ylabel("Flow rate [L/s]") ax.set_title(f"Volume balance — T = {self._last_return_period} years") ax.xaxis.set_major_locator(MultipleLocator(6)) ax.xaxis.set_minor_locator(MultipleLocator(1)) ax.legend(loc="upper left") ax.grid(True, which="major", linestyle="--", alpha=0.5) ax.grid(True, which="minor", linestyle=":", alpha=0.3) ax2 = ax.twinx() ax2.fill_between(time_h, stored_m3, color="lightgreen", alpha=0.5, label="Stored volume") ax2.plot(time_h, stored_m3, color="green", linewidth=0.8) ax2.set_ylabel("Stored volume [m³]") ax2.legend(loc="upper right") if created and show: plt.tight_layout() plt.show() return ax
[docs] def plot_land_use(self, ax=None, show: bool = True): """Pie chart of the area distribution by land-use type. :param ax: Optional matplotlib Axes. Created if *None*. :param show: Call ``plt.show()`` at the end (ignored when *ax* is provided). :returns: The matplotlib Axes object. """ import matplotlib.pyplot as plt created = ax is None if created: fig, ax = plt.subplots(figsize=(7, 7)) labels = [] sizes = [] for key, (lu, area) in self.areas.items(): lu_type = lu.value if isinstance(lu, LandUse) else lu labels.append(f"{key}\n({lu_type.name}, C={lu_type.runoff_coefficient})") sizes.append(area.to(UNITS.meter**2).magnitude) colors = plt.cm.Set3(np.linspace(0, 1, len(labels))) wedges, texts, autotexts = ax.pie( sizes, labels=labels, colors=colors, autopct="%1.1f%%", startangle=140, pctdistance=0.8) for t in autotexts: t.set_fontsize(8) ax.set_title("Land-use area distribution") if created and show: plt.tight_layout() plt.show() return ax
[docs] def plot_summary(self, show: bool = True): """Generate a composite figure with the three main plots. Layout: hyetograph (top-left), volume balance (top-right), land-use pie (bottom-centre). :param show: Call ``plt.show()`` at the end. :returns: The matplotlib Figure object. """ import matplotlib.pyplot as plt if self._last_time is None: raise RuntimeError("Call get_retention_capacity() first.") fig = plt.figure(figsize=(14, 10)) ax1 = fig.add_subplot(2, 2, 1) ax2 = fig.add_subplot(2, 2, 2) ax3 = fig.add_subplot(2, 2, (3, 4)) self.plot_hyetograph(ax=ax1, show=False) self.plot_volume_balance(ax=ax2, show=False) self.plot_land_use(ax=ax3, show=False) fig.suptitle( f"Retention basin sizing — {self.locality_name} — " f"T = {self._last_return_period} yr — " f"V = {self._last_retention_capacity.to(UNITS.meter**3).magnitude:.1f} m³", fontsize=13, fontweight="bold", ) fig.tight_layout(rect=[0, 0, 1, 0.95]) if show: plt.show() return fig
# ------------------------------------------------------------------ # Sensitivity analysis # ------------------------------------------------------------------ # Mapping: parameter name -> (attribute, default unit label, axis label, log scale)
[docs] _SENSITIVITY_PARAMS: dict[str, tuple[str, str, str, bool]] = { "admissible_outflow": ( "_admissible_outflow", "liter / second / hectare", "Débit de fuite admissible [L/s/ha]", False, ), "infiltration_darcy": ( "_infiltration_darcy", "meter / second", "Conductivité hydraulique [m/s]", True, ), "infiltration_area": ( "_infiltration_area", "meter ** 2", "Surface d'infiltration [m²]", False, ), "infiltration_security_factor": ( "_infiltration_security_factor", None, # dimensionless "Facteur de sécurité [-]", False, ), "return_period": ( None, # handled specially None, "Période de retour [ans]", False, ), "climate_delta_t": ( "_climate_delta_t", None, # dimensionless "Réchauffement climatique ΔT [°C]", False, ), }
[docs] def run_sensitivity( self, parameter: str, values, return_period: int = 25, ) -> pd.DataFrame: """Run a sensitivity analysis by sweeping *parameter* over *values*. Supported parameters: ``"admissible_outflow"``, ``"infiltration_darcy"``, ``"infiltration_area"``, ``"infiltration_security_factor"``, ``"return_period"``. The method restores the original value of the parameter after the sweep. :param parameter: Name of the parameter to vary. :param values: Iterable of values to test. For physical quantities, give plain floats in the parameter's default unit (L/s/ha, m/s, m²) **or** ``pint.Quantity`` objects. :param return_period: Return period used for each run (ignored when *parameter* is ``"return_period"``). :returns: A ``pandas.DataFrame`` with columns ``parameter``, ``volume_m3``, ``duration_h``, ``emptying_h``. """ if parameter not in self._SENSITIVITY_PARAMS: raise ValueError( f"Unknown parameter '{parameter}'. " f"Choose from: {', '.join(self._SENSITIVITY_PARAMS)}" ) attr, default_unit, _, _ = self._SENSITIVITY_PARAMS[parameter] # Save original value if parameter == "return_period": original = None # nothing to restore elif default_unit is None: original = getattr(self, attr) else: original = getattr(self, attr).copy() results = [] for v in values: if parameter == "return_period": rp = int(v) else: rp = return_period if default_unit is None: # dimensionless setattr(self, attr, float(v)) else: if isinstance(v, (int, float)): v = v * UNITS.parse_expression(default_unit) setattr(self, attr, v.to(UNITS.parse_expression(default_unit))) vol = self.get_retention_capacity(return_period=rp) results.append({ "parameter": float(v) if not isinstance(v, pint.Quantity) else v.magnitude, "volume_m3": vol.to(UNITS.meter**3).magnitude, "duration_h": self._design_rainfall_duration.to(UNITS.hour).magnitude, "emptying_h": self._emptying_time.to(UNITS.hour).magnitude, }) # Restore original value if parameter != "return_period": if default_unit is None: setattr(self, attr, original) else: setattr(self, attr, original) return pd.DataFrame(results)
[docs] def plot_sensitivity( self, parameter: str, values, return_period: int = 25, ax=None, show: bool = True, ): """Run a sensitivity analysis and plot *volume* vs *parameter*. See :meth:`run_sensitivity` for accepted parameters and values. :param ax: Optional matplotlib Axes. Created if *None*. :param show: Call ``plt.show()`` at the end (ignored when *ax* is provided). :returns: Tuple ``(ax, DataFrame)`` — the Axes and the results table. """ import matplotlib.pyplot as plt df = self.run_sensitivity(parameter, values, return_period) _, _, xlabel, logscale = self._SENSITIVITY_PARAMS[parameter] created = ax is None if created: fig, ax = plt.subplots(figsize=(8, 5)) ax.plot(df["parameter"], df["volume_m3"], "o-", color="royalblue", markersize=4) ax.set_xlabel(xlabel) ax.set_ylabel("Volume de rétention [m³]") ax.set_title(f"Sensibilité — {parameter}") if logscale: ax.set_xscale("log") ax.grid(True, linestyle="--", alpha=0.5) if created and show: plt.tight_layout() plt.show() return ax, df
[docs] def _land_use_to_name(self, land_use) -> str: """ Convert a LandUse enum or LandUseType to its enum name. """ if isinstance(land_use, LandUse): return land_use.name if isinstance(land_use, LandUseType): for member in LandUse: if member.value == land_use: return member.name raise ValueError(f"Cannot serialize land use: {land_use}")
[docs] def to_dict(self) -> dict: """ Serialize the mockup to a dictionary. """ areas_dict = {} for key, (land_use, area) in self.areas.items(): areas_dict[key] = { "land_use": self._land_use_to_name(land_use), "area_m2": area.to(UNITS.meter**2).magnitude, } return { "locality_nsi": self._locality_nsi, "reference_area_m2": self.reference_area.to(UNITS.meter**2).magnitude, "infiltration_area_m2": self._infiltration_area.to(UNITS.meter**2).magnitude, "infiltration_darcy_m_per_s": self._infiltration_darcy.to(UNITS.meter / UNITS.second).magnitude, "infiltration_security_factor": self._infiltration_security_factor, "admissible_outflow_l_per_s_per_ha": self._admissible_outflow.to(UNITS.liter / UNITS.second / UNITS.hectare).magnitude, "climate_delta_t": self._climate_delta_t, "climate_rate": self._climate_rate, "areas": areas_dict, }
[docs] def to_json(self, path: str | Path) -> None: """ Export the mockup to a JSON file. :param path: Path to the output JSON file. """ path = Path(path) with open(path, "w", encoding="utf-8") as f: json.dump(self.to_dict(), f, indent=4, ensure_ascii=False)
@classmethod
[docs] def from_dict(cls, data: dict) -> "MockupSPWXLSX": """ Create a MockupSPWXLSX instance from a dictionary. :param data: Dictionary with the mockup data. :return: A new MockupSPWXLSX instance. """ mockup = cls() if data.get("locality_nsi", 0) != 0: mockup.set_locality(data["locality_nsi"]) mockup.set_reference_area(data.get("reference_area_m2", 0)) mockup.infiltration_area = data.get("infiltration_area_m2", 0) mockup.infiltration_darcy = data.get("infiltration_darcy_m_per_s", 0) mockup._infiltration_security_factor = data.get("infiltration_security_factor", 2.0) mockup.admissible_outflow = data.get("admissible_outflow_l_per_s_per_ha", 5.0) mockup._climate_delta_t = data.get("climate_delta_t", 0.0) mockup._climate_rate = data.get("climate_rate", 0.07) for key, area_data in data.get("areas", {}).items(): land_use = LandUse[area_data["land_use"]] mockup.add_area(key, land_use, area_data["area_m2"]) return mockup
@classmethod
[docs] def make(cls, locality: int | str, areas: dict[str, tuple[LandUse | str, float | pint.Quantity]], infiltration_area: float | pint.Quantity = 0, infiltration_darcy: float | pint.Quantity = 0, infiltration_security_factor: float = 2.0, admissible_outflow: float | pint.Quantity = 5.0, climate_delta_t: float = 0.0, ) -> "MockupSPWXLSX": """Factory to create a fully configured instance in a single call. :param locality: Commune name or NIS code. :param areas: Dictionary ``{key: (land_use, area)}`` where *land_use* is a ``LandUse`` member or a string (enum name) and *area* is in m² (float) or a ``pint.Quantity``. :param infiltration_area: Infiltration surface in m² (float) or ``pint.Quantity``. :param infiltration_darcy: Hydraulic conductivity in m/s (float) or ``pint.Quantity``. :param infiltration_security_factor: Safety factor on infiltration (dimensionless, default 2). :param admissible_outflow: Allowable outflow in L/s/ha (float) or ``pint.Quantity``. :param climate_delta_t: Temperature increase for climate change scenario in °C (default 0). :return: A ready-to-use ``MockupSPWXLSX`` instance. Example ------- >>> m = MockupSPWXLSX.make( ... locality="Liège", ... areas={ ... "parking": (LandUse.IMPERVIOUS_SURFACES, 800), ... "toitures": (LandUse.WATER_AND_ROOFS, 350), ... "jardin": (LandUse.GRASSLANDS, 1500), ... }, ... infiltration_area=150, ... infiltration_darcy=1e-5, ... admissible_outflow=5.0, ... ) """ mockup = cls() mockup.set_locality(locality) for key, (land_use, area) in areas.items(): mockup.add_area(key, land_use, area) mockup.infiltration_area = infiltration_area mockup.infiltration_darcy = infiltration_darcy mockup._infiltration_security_factor = infiltration_security_factor mockup.admissible_outflow = admissible_outflow mockup.climate_delta_t = climate_delta_t return mockup
@classmethod
[docs] def from_json(cls, path: str | Path) -> "MockupSPWXLSX": """ Import a mockup from a JSON file. :param path: Path to the input JSON file. :return: A new MockupSPWXLSX instance. """ path = Path(path) with open(path, "r", encoding="utf-8") as f: data = json.load(f) return cls.from_dict(data)