Source code for wolfhece.validation.convergence

"""
Check the convergence of Wolf simulations.

This module provides tools for checking
the convergence of steady state wolf simulations.

Authors: HECE - University of Liege, Utashi Ciraane Docile
Created on 16th December 2025
"""
# Standard imports
# ------------------------------
from pathlib import Path

# Third-party imports
# ------------------------------
import numpy as np

from tqdm import tqdm
from matplotlib import pyplot as plt

# Local imports
# ------------------------------
from wolfhece.wolfresults_2D import views_2D, Wolfresults_2D
from wolfhece.wolf_array import WolfArray
from wolfhece.PyVertexvectors import vector

# Constants used throughout the code
# ----------------------------------
[docs] VALUES = 'Values'
[docs] COORDS = 'Coordinates'
[docs] DIFFERENCES = 'Differences'
[docs] DIFFERENCE_MAX = 'Residual [Max. dif.]'
[docs] DIFFERENCE_MEAN = 'Residual [Mean dif.]'
[docs] DIFFERENCE_MEDIAN = 'Residual [Median dif.]'
[docs] DIFFERENCE_PERCENTILE_99 = 'Residual [99th percentile dif.]'
[docs] CONVERGENCE = 'Convergence'
[docs] TIME_STEP = 'Time step'
[docs] TIMES = 'Time [s]'
[docs] SIM_TIME_STEPS = 'Simulation time steps'
[docs] H = 'H'
[docs] QX = 'QX'
[docs] QY = 'QY'
[docs] K ='K'
[docs] EPS ='EPS'
[docs] TOPO ='TOPO'
[docs] FRICTION = 'Friction'
[docs] INFILTRATION = 'Infiltration'
[docs] ARRAY_KEYS = [TOPO, H, QX, QY, K, EPS, FRICTION, INFILTRATION]
[docs] FONTSIZE =14
[docs] FONTSIZELEGEND = 14
[docs] LINEWIDTH = 2.
[docs] TICKPAD = 8
[docs] LABELPADX = 8
[docs] LABELPADY = 12
[docs] BOXPAD = 5
[docs] DARKBLUE = '#0A1172'
[docs] DARKRED = '#B00000'
[docs] GREY = '#7E7E7E'
# FIGURE : Font and line styles # --------------------------------- plt.rc('font', family='garamond', size=FONTSIZE) plt.rcParams['axes.linewidth'] = LINEWIDTH
[docs] class SteadyStateChecker: """ Class to check the convergence of steady state simulations. """ def __init__(self, simulation_file: str | Path| Wolfresults_2D): """ Initialize the SteadyStateChecker class. :param simulation_file: Path to Wolf simulation file (tested on CPU simulation), :type simulation_file: str | Path """ if isinstance(simulation_file, str): simulation_file = Path(simulation_file) if isinstance(simulation_file, Path): self.simulation = Wolfresults_2D(simulation_file) elif isinstance(simulation_file, Wolfresults_2D): self.simulation = simulation_file else: raise ValueError("simulation_file must be a string, a Path or a Wolfresults_2D object.")
[docs] def _harmonize_result_indices(self, from_result: int = None, to_result: int= None) -> np.ndarray: """ Return the array of available result indices in an increasing order for the given bounds. Parameters ---------- :param nb_of_results: int The number of results in the simulation. :type nb_of_results: int :param from_result: int, optional The time step (result index) to start from. If None, starts from 0. :type from_result: int, optional :param to_result: int, optional The time step (result index) to end at. If None, ends at the last time step. :type to_result: int, optional :return: np.ndarray The range of time steps to consider. :rtype: np.ndarray """ nb_of_results = self.simulation.get_nbresults() if from_result is None: from_result = 0 if abs(from_result) >= nb_of_results: if from_result == -nb_of_results: pass else: raise ValueError(f"Imposible to check convergence.\ \nfrom_result {from_result} is greater than\ or equal the number of results {nb_of_results}.") if from_result < 0: from_result = nb_of_results + from_result if to_result is None: to_result = nb_of_results if abs(to_result) > nb_of_results: raise ValueError(f"to_result {to_result} is greater than\ the number of results {nb_of_results}.") if to_result < 0: to_result = nb_of_results+1 + to_result assert from_result < to_result, \ "from_result must be less than to_result." return np.arange(from_result, to_result, step=1)
[docs] def compute_convergence(self, variable: views_2D, threshold: float|int, previous_result: WolfArray, timestep: int, sim_times: list, sim_timesteps: list) -> tuple[dict, WolfArray]: """ Compute the convergence of a given variable at a specific time step. Parameters ---------- :param variable: View of the variable to check for convergence (e.g., views_2D.WATERDEPTH). :type variable: views_2D :param threshold: The maximum difference threshold for convergence. :type threshold: float|int :param previous_result: The result of the variable at the previous time step, :type previous_result: WolfArray :param timestep: The time step at which to check convergence. :type timestep: int :param sim_times: A list of simulation times. :type sim_times: list :param sim_timesteps: A list of simulation time steps. :type sim_timesteps: list """ # We initialize the results dictionary to store the convergence results # for the current variable at the current time step. results = {DIFFERENCE_MAX: None, DIFFERENCE_MEAN: None, DIFFERENCE_MEDIAN: None, DIFFERENCE_PERCENTILE_99: None, CONVERGENCE: None, TIME_STEP: timestep, TIMES: sim_times[timestep], SIM_TIME_STEPS: sim_timesteps[timestep]} # We read the simuated result depending on the simulation type (mono or multi block) # and convert it to a WolfArray if needed. if self.simulation.nb_blocks >1: result = self.simulation.get_result(timestep, variable).as_WolfArray() else: result = self.simulation.get_result(timestep, variable) # If there is no previous result, we cannot compute the differences # and check for convergence, so we return the results # with the current result and exit the function. if previous_result is None: return results, result # We compute the absolute difference between the current result and the previous result, diff = np.abs(result.array.data - previous_result.array.data) # We extract the maximum, mean, median and 99th percentile of the differences # and store them in the results dictionary. results[DIFFERENCE_MAX ] = np.nanmax(diff) results[DIFFERENCE_MEAN] = np.nanmean(diff) results[DIFFERENCE_MEDIAN] = np.nanmedian(diff) results[DIFFERENCE_PERCENTILE_99] = np.nanpercentile(diff,99) # We check the convergence status of the current variable # based on the maximum difference if results[DIFFERENCE_MAX] < threshold: results[CONVERGENCE] = True else: results[CONVERGENCE] = False # We return two objects: the dictionary containing the convergence metrics, and # the simulated result as a WolfArray for the next computation. return results,result
[docs] def check_steadiness(self, from_result: int = None, to_result: int= None, threshold_h: float = None, threshold_q: float = None, threshold_u: float = None, threshold_k: float = None, threshold_eps: float = None, figsize: tuple = None, figax: tuple = None, show: bool = True, turbulence: bool = True, plot_max: bool = True, plot_median: bool = False, plot_percentile_99: bool = True, ) -> dict[views_2D, list[dict]]: """ Return a dictionary containing the steadiness results of each variable (4 + 2). - if plot is True, plot the convergence curves of existing variables. - if turbulence is True, PLot the two turbulence variables as well (TKE and Epsilon). Check if a simulation has converged for a given variable at a specific time step. Prameters ----------------- :param from_result: Index of the result to start from. If None, starts from the first result. :type from_result: int, optional :param to_result: Index of the result to end at. If None, ends at the last result. :type to_result: int, optional :param threshold_h: Maximum difference threshold for water depth convergence. If None, defaults to 0.05. :type threshold_h: float, optional :param threshold_q: Maximum difference threshold for discharge convergence. If None, defaults to 0.5. :type threshold_q: float, optional :param threshold_u: Maximum difference threshold for velocity convergence. If None, defaults to 0.05. :type threshold_u: float, optional :param threshold_k: Maximum difference threshold for TKE convergence. If None, defaults to 0.001. :type threshold_k: float, optional :param threshold_eps: Maximum difference threshold for Epsilon convergence. If None, defaults to 0.0005. :type threshold_eps: float, optional :param figsize: Size of the figure to plot. If None, defaults to (24, 12). :type figsize: tuple, optional :param plot: Whether to plot the convergence curves. Defaults to True. :type plot: bool, optional :param turbulence: Whether to check the convergence of turbulence variables (TKE and Epsilon). Defaults to True. :type turbulence: bool, optional :param plot_max: Whether to plot the maximum difference curve. Defaults to True. if not Only the 99th percentile and other metrics are plotted. :type plot_max: bool, optional :param plot_median: Whether to plot the median difference curve. Defaults to False. :type plot_median: bool, optional :param plot_percentile_99: Whether to plot the 99th percentile difference curve. Defaults to True. :type plot_percentile_99: bool, optional return: A dictionary containing the steadiness results for each variable. The keys are the variable names (views_2D) and the values are lists of dictionaries containing the convergence results for each time step. :rtype: dict[views_2D, list[dict]] """ # Initialize the views to check based on the turbulence parameter if turbulence: views = [views_2D.WATERDEPTH, views_2D.QX, views_2D.QY, views_2D.UNORM, views_2D.KINETIC_ENERGY, views_2D.EPSILON] else: views = [views_2D.WATERDEPTH, views_2D.QX, views_2D.QY, views_2D.UNORM] # Set default thresholds if not provided if threshold_h is None: threshold_h = 5e-2 if threshold_q is None: threshold_q = 5e-1 if threshold_u is None: threshold_u = 5e-2 if threshold_k is None: threshold_k = 1e-3 if threshold_eps is None: threshold_eps = 5e-4 thresholds = {views_2D.WATERDEPTH: threshold_h, views_2D.QX: threshold_q, views_2D.QY: threshold_q, views_2D.UNORM: threshold_u, views_2D.KINETIC_ENERGY: threshold_k, views_2D.EPSILON: threshold_eps} # We initialize the results dictionary to store the convergence results for each variable all_results = {views_2D.WATERDEPTH: [], views_2D.QX: [], views_2D.QY: [], views_2D.UNORM: [], views_2D.KINETIC_ENERGY: [], views_2D.EPSILON: []} # We also initialize a dictionary to store the previous results for each variable, # which will be used to compute the differences and check for convergence at each time step previous_results = {views_2D.WATERDEPTH: None, views_2D.QX: None, views_2D.QY: None, views_2D.UNORM: None, views_2D.KINETIC_ENERGY: None, views_2D.EPSILON: None} # We get the number of results (time steps). nb_of_results = self.simulation.get_nbresults() # We get the timesteps sim_times, sim_timesteps = self.simulation.get_times_steps() assert nb_of_results == len(sim_times) == len(sim_timesteps), \ "Inconsistent number of results, times and time steps in the simulation." range_of_timesteps = self._harmonize_result_indices(from_result,to_result) # We read the first result chosen. self.simulation.read_oneresult(range_of_timesteps[0]) # We loop over the time steps and variables to compute the convergence results for each variable # at each time step, and store them in the all_results dictionary. for timestep in tqdm(range_of_timesteps, desc=f"Checking convergence"): for variable in tqdm(views, desc=f"Variable:", disable= True): res, previous_array = self.compute_convergence(variable=variable, threshold = thresholds[variable], previous_result=previous_results[variable], timestep=timestep, sim_times=sim_times, sim_timesteps=sim_timesteps) all_results[variable].append(res) previous_results[variable] = previous_array # self.simulation.next_result() # We remove the first result because it should be none. for variable in views: all_results[variable].pop(0) # We plot the convergence results for each variable if plot is True. self.plot_steadiness_simulation(steadiness_results=all_results, threshold=thresholds, figsize=figsize, figax=figax, turbulence=turbulence, plot_max=plot_max, plot_median=plot_median, plot_percentile_99=plot_percentile_99, show=show ) # We return the convergence results for each variable as a dictionary. return all_results
[docs] def check_convergence_under_vector(self, vect: vector, variable: views_2D = None, threshold: float = None, from_result: int = None, to_result: int= None, ylim: tuple = None, figax: tuple = None, show: bool = True) -> list: """ Check if a simulation has converged for a given variable at a specific time step. Parameters ---------- :param variable: views_2D The variable to check for convergence. :type variable: views_2D :param threshold: float The threshold for convergence ensure the variable does no longer vary beyond this value. :type threshold: float :return: list of dict containing the congergence report (results at each time step) the variable in each dictionnary are: - Values: np.ndarray The values of the variable at the current time step. - Coordinates: np.ndarray The coordinates of the points under the vector at the current time step. - Differences: np.ndarray The difference (variable evolution) from the previous time step. - Maximum difference: float The maximum difference from the previous time step. - Convergence: bool True if the simulation has converged at the current time step, False otherwise. - Time step: int The current time step. True if the simulation has converged, False otherwise. :rtype: list of dict """ # We make sure that the vector is an instance of the PyVertexvectors.vector class, assert isinstance(vect, vector), \ "vect must be an instance of PyVertexvectors.vector." # We set default values for the variable and threshold if they are not provided. if variable is None: variable = views_2D.WATERDEPTH if threshold is None: threshold = 1e-3 # We get the number of results (time steps) # and the corresponding times and time steps from the simulation. nb_of_results = self.simulation.get_nbresults() sim_times, sim_timesteps = self.simulation.get_times_steps() assert nb_of_results == len(sim_times) == len(sim_timesteps), \ "Inconsistent number of results, times and time steps in the simulation." # We get the range of time steps to consider based on the from_result and to_result parameters. range_of_timesteps = self._harmonize_result_indices(from_result,to_result) all_results = [] # We loop over the time steps to compute the convergence results # for the given variable under the vector at each time step, # and store them in the all_results list. for timestep in tqdm(range_of_timesteps, desc=f"Checking convergence of {variable.name}"): # We initialize the results dictionary to store the convergence results results = {VALUES: None, COORDS: None, DIFFERENCES: None, DIFFERENCE_MAX: None, CONVERGENCE: None, TIME_STEP: timestep, TIMES: sim_times[timestep], SIM_TIME_STEPS: sim_timesteps[timestep]} # We read the simulated result for the given variable at the current time step, if self.simulation.nb_blocks >1: result = self.simulation.get_result(timestep, variable).as_WolfArray() else: result = self.simulation.get_result(timestep, variable) # We extract the values of the variable under the vector. vect_values = result.get_values_underpoly(myvect=vect, usemask= False, getxy= True) # We store the values and coordinates of the points under the vector in the results dictionary. results[VALUES] = vect_values[0] results[COORDS] = vect_values[1] length_all_results = len(all_results) # If there are previous results, we compute the differences and check for convergence. if length_all_results > 0: assert len(results[VALUES]) == len(all_results[length_all_results-1][VALUES]), \ "Number of points under vector changed between time steps." assert results[COORDS] == all_results[length_all_results-1][COORDS], \ "Coordinates of points under vector changed between time steps." results[DIFFERENCES] = results[VALUES] - all_results[length_all_results-1][VALUES] results[DIFFERENCE_MAX] = max(abs(results[DIFFERENCES])) if results[DIFFERENCE_MAX] < threshold: results[CONVERGENCE] = True else: results[CONVERGENCE] = False all_results.append(results) # We remove the first result because it should be none. all_results.pop(0) self.plot_convergence_results(convergence_results=all_results, variable_name=variable, threshold=threshold, ylim=ylim, figax=figax, show=show) return all_results
[docs] def plot_convergence_results(self, convergence_results: list, variable_name: str = None, figax: tuple = None, threshold: float = None, ylim: tuple = None, show = True ) -> None: """Plot the convergence results for a given variable. Parameters ---------- :param convergence_results: The convergence results to plot. :type convergence_results: list :param variable_name: The name of the variable to plot (view2D). :type variable_name: str, :param figax: A tuple containing the figure and axis to plot on. If None, a new figure and axis will be created. :type figax: tuple, optional :param threshold: The convergence threshold to plot as a horizontal line. If None, no threshold line will be plotted at 0.001. :type threshold: float, optional :param ylim: The y-axis limits for the plot. If None, the limits will be automatically determined. :type ylim: tuple, optional """ # We set default values for the variable name and threshold if they are not provided. if variable_name is None: variable_name = 'Variable' if threshold is None: threshold = 1e-3 # We create a figure and axis for plotting if not provided. if figax is None: fig, ax = plt.subplots(figsize=(10, 6)) else: fig, ax = figax # We extract the maximum differences and time steps from the convergence results to plot the convergence curve. max_difference, timestep = [], [] for result in convergence_results: max_difference.append(result[DIFFERENCE_MAX]) timestep.append(result[TIMES]) # We plot the maximum difference curve and the convergence threshold if provided. ax.plot(timestep, max_difference, color=DARKBLUE, label = f'{variable_name}', marker='o', linewidth=LINEWIDTH, ) if threshold is not None: ax.axhline(y=threshold, color=DARKRED, linestyle='--', linewidth = LINEWIDTH, label='Convergence threshold') # We deal with axes parameters and labels, and show the plot. if ylim is None: ax.set_ylim(bottom=0, top= max(max(max_difference)*1.1, threshold*1.1)) else: ax.set_ylim(bottom=ylim[0], top= ylim[1]) ax.set_xlim(left = min(timestep), right= max(timestep)) ax.set_xlabel(TIMES) # ax.set_xticklabels(ax.get_xticks(), rotation=45) ax.set_ylabel(DIFFERENCE_MAX) ax.set_title(f'Convergence check : {variable_name}') legend = ax.legend(fontsize=FONTSIZELEGEND, loc = 'best', fancybox=False, edgecolor = 'k') legend.get_frame().set_linewidth(LINEWIDTH) ax.tick_params(which = 'both', direction = 'in', length = 6, width = LINEWIDTH, gridOn = False) ax.yaxis.set_ticks_position('both') ax.xaxis.set_ticks_position('both') ax.tick_params(pad=TICKPAD) fig.tight_layout() if show: fig.show()
[docs] def plot_steadiness_simulation(self, steadiness_results: dict, threshold: dict = None, figax: tuple = None, figsize: tuple = None, turbulence: bool = True, plot_max: bool = True, plot_median: bool = True, plot_percentile_99: bool = True, show: bool = True ): """Plot the convergence results for a given variable. Parameters ---------- :param convergence_results: list The convergence results to plot. :type convergence_results: list :param variable_name: str The name of the variable to plot. :type variable_name: str """ # We set default values for the figure size if not provided, and create a figure and axes for plotting. if figax is None: if figsize is None: figsize = (24, 12) if turbulence: fig, axs = plt.subplots(2,3,figsize=figsize) else: fig, axs = plt.subplots(2,2,figsize=figsize) else: fig, axs = figax axs = axs.flatten() i = 0 # We loop over the variables in the steadiness results and plot the convergence curves for each variable on a separate axis. for variable in steadiness_results.keys(): # We check if there are convergence results for the current variable. If not, we skip plotting for this variable. if steadiness_results[variable] is None: pass else: # We extract the convergence results for the current variable and the corresponding threshold if provided. convergence_results = steadiness_results[variable] threshold_var = threshold[variable] if threshold is not None else None max_difference, timestep = [], [] mean_difference, median_difference = [], [] percentile_99_difference = [] # We extract the maximum, mean, median and 99th percentile differences and time steps from the convergence results to plot the convergence curves. for result in convergence_results: max_difference.append(result[DIFFERENCE_MAX]) mean_difference.append(result[DIFFERENCE_MEAN]) median_difference.append(result[DIFFERENCE_MEDIAN]) percentile_99_difference.append(result[DIFFERENCE_PERCENTILE_99]) timestep.append(result[TIMES]) ax: plt.Axes = axs[i] # We plot the convergence curves for the current variable, including the maximum difference, mean difference, median difference and 99th percentile difference if specified, as well as the convergence threshold if provided. if plot_percentile_99: ax.plot(timestep, percentile_99_difference, color=DARKBLUE, label = f'99th percentile difference', marker='o', linewidth=LINEWIDTH, # linestyle='dashed', ) if plot_max: ax.plot(timestep, max_difference, color=DARKBLUE, label = f'max difference', marker='s', linewidth=LINEWIDTH, linestyle='dotted', ) ax.set_ylim(bottom=0, top= max(max(max_difference)*1.1, threshold_var*1.1 if threshold_var is not None else 1)) else: ax.set_ylim(bottom=0, top= max(max(percentile_99_difference)*1.1, threshold_var*1.1 if threshold_var is not None else 1)) ax.plot(timestep, mean_difference, color=GREY, label = f'mean difference', marker='x', linewidth=LINEWIDTH, linestyle='dashed', ) if plot_median: ax.plot(timestep, median_difference, color=GREY, label = f'median difference', marker='^', linewidth=LINEWIDTH, linestyle='dotted', ) if not plot_max: ax.set_ylim(bottom=0, top= max(max(percentile_99_difference)*1.1, threshold_var*1.1 if threshold_var is not None else 1)) if threshold_var is not None: ax.axhline(y=threshold_var, color=DARKRED, linestyle='--', linewidth = LINEWIDTH, label='Convergence threshold') ax.set_xlim(left = min(timestep), right= max(timestep)) ax.set_title(f'{variable}') ax.set_xlabel(TIMES) ax.set_ylabel("$\\Delta_{step [n-1]}$") legend = ax.legend(fontsize=FONTSIZELEGEND, loc = 'best', fancybox=False, edgecolor = 'k', facecolor = 'none') legend.get_frame().set_linewidth(LINEWIDTH) ax.tick_params(which = 'both', direction = 'in', length = 6, width = LINEWIDTH, gridOn = False, pad=TICKPAD) ax.yaxis.set_ticks_position('both') ax.xaxis.set_ticks_position('both') i += 1 if turbulence is False and i == 4: break fig.suptitle('Steadiness', fontweight='bold', fontsize="x-large", horizontalalignment='center') fig.tight_layout() if show: fig.show()