Source code for wolfhece.irm_qdf

"""
Author: HECE - University of Liege, Pierre Archambeau
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

#Code INS des communes belges
import re
from os import path, mkdir
from pathlib import Path
from time import sleep
from typing import Literal, Union
import logging
import warnings

import matplotlib as mpl
from tqdm import tqdm
import pandas as pd
import geopandas as gpd
import matplotlib.pyplot as plt
from scipy.optimize import minimize,curve_fit
from scipy.stats import gumbel_r,genextreme
import numpy as np

# We have tried pymupdf but its license is AGPL so it's more or less a no-go.
import pdfplumber

from .ins import Localities
from .PyTranslate import _
from .pydownloader import toys_dataset, DATADIR
from .PyVertexvectors import Zones, vector, Point, Polygon, wolfvertex as wv, getIfromRGB
from .drawing_obj import Element_To_Draw

[docs] Montana_a1 = 'a1'
[docs] Montana_a2 = 'a2'
[docs] Montana_a3 = 'a3'
[docs] Montana_b1 = 'b1'
[docs] Montana_b2 = 'b2'
[docs] Montana_b3 = 'b3'
[docs] RT2 = '2'
[docs] RT5 = '5'
[docs] RT10 = '10'
[docs] RT15 = '15'
[docs] RT20 ='20'
[docs] RT25 ='25'
[docs] RT30 = '30'
[docs] RT40 ='40'
[docs] RT50 ='50'
[docs] RT75 = '75'
[docs] RT100 ='100'
[docs] RT200 = '200'
[docs] RT = [RT2,RT5,RT10,RT15,RT20,RT25,RT30,RT40,RT50,RT75,RT100,RT200]
[docs] freqdep=np.array([1./float(x) for x in RT])
[docs] freqndep=1.-freqdep
[docs] dur10min = '10 min'
[docs] dur20min = '20 min'
[docs] dur30min = '30 min'
[docs] dur1h = '1 h'
[docs] dur2h = '2 h'
[docs] dur3h = '3 h'
[docs] dur6h = '6 h'
[docs] dur12h = '12 h'
[docs] dur1d = '1 d'
[docs] dur2d = '2 d'
[docs] dur3d = '3 d'
[docs] dur4d = '4 d'
[docs] dur5d = '5 d'
[docs] dur7d = '7 d'
[docs] dur10d = '10 d'
[docs] dur15d = '15 d'
[docs] dur20d = '20 d'
[docs] dur25d = '25 d'
[docs] dur30d = '30 d'
[docs] durationstext=[dur10min,dur20min,dur30min,dur1h,dur2h,dur3h,dur6h,dur12h,dur1d, dur2d,dur3d,dur4d,dur5d,dur7d,dur10d,dur15d,dur20d,dur25d,dur30d]
durations = np.array([10,20,30,60,120,180,360,720],np.float64)
[docs] durationsd = np.array([1,2,3,4,5,7,10,15,20,25,30],np.float64)*24.*60.
[docs] durations = np.concatenate([durations,durationsd])
[docs] durations_seconds = durations * 60. # Convert durations to seconds
[docs] class MontanaIRM(): """ Classe pour la gestion des relations de Montana pour les précipitations """ def __init__(self, coeff:pd.DataFrame, time_bounds=None) -> None: if time_bounds is None: self.time_bounds = [25,6000] else: self.time_bounds = time_bounds
[docs] self.coeff=coeff
[docs] def get_ab(self, dur, T): """ Get the Montana coefficients for a given duration and return period :param dur: the duration, in minutes :param T: the return period, in years """ curcoeff = self.coeff.loc[float(T)] if dur<self.time_bounds[0]: a=curcoeff[Montana_a1] b=curcoeff[Montana_b1] elif dur<=self.time_bounds[1]: a=curcoeff[Montana_a2] b=curcoeff[Montana_b2] else: a=curcoeff[Montana_a3] b=curcoeff[Montana_b3] return a,b
[docs] def get_meanrain(self, dur, T, ab= None): """ Get the mean rain for a given duration and return period Montana formula is i = a * dur^(-b) where a and b are the Montana coefficients for the given duration and return period Unity of i is [mm/h] and dur is in [minutes] :param dur: the duration, in minutes :param T: the return period, in years :param ab: the Montana coefficients :return: the mean rain in [mm/h] """ if ab is None: ab = self.get_ab(dur,T) return ab[0]*dur**(-ab[1])
[docs] def get_instantrain(self, dur, T, ab= None): """ Get the instantaneous rain for a given duration and return period :param dur: the duration, in minutes :param T: the return period, in years :param ab: the Montana coefficients :return: the instantaneous rain in [mm/h] """ if ab is None: ab = self.get_ab(dur,T) meani=self.get_meanrain(dur,T,ab) return (1.-ab[1])*meani
[docs] def get_quantityrain(self, dur, T): """ Get the quantity of rain for a given duration and return period. The quantity of rain is the mean rain multiplied by the duration expressed in hours (dur/60) to convert it from [mm/h] to [mm] :param dur: the duration, in minutes :param T: the return period, in years :return: the quantity of rain in [mm] """ rain = self.get_meanrain(dur,T) return rain * (dur/60)
[docs] def get_delta_quantityrain(self, dur_start, dur_end, T): """ Get the difference in quantity of rain between two durations for a given return period. :param dur_start: the starting duration, in minutes :param dur_end: the ending duration, in minutes :param T: the return period, in years :return: the difference in quantity of rain in [mm] """ return self.get_quantityrain(dur_end, T) - self.get_quantityrain(dur_start, T)
[docs] def get_delta_meanrain(self, dur_start, dur_end, T): """ Get the difference in mean rain between two durations for a given return period. :param dur_start: the starting duration, in minutes :param dur_end: the ending duration, in minutes :param T: the return period, in years :return: the difference in mean rain in [mm/h] """ return self.get_delta_quantityrain(dur_start, dur_end, T) / ((dur_end - dur_start)/60)
[docs] def get_hyetogram_Chicago(self, total_duration, T, r:Literal['centered', 'left', 'right'] = 'centered'): """ Get the Chicoago hyetogram for a given return period, a total duration and a decentration coefficient. The return values are x and y, where x is the starting time of each step of the hyetogram in minutes and y is the intensity of rain for each step of the hyetogram in [mm/h]. x must be interpreted as [x_i, x_{i+1}[ for each step i of the hyetogram, and the intensity y_i is the intensity of rain during this step. The hyetogram is built as follows: - In centered mode, the peak is located at the middle of the hyetogram and has a duration of 10 minutes - In left mode, the peak is located at the beginning of the hyetogram and has a duration of 10 minutes - In right mode, the peak is located at the end of the hyetogram and has a duration of 10 minutes - The intensity is calculated using the Montana coefficients and the mean rain, to ensure the total quantity of rain is correct - The time step is 5 minutes :param total_duration: the total duration of the hyetogram, in minutes (must be greater than 10 minutes and a multiple of 5 minutes) :param T: the return period in years :param r: Decentration mode, either 'centered', 'left' or 'right' :return: x: the starting time of each step of the hyetogram in minutes, y: the intensity of rain for each step of the hyetogram in [mm/h] """ assert r in ['centered', 'left', 'right'], "Decentration mode r must be either 'centered', 'left' or 'right'" assert total_duration > 10, "Total duration must be greater than 10 minutes" assert total_duration % 5 == 0, "Total duration must be a multiple of 5 minutes" assert str(T) in RT, "Return period T must be one of the following: " + ", ".join(RT) x = np.arange(0, total_duration, 5, dtype=np.float64) y = np.zeros(len(x)) # Constructing the rainfall left, it is more easy # We will then distribute the rain to the right of the peak in order to have # the correct total quantity of rain, while respecting the shape of # the hyetogram given by the Montana coefficients if r in ['left', 'right']: y[:2] = self.get_meanrain(10, T) # Peak intensity for the first 10 minutes for k in range(2,len(x)): y[k] = self.get_delta_meanrain(k*5, (k+1)*5, T) if r == 'left': return x, y else: # r == 'right' return x, y[::-1] # Reverse the order of the intensities for the right side elif r == 'centered': start_peak_idx = len(x) // 2 - 1 # Start of the peak (10 minutes) end_peak_idx = len(x) // 2 +1 # End of the peak (10 minutes) y[start_peak_idx:end_peak_idx] = self.get_meanrain(10, T) # Peak intensity for the 10 minutes around the center for k in range(end_peak_idx, len(x)): d_loc = (k - end_peak_idx + 1) * 10 # Duration from the end of the peak # As we are in centered mode, the distribution of rain is symmetric around the center, # so we can calculate the intensity on the right side and then apply it to the left side. # As the timestep is fixed to 5 minutes (for each side), we can calculate the intensity at the center # of each step of the hyetogram every 10 minutes (5 min for the left-part, 5 min for the right-part) . y[k] = self.get_delta_meanrain(d_loc, d_loc + 10, T) y[start_peak_idx - (k - end_peak_idx + 1)] = y[k] # Symmetric distribution on the left side return x, y
[docs] def get_hyetogram_Chicago_rvar(self, total_duration, T, r:float = 0.5, timestep:float = 5.0, peak_duration:float = 10.0): """ Get the Chicoago hyetogram for a given return period, a total duration and a decentration coefficient. The return values are x and y, where x is the starting time of each step of the hyetogram in minutes and y is the intensity of rain for each step of the hyetogram in [mm/h]. x must be interpreted as [x_i, x_{i+1}[ for each step i of the hyetogram, and the intensity y_i is the intensity of rain during this step. The hyetogram is built as follows: - The peak is located at (total_duration * r) and has a duration of peak_duration minutes - The intensity is calculated using the Montana coefficients and the mean rain, to ensure the total quantity of rain is correct - The time step is parameterizable but cosntant for the whole hyetogram, so for example if the time step is 5 minutes, the intensity is calculated every 5 minutes and is constant during these 5 minutes. - if r is not in [0,1,0.5], the hyetogram is computed step-by-step, by alternating steps on the right and left of the peak, to ensure a good resolution around the peak, as the shape of the hyetogram is more important around the peak. :param total_duration: the total duration of the hyetogram, in minutes (must be greater than 10 minutes and a multiple of 5 minutes) :param T: the return period in years :param r: Decentration coefficient, a value in [0, 1] indicating the position of the peak (0 means the peak is at the end of the hyetogram, 1 means the peak is at the beginning of the hyetogram, 0.5 means the peak is in the middle of the hyetogram) :param timestep: the time step for the hyetogram in minutes :param peak_duration: the duration of the peak in minutes :return: x: the starting time of each step of the hyetogram in minutes, y: the intensity of rain for each step of the hyetogram in [mm/h] """ assert 0 <= r <= 1, "Decentration coefficient r must be a value in [0, 1]" assert total_duration > peak_duration, "Total duration must be greater than the peak duration" assert total_duration % timestep == 0, "Total duration must be a multiple of the timestep" assert peak_duration % timestep == 0, "Peak duration must be a multiple of the timestep" assert str(T) in RT, "Return period T must be one of the following: " + ", ".join(RT) x = np.arange(0, total_duration, timestep, dtype=np.float64) y = np.zeros(len(x)) size_peak = int(peak_duration / timestep) if r == 0.5: assert size_peak % 2 == 0, "For a centered peak, the peak duration must be an even multiple of the timestep to ensure symmetry" # Constructing the rainfall left, it is more easy # We will then distribute the rain to the right of the peak in order to have # the correct total quantity of rain, while respecting the shape of # the hyetogram given by the Montana coefficients if r in [0, 1]: y[:size_peak] = self.get_meanrain(peak_duration, T) # Peak intensity for the first peak_duration minutes for k in range(size_peak,len(x)): y[k] = self.get_delta_meanrain(k*timestep, (k+1)*timestep, T) if r == 0: return x, y else: # r == 1 return x, y[::-1] # Reverse the order of the intensities for the right side elif r == 0.5: start_peak_idx = int((len(x) - size_peak) // 2) # Start of the peak end_peak_idx = start_peak_idx + size_peak # End of the peak y[start_peak_idx:end_peak_idx] = self.get_meanrain(peak_duration, T) # Peak intensity for the peak_duration minutes around the center for k in range(end_peak_idx, len(x)): d_loc = (k - end_peak_idx + 1) * timestep * 2 # Duration from the end of the peak y[k] = self.get_delta_meanrain(d_loc, d_loc + timestep * 2, T) y[start_peak_idx - (k - end_peak_idx + 1)] = y[k] # Symmetric distribution on the left side else: # other ponderation coefficients start_peak_idx = int(len(x) * (1-r) - size_peak // 2) # Start of the peak end_peak_idx = start_peak_idx + size_peak # End of the peak y[start_peak_idx:end_peak_idx] = self.get_meanrain(peak_duration, T) # Peak intensity for the peak_duration minutes around the center # We will progress alternaltively on the right and left side of the peak, to ensure a good resolution around the peak, # as the shape of the hyetogram is more important around the peak. if r>0.5: nb_right = int(1/(1-r)) - 1 # Number of steps to do on the right side before doing a step on the left side d_loc = peak_duration i_right = end_peak_idx i_left = start_peak_idx - 1 nb = 0 while nb < len(x) - size_peak: # right-part for __ in range(nb_right): if i_right >= len(x): break y[i_right] = self.get_delta_meanrain(d_loc, d_loc + timestep, T) d_loc += timestep # Duration for the right part, starting from the end of the peak i_right += 1 nb += 1 # left-part if i_left >= 0: y[i_left] = self.get_delta_meanrain(d_loc, d_loc + timestep, T) d_loc += timestep # Duration for the left part, starting from the start of the peak i_left -= 1 nb += 1 else: # r<0.5 nb_left = int(1/r) - 1 # Number of steps to do on the left side before doing a step on the right side d_loc = peak_duration i_right = end_peak_idx i_left = start_peak_idx - 1 nb = 0 while nb < len(x) - size_peak: # left-part for __ in range(nb_left): if i_left < 0: break y[i_left] = self.get_delta_meanrain(d_loc, d_loc + timestep, T) d_loc += timestep # Duration for the left part, starting from the start of the peak i_left -= 1 nb += 1 # right-part if i_right < len(x): y[i_right] = self.get_delta_meanrain(d_loc, d_loc + timestep, T) d_loc += timestep # Duration for the right part, starting from the end of the peak i_right += 1 nb += 1 return x, y
[docs] def get_hyetogram_instant(self, durmax, T, r = 0.5, peak_duration = 10): """ Get the hyetogram for a given return period, a total duration and a decentration coefficient. The decentration coefficient r is a value in [0, 1] that indicates the position of the peak of the hyetogram. If r is 0, the peak is at the end of the hyetogram. If r is 1, the peak is at the beginning of the hyetogram. If r is 0.5, the peak is in the middle of the hyetogram. The time step of the hyetogram is variable and is equal to (1-r) minute before the peak and r minute after the peak, to ensure a good resolution around the peak. The intensity of rain is calculated as the instantaneous rain given by the Montana coefficients, to ensure the correct shape of the hyetogram. The value is picked at the center of each step of the hyetogram, so for example for the step [x_i, x_{i+1}[, the intensity is calculated at (x_i + x_{i+1})/2. The return values are x and y, where x is the starting time of each step of the hyetogram in minutes and y is the intensity of rain for each step of the hyetogram in [mm/h]. x must be interpreted as [x_i, x_{i+1}[ for each step i of the hyetogram, and the intensity y_i is the intensity of rain during this step. The hyetogram is built as follows: - The peak is located at (durmax * r) and has a duration of peak_duration minutes - The peak intensity is the mean rain for a duration of peak_duration minutes and the given return period - The shape is discretized with a step of (1-r) minute before the peak and r minute after the peak - The intensity is calculated using the Montana coefficients and the instantaneous rain for each point of the hyetogram Peak_duration and durmax will be rounded, so the actual peak duration and durmax may be slightly different from the input values. :param durmax: the maximum duration of the hyetogram, in minutes :param T: the return period :param r: Decentration coefficient :param peak_duration: the duration of the peak, in minutes """ assert 0 <= r <= 1, "Decentration coefficient r must be in [0, 1]" assert peak_duration > 0, "Peak duration must be positive" assert durmax > 0, "Maximum duration must be positive" assert durmax > peak_duration, "Maximum duration must be greater than peak duration" # assert durmax * r > peak_duration / 2, "Peak must be fully contained in the hyetogram (check durmax, r and peak_duration)" peak_duration = float(int(peak_duration)) durmax = float(int(durmax)) x = np.arange(peak_duration, durmax, 1, dtype=np.float64) if r==1.: # If r is 1, it means the peak is at the beginning of the hyetogram, so there is no point before the peak startpeak = 0. endpeak = peak_duration xafterpeak = np.arange(endpeak, durmax, 1.) x_hyeto = np.concatenate([[startpeak], xafterpeak]) elif r==0.: # If r is 0, it means the peak is at the end of the hyetogram, so there is no point after the peak startpeak = durmax - peak_duration endpeak = durmax xbeforepeak = np.arange(0., startpeak, 1.) x_hyeto = np.concatenate([xbeforepeak, [startpeak]]) else: startpeak = durmax * r - peak_duration / 2 endpeak = startpeak + peak_duration xbeforepeak = np.arange(0., startpeak, 1-r) xafterpeak = np.arange(endpeak, durmax, r) x_hyeto = np.concatenate([xbeforepeak, [startpeak], xafterpeak]) y_hyeto = np.zeros(len(x_hyeto)) for k in range(len(x_hyeto)): if x_hyeto[k] < startpeak: delta_peak = (startpeak - x_hyeto[k]) / (1-r) + peak_duration - (1-r) y_hyeto[k] = self.get_instantrain(delta_peak, T) elif x_hyeto[k] == startpeak: y_hyeto[k] = self.get_meanrain(dur = peak_duration, T = T) else: delta_peak = (x_hyeto[k] - endpeak) / r + peak_duration + r y_hyeto[k] = self.get_instantrain(delta_peak, T) return x_hyeto, y_hyeto
[docs] def plot_hyetogram_instant(self, durmax, T, r= 0.5, figax = None): """ Plot the hyetogram for a given return period :param durmax: the maximum duration of the hyetogram in minutes :param T: the return period :param r: Decentration coefficient """ x,y = self.get_hyetogram_instant(durmax,T,r) if figax is None: fig,ax = plt.subplots(1,1,figsize=[15,10], tight_layout=True) else: fig,ax = figax edges = np.append(x, durmax) ax.stairs(y, edges, fill=True, alpha=0.7, label=_("Hyetogram (instantaneous) for return period T=")+str(T)+" "+_("years")) ax.set_xlabel(_('Time [min]')) ax.set_ylabel(_('Intensity [mm/h]')) ax.legend().set_draggable(True) return fig,ax
[docs] def plot_hyetogram_Chicago(self, total_duration, T, r = 'centered', figax = None): """ Plot the Chicago hyetogram for a given return period, a total duration and a decentration coefficient. :param total_duration: the total duration of the hyetogram in minutes :param T: the return period :param r: Decentration mode, either 'centered', 'left' or 'right' """ x,y = self.get_hyetogram_Chicago(total_duration,T,r) if figax is None: fig,ax = plt.subplots(1,1,figsize=[15,10], tight_layout=True) else: fig,ax = figax edges = np.append(x, total_duration) ax.stairs(y, edges, fill=True, alpha=0.7, label=_("Chicago hyetogram for return period T=")+str(T)+" "+_("years")) ax.set_xlabel(_('Time [min]')) ax.set_ylabel(_('Intensity [mm/h]')) ax.legend().set_draggable(True) return fig,ax
[docs] def plot_hyetogram_Chicago_rvar(self, total_duration, T, r = 0.5, peak_duration = 10, timestep = 5, figax = None): """ Plot the Chicago hyetogram for a given return period, a total duration and a decentration coefficient. :param total_duration: the total duration of the hyetogram in minutes :param T: the return period :param r: Decentration mode, either 'centered', 'left' or 'right' :param peak_duration: the duration of the peak rainfall :param timestep: the time step for the hyetogram """ x,y = self.get_hyetogram_Chicago_rvar(total_duration,T,r,timestep, peak_duration) if figax is None: fig,ax = plt.subplots(1,1,figsize=[15,10], tight_layout=True) else: fig,ax = figax edges = np.append(x, total_duration) ax.stairs(y, edges, fill=True, alpha=0.7, label=_("Chicago hyetogram for return period T=")+str(T)+" "+_("years")) ax.set_xlabel(_('Time [min]')) ax.set_ylabel(_('Intensity [mm/h]')) ax.legend().set_draggable(True) return fig,ax
[docs] def plot_hyetograms_instant(self, durmax, r= 0.5, figax = None): """ Plot the hyetograms for all return periods :param durmax: the maximum duration of the hyetograms in minutes :param r: Decentration coefficient """ if figax is None: fig,ax = plt.subplots(1,1,figsize=[15,10], tight_layout=True) else: fig,ax = figax inverse_rt = RT.copy() inverse_rt.reverse() for curT in inverse_rt: x,y = self.get_hyetogram_instant(durmax,curT,r) edges = np.append(x, durmax) ax.stairs(y, edges, fill=True, alpha=0.7, label=_("Hyetogram (instantaneous) for return period T=")+str(curT)+" "+_("years")) ax.set_xlabel(_('Time [min]')) ax.set_ylabel(_('Intensity [mm/h]')) ax.legend().set_draggable(True) return fig,ax
[docs] def animate_hyetogram_Chicago(self, total_duration, T, r:float = 0.5, timestep:float = 5.0, peak_duration:float = 10.0, interval:int = 400, figsize:tuple = (18, 5)): """ Create a matplotlib animation showing the progressive construction of a Chicago hyetogram. The animation proceeds as follows: 1. Frame 0: empty axes with labels and IDF envelope 2. Frame 1: the peak bar(s) appear 3. Subsequent frames: bars are added alternating right/left of the peak, exactly mirroring the construction logic of ``get_hyetogram_Chicago_rvar``. 4. Final frame: the complete hyetogram is shown. Three panels are displayed: - Left: IDF curve (Montana) with current duration highlighted - Center: hyetogram under construction - Right: QDF curve (Montana) with cumulative volume of placed bars, showing that the construction respects the total rainfall volume. :param total_duration: the total duration of the hyetogram in minutes :param T: the return period in years :param r: Decentration coefficient in [0, 1] :param timestep: the time step in minutes :param peak_duration: the duration of the peak in minutes :param interval: delay between frames in ms :param figsize: figure size :return: matplotlib.animation.FuncAnimation """ from matplotlib.animation import FuncAnimation # Pre-compute the full hyetogram x, y_full = self.get_hyetogram_Chicago_rvar(total_duration, T, r, timestep, peak_duration) n = len(x) size_peak = int(peak_duration / timestep) edges = np.append(x, total_duration) # Build the ordered list of index-groups that mirrors the construction order # Each element is a list of indices to reveal in that frame. reveal_order: list[list[int]] = [] if r in (0, 1): peak_indices = list(range(size_peak)) rest = list(range(size_peak, n)) if r == 1: peak_indices = list(range(n - size_peak, n)) rest = list(range(n - size_peak - 1, -1, -1)) reveal_order.append(peak_indices) for idx in rest: reveal_order.append([idx]) elif r == 0.5: start_peak_idx = int((n - size_peak) // 2) end_peak_idx = start_peak_idx + size_peak reveal_order.append(list(range(start_peak_idx, end_peak_idx))) for k in range(end_peak_idx, n): mirror = start_peak_idx - (k - end_peak_idx + 1) group = [k] if 0 <= mirror < n: group.append(mirror) reveal_order.append(group) else: start_peak_idx = int(n * (1 - r) - size_peak // 2) end_peak_idx = start_peak_idx + size_peak reveal_order.append(list(range(start_peak_idx, end_peak_idx))) if r > 0.5: nb_right = int(1 / (1 - r)) - 1 i_right = end_peak_idx i_left = start_peak_idx - 1 remaining = n - size_peak while remaining > 0: for __ in range(nb_right): if i_right >= n: break reveal_order.append([i_right]) i_right += 1 remaining -= 1 if i_left >= 0 and remaining > 0: reveal_order.append([i_left]) i_left -= 1 remaining -= 1 else: nb_left = int(1 / r) - 1 i_right = end_peak_idx i_left = start_peak_idx - 1 remaining = n - size_peak while remaining > 0: for __ in range(nb_left): if i_left < 0: break reveal_order.append([i_left]) i_left -= 1 remaining -= 1 if i_right < n and remaining > 0: reveal_order.append([i_right]) i_right += 1 remaining -= 1 # --- Set up figure with 3 panels --- fig, (ax_idf, ax_hyeto, ax_qdf) = plt.subplots(1, 3, figsize=figsize, tight_layout=True) # Left panel: IDF envelope d_range = np.linspace(peak_duration, total_duration, 200) i_range = np.array([self.get_meanrain(d, T) for d in d_range]) ax_idf.plot(d_range, i_range, 'k-', lw=2, label=_('IDF curve (Montana)')) idf_dot, = ax_idf.plot([], [], 'ro', ms=8, zorder=5) idf_dots_placed, = ax_idf.plot([], [], 'o', color='steelblue', ms=5, alpha=0.6, zorder=4) ax_idf.set_xlabel(_('Duration [min]')) ax_idf.set_ylabel(_('Mean intensity [mm/h]')) ax_idf.set_title(_('IDF envelope')) ax_idf.legend(loc='upper right') ax_idf.set_xlim(0, total_duration * 1.05) ax_idf.set_ylim(0, i_range.max() * 1.15) # Center panel: hyetogram being built y_anim = np.zeros(n) ax_hyeto.set_xlim(edges[0], edges[-1]) ax_hyeto.set_ylim(0, y_full.max() * 1.15) ax_hyeto.set_xlabel(_('Time [min]')) ax_hyeto.set_ylabel(_('Intensity [mm/h]')) ax_hyeto.set_title(_('Chicago hyetogram – T=') + str(T) + ' ' + _('years')) stair_plot = ax_hyeto.stairs(y_anim, edges, fill=True, alpha=0.7, color='steelblue') step_text = ax_hyeto.text(0.98, 0.95, '', transform=ax_hyeto.transAxes, ha='right', va='top', fontsize=10, bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8)) # Right panel: QDF volume verification q_range = np.array([self.get_quantityrain(d, T) for d in d_range]) ax_qdf.plot(d_range, q_range, 'k-', lw=2, label=_('QDF curve (Montana)')) qdf_dot, = ax_qdf.plot([], [], 'ro', ms=8, zorder=5) qdf_dots_placed, = ax_qdf.plot([], [], 'o', color='steelblue', ms=5, alpha=0.6, zorder=4) ax_qdf.set_xlabel(_('Duration [min]')) ax_qdf.set_ylabel(_('Quantity [mm]')) ax_qdf.set_title(_('Volume verification (QDF)')) ax_qdf.legend(loc='upper left') ax_qdf.set_xlim(0, total_duration * 1.05) ax_qdf.set_ylim(0, q_range.max() * 1.15) placed_idf = [] placed_qdf = [] def _init(): y_anim[:] = 0. stair_plot.set_data(y_anim, edges) idf_dot.set_data([], []) idf_dots_placed.set_data([], []) qdf_dot.set_data([], []) qdf_dots_placed.set_data([], []) step_text.set_text('') placed_idf.clear() placed_qdf.clear() return stair_plot, idf_dot, idf_dots_placed, qdf_dot, qdf_dots_placed, step_text def _update(frame): if frame >= len(reveal_order): return stair_plot, idf_dot, idf_dots_placed, qdf_dot, qdf_dots_placed, step_text indices = reveal_order[frame] for idx in indices: y_anim[idx] = y_full[idx] stair_plot.set_data(y_anim, edges) # Compute cumulative duration and volume nb_revealed = sum(len(g) for g in reveal_order[:frame + 1]) d_equiv = min(nb_revealed * timestep, total_duration) i_equiv = self.get_meanrain(d_equiv, T) q_montana = self.get_quantityrain(d_equiv, T) # Actual volume from bars placed so far bar_widths = np.diff(edges) q_actual = np.sum(y_anim * bar_widths / 60.) # mm if frame == 0: step_text.set_text( _('Peak') + f': {peak_duration:.0f} min\n' f'V = {q_actual:.1f} mm ' f'(QDF: {q_montana:.1f} mm)') else: step_text.set_text( f'd = {d_equiv:.0f} min\n' f'V = {q_actual:.1f} mm ' f'(QDF: {q_montana:.1f} mm)') # IDF panel idf_dot.set_data([d_equiv], [i_equiv]) placed_idf.append((d_equiv, i_equiv)) idf_dots_placed.set_data([p[0] for p in placed_idf], [p[1] for p in placed_idf]) # QDF panel – plot actual volume (should fall on the curve) qdf_dot.set_data([d_equiv], [q_actual]) placed_qdf.append((d_equiv, q_actual)) qdf_dots_placed.set_data([p[0] for p in placed_qdf], [p[1] for p in placed_qdf]) return stair_plot, idf_dot, idf_dots_placed, qdf_dot, qdf_dots_placed, step_text anim = FuncAnimation(fig, _update, init_func=_init, frames=len(reveal_order), interval=interval, blit=False, repeat=True) return anim
[docs] class Qdf_IRM(): """ Gestion des relations QDF calculées par l'IRM Exemple d'utilisation : Pour importer les fichiers depuis le site web de l'IRM meteo.be from wolfhece.irm_qdf import Qdf_IRM qdf = Qdf_IRM(force_import=True) qdf = Il est possible de spécifier le répertoire de stockage des fichiers Excel Par défaut, il s'agit d'un sous-répertoire 'irm' du répertoire courant qui sera créé s'il n'exsiste pas Une fois importé/téléchargé, il est possible de charger une commune sur base de l'INS ou de son nom myqdf = Qdf_IRM(name='Jalhay') Les données sont ensuite disponibles dans les propriétés, qui sont des "dataframes" pandas (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) : - qdf : les relation Quantité/durée/fréquence - standarddev : l'écart-type de l'erreur - confintlow : la valeur inférieure de l'intervalle de confiance (-2*stddev) - confintup : la valeur supérieure de l'intervalle de confiance (+2*stddev) - montanacoeff : les coeffciients de Montana L'index est le temps (dur10min, dur30min, dur1h, ... -- durationstext) et les colonnes sont les périodes de retour (RT2, RT5, RT10, ... -- RT). Il est par exemple possible d'accéder aux coefficients de Montana via l'une de ces lignes ou une combinaison : display(myqdf.montanacoeff) rt = myqdf.montanacoeff.index display(myqdf.montanacoeff.loc[rt[0]]) display(myqdf.montanacoeff.iloc[0]) display(myqdf.get_Montanacoeff(qdf.RT2)) :param force_import: If True, will download all the IRM's QDF files (about 600). :param import_as_needed: If True and the IRM's QDF file has not yet been downloaded then the file is downloaded. """ def __init__(self, store_path= None, code:int= 0, name= '', force_import= False, ins:Literal['2018', '2019', '2025', 2018, 2019, 2025] = 2018, localities:Localities = None, dataframe:pd.DataFrame = None, import_as_needed = False) -> None: if localities is None: assert int(ins) in [2018, 2019, 2025], _('Bad INS - Retry !') self.myloc = Localities(ins) else: self.myloc = localities if store_path is None: self.store = Path(__file__).parent / 'data' / 'irm' / 'qdf' else: self.store = Path(store_path) self.store.mkdir(parents=True, exist_ok=True) # This one will hold Qdf data of one locality. If it is None it means no # data has been loaded.
[docs] self.qdf = None
[docs] self.standarddev = None
[docs] self.confintlow = None
[docs] self.confintup = None
[docs] self.montanacoeff = None
[docs] self.montana = None
if force_import: # Import all QDF's from IRM Qdf_IRM.importfromwebsite(self.store, ins=ins) elif import_as_needed: # Import as needed is helpful in the case of unit tests. They # usually require only a handful of QDF curves and so downloading # only some of them is much faster. if name != "": code = self.myloc.get_INSfromname(name) if not (self.store / f"{code}.xlsx").exists(): Qdf_IRM.importfromwebsite(self.store, ins=ins, ins_code=code)
[docs] self._code = None
[docs] self._name = None
[docs] self._qdf_image_table = None
[docs] self._qdf_image_plot = None
if dataframe is not None: """ If a dataframe is provided, we assume it contains the QDF data and we set it directly. """ self._code = int(code) self._name = self.myloc.get_namefromINS(code) # Find columns containing '_Q' qdf_columns = ['Duration'] + [col for col in dataframe.columns if '_Q' in col] self.qdf = dataframe[qdf_columns].copy() #replace duration in seconds with duration texts self.qdf['Duration'] = self.qdf['Duration'].apply(lambda x: durationstext[list(durations_seconds).index(x)] if x in durations_seconds else x) # replace columns names self.qdf.columns = [col.replace('_Q', '') for col in self.qdf.columns] # Set duration as index self.qdf.set_index('Duration', inplace=True) # Remove the name of the index self.qdf.index.name = None # Convert columns name to string self.qdf.columns = [str(col) for col in self.qdf.columns] std_columns = ['Duration'] + [col for col in dataframe.columns if '_Std' in col] self.standarddev = dataframe[std_columns].copy() self.standarddev['Duration'] = self.standarddev['Duration'].apply(lambda x: durationstext[list(durations_seconds).index(x)] if x in durations_seconds else x) self.standarddev.set_index('Duration', inplace=True) confintlow_columns = ['Duration'] + [col for col in dataframe.columns if '_Low' in col] self.confintlow = dataframe[confintlow_columns].copy() self.confintlow['Duration'] = self.confintlow['Duration'].apply(lambda x: durationstext[list(durations_seconds).index(x)] if x in durations_seconds else x) self.confintlow.set_index('Duration', inplace=True) confintup_columns = ['Duration'] + [col for col in dataframe.columns if '_Up' in col] self.confintup = dataframe[confintup_columns].copy() self.confintup['Duration'] = self.confintup['Duration'].apply(lambda x: durationstext[list(durations_seconds).index(x)] if x in durations_seconds else x) self.confintup.set_index('Duration', inplace=True) self._read_csv_or_excel_Montana_only(code = self._code) self.fit_all() elif code !=0: if self._read_csv_or_excel(code=str(code)): self.fit_all() self._code = code self._name = self.myloc.get_namefromINS(code) else: logging.debug(f"INS code {code} not found in the store (see {self.store.absolute()})") elif name!='': if self._read_csv_or_excel(name=name): self.fit_all() self._name = name self._code = self.myloc.get_INSfromname(name) else: logging.debug(f"Name {name} not found in the store")
[docs] def has_data_for_locality(self) -> bool: """ Has this instance been initialized with data from a locality ? """ return self.qdf is not None
@property
[docs] def name(self): return self._name
@property
[docs] def code(self): return self._code
@property
[docs] def code_name(self): return str(self._code) + '-' + self._name
@property
[docs] def name_code(self): return self._name + '-' + str(self._code)
[docs] def export_allmontana2xls(self): """ Export all Montana coefficients to an Excel file """ newdf = [] for curcode in self.myloc.get_allcodes(): self._read_csv_or_excel(code=curcode) if self.montanacoeff is not None: self.montanacoeff['INS'] = [curcode]*12 self.montanacoeff['Name'] = [self.myloc.get_namefromINS(int(curcode))]*12 newdf.append(self.montanacoeff.copy()) self.montanacoeff=None newdf = pd.concat(newdf) newdf.to_excel("allmontana.xlsx")
@classmethod
[docs] def importfromwebsite(cls, store_path:Path = 'irm', verbose:bool= False, waitingtime:float= .01, ins:Literal['2018', '2019', '2025', 2018, 2019, 2025] = 2018, ins_code: int = None): """ Import Excel files for one or all municipalities from the IRM website :param store_path: Where to store the downloaded data. Directory will be created if it doesn't exist. :param verbose: If `True`, will print some progress information. If `False`, will do nothing. If a callable, then will call it with a float in [0, 1]. 0 means nothing downloaded, 1 means everything downloaded. :param waitingtime: How long to wait (in seconds) betwenn the download of each station (will make sure we don't overwhelm IRM's website). :param ins: The year of the INS codes to use. :param code: Restricts the data download to a specific NIS code. `None` means full download. """ import requests myloc = Localities(ins) store_path = Path(store_path) if ins_code is not None: codes_to_load = [ins_code] else: if not store_path.exists(): store_path.mkdir(parents=True, exist_ok=True) codes_to_load = myloc.inscode2name for key,myins in enumerate(codes_to_load): #chaîne URL du fichier Excel url="https://www.meteo.be//resources//climatology//climateCity//xls//IDF_table_INS"+str(myins)+".xlsx" #Obtention du fichiers depuis le site web de l'IRM response=requests.get(url) if str(response.content).find("not found")==-1: # Make sure we create the store path only if we have # something to put inside. if ins_code is not None and not store_path.exists(): store_path.mkdir(parents=True, exist_ok=True) fname = store_path / (str(myins)+".xlsx") file=open(fname, 'wb') file.write(response.content) file.close() # To avoid bad default style import warnings from openpyxl import load_workbook with warnings.catch_warnings(): warnings.simplefilter("ignore", UserWarning) wb = load_workbook(fname) #, read_only=True) wb.save(fname) # écrase le fichier avec un style par défaut if verbose: if callable(verbose): verbose(key/len(codes_to_load)) else: print(myins) else: #logging.error(response.content) logging.error(f"Failed to load IRM data: {url} --> {response}") sleep(waitingtime)
@classmethod
[docs] def make(cls, ins_code: int | str, store_path:Path = None, verbose:bool= False, waitingtime:float= .01, ins:Literal['2018', '2019', '2025', 2018, 2019, 2025] = 2018): """ Make a Qdf_IRM instance with data from the IRM website :param store_path: Where to store the downloaded data. Directory will be created if it doesn't exist. :param verbose: If `True`, will print some progress information. If `False`, will do nothing. If a callable, then will call it with a float in [0, 1]. 0 means nothing downloaded, 1 means everything downloaded. :param waitingtime: How long to wait (in seconds) betwenn the download of each station (will make sure we don't overwhelm IRM's website). :param ins: The year of the INS codes to use. :param code: Restricts the data download to a specific NIS code. `None` means full download. """ if store_path is None: store_path = Path(__file__).parent / 'data' / 'irm' / 'qdf' if isinstance(ins_code, str): loc = Localities(ins) ins_code = loc.get_INSfromname(ins_code) if ins_code is None: raise ValueError(f"Invalid locality name: {ins_code}") cls.importfromwebsite(store_path=store_path, verbose=verbose, waitingtime=waitingtime, ins=ins, ins_code=ins_code) new_cls = cls(store_path=store_path, code = ins_code, ins=ins) return new_cls
[docs] def _read_csv_or_excel(self, code='', name=''): """ Lecture des caractéristiques d'une commune depuis le fichier CSV ou Excel associé au code INS :param code: le code INS de la commune :param name: le nom de la commune """ import warnings if code !='': loccode=str(code) name = self.myloc.get_namefromINS(int(loccode)) elif name!='': if not name.lower() in self.myloc.insname2code.keys(): return _('Bad name ! - Retry') loccode=str(self.myloc.insname2code[name.lower()]) self._code = loccode self._name = name store = self.store pathname_xls = store / (loccode+".xlsx") pathname_csv = store / 'csv' / loccode if pathname_csv.exists(): self.qdf = pd.read_csv(pathname_csv / 'qdf.csv', index_col=0) self.standarddev = pd.read_csv(pathname_csv / 'standarddev.csv', index_col=0) self.confintlow = pd.read_csv(pathname_csv / 'confintlow.csv', index_col=0) self.confintup = pd.read_csv(pathname_csv / 'confintup.csv', index_col=0) self.montanacoeff = pd.read_csv(pathname_csv / 'montanacoeff.csv', index_col=0) self.montana = MontanaIRM(self.montanacoeff) return True else: if path.exists(pathname_xls): with warnings.catch_warnings(): warnings.simplefilter("ignore", UserWarning) self.qdf=pd.read_excel(pathname_xls,"Return level",index_col=0,skiprows=range(7),nrows=19,usecols="A:M",engine='openpyxl', engine_kwargs={'read_only': True}) self.standarddev=pd.read_excel(pathname_xls,"Standard deviation",index_col=0,skiprows=range(7),nrows=19,usecols="A:M",engine='openpyxl', engine_kwargs={'read_only': True}) self.confintlow=pd.read_excel(pathname_xls,"Conf. interval, lower bound",index_col=0,skiprows=range(7),nrows=19,usecols="A:M",engine='openpyxl', engine_kwargs={'read_only': True}) self.confintup=pd.read_excel(pathname_xls,"Conf. interval, upper bound",index_col=0,skiprows=range(7),nrows=19,usecols="A:M",engine='openpyxl', engine_kwargs={'read_only': True}) self.montanacoeff=pd.read_excel(pathname_xls,"Montana coefficients",index_col=0,skiprows=range(11),nrows=12,usecols="A:G",engine='openpyxl', engine_kwargs={'read_only': True}) self.montana = MontanaIRM(self.montanacoeff) return True else: logging.warning(f"Can't find montana data. Checked {pathname_csv.absolute()} and {pathname_xls.absolute()}") self.qdf=None self.standarddev=None self.confintlow=None self.confintup=None self.montanacoeff=None self.montana=None return False
[docs] def _read_csv_or_excel_Montana_only(self, code='', name=''): """ Lecture des caractéristiques d'une commune depuis le fichier CSV Excel associé au code INS :param code: le code INS de la commune :param name: le nom de la commune """ import warnings if code !='': loccode=str(code) name = self.myloc.get_namefromINS(int(loccode)) elif name!='': if not name.lower() in self.myloc.insname2code.keys(): return _('Bad name ! - Retry') loccode=str(self.myloc.insname2code[name.lower()]) self._code = loccode self._name = name store = self.store pathname_xls = store / (loccode+".xlsx") pathname_csv = store / 'csv' / loccode if pathname_csv.exists(): self.montanacoeff = pd.read_csv(pathname_csv / 'montanacoeff.csv', index_col=0) self.montana = MontanaIRM(self.montanacoeff) return True else: with warnings.catch_warnings(): warnings.simplefilter("ignore", UserWarning) if path.exists(pathname_xls): self.montanacoeff=pd.read_excel(pathname_xls,"Montana coefficients",index_col=0,skiprows=range(11),nrows=12,usecols="A:G",engine='openpyxl', engine_kwargs={'read_only': True}) self.montana = MontanaIRM(self.montanacoeff) return True else: self.montanacoeff=None self.montana=None return False
@classmethod
[docs] def convert_xls2csv(cls, store_path= 'irm', ins:Literal['2018', '2019', '2025', 2018, 2019, 2025] = 2018): """ Convert all Excel files to CSV files :param store_path: Where to store the downloaded data. Directory will be created if it doesn't exists. :param ins: The year of the INS codes to use. """ myloc = Localities(ins) store_path = Path(store_path) for key,myins in enumerate(myloc.get_allcodes()): pathname = store_path / (str(myins)+".xlsx") if pathname.exists(): try: logging.info(f"Converting {pathname} to CSV files") with warnings.catch_warnings(): warnings.simplefilter("ignore", UserWarning) qdf=pd.read_excel(pathname,"Return level",index_col=0,skiprows=range(7),nrows=19,usecols="A:M",engine='openpyxl', engine_kwargs={'read_only': True}) standarddev=pd.read_excel(pathname,"Standard deviation",index_col=0,skiprows=range(7),nrows=19,usecols="A:M",engine='openpyxl', engine_kwargs={'read_only': True}) confintlow=pd.read_excel(pathname,"Conf. interval, lower bound",index_col=0,skiprows=range(7),nrows=19,usecols="A:M",engine='openpyxl', engine_kwargs={'read_only': True}) confintup=pd.read_excel(pathname,"Conf. interval, upper bound",index_col=0,skiprows=range(7),nrows=19,usecols="A:M",engine='openpyxl', engine_kwargs={'read_only': True}) montanacoeff=pd.read_excel(pathname,"Montana coefficients",index_col=0,skiprows=range(11),nrows=12,usecols="A:G",engine='openpyxl', engine_kwargs={'read_only': True}) store_csv = store_path / 'csv' / str(myins) store_csv.mkdir(exist_ok=True, parents=True) qdf.to_csv(store_csv / 'qdf.csv') standarddev.to_csv(store_csv / 'standarddev.csv') confintlow.to_csv(store_csv / 'confintlow.csv') confintup.to_csv(store_csv / 'confintup.csv') montanacoeff.to_csv(store_csv / 'montanacoeff.csv') except Exception as e: logging.error(f"Error processing {pathname}: {e}") else: logging.warning(f"File {pathname} does not exist, skipping conversion.") logging.info(_("If it is a problem, try to reimport the data from the IRM website."))
[docs] def plot_idf(self, T=None, which:Literal['All', 'Montana', 'QDFTable'] = 'All', color=[27./255.,136./255.,245./255.]): """ Plot IDF relations on a new figure :param T : the return period (based on RT constants) :param which : information to plot - 'Montana' - 'QDFTable' - 'All' """ if self.montana is None and which != 'QDFTable': logging.error(_("Montana coefficients are not available for this locality.")) return None, None if self.qdf is None and which != 'Montana': logging.error(_("QDF data is not available for this locality.")) return None, None fig,ax = plt.subplots(1,1,figsize=(15,10), tight_layout=True) ax.set_xscale('log') ax.set_yscale('log') if T is None: for k in range(len(RT)): pond = .3+.7*float(k/len(RT)) mycolor = color+[pond] if which=='All' or which=='QDFTable': ax.scatter(durations,self.qdf[RT[k]]/durations*60.,label=RT[k] + _(' QDF Table'),color=mycolor) if which=='All' or which=='Montana': iMontana = [self.montana.get_meanrain(curdur,RT[k]) for curdur in durations] ax.plot(durations,iMontana,label=RT[k] + ' Montana',color=mycolor) else: assert T in RT, _('Bad return period ! - Retry') if which=='All' or which=='QDFTable': ax.scatter(durations,self.qdf[T],label=T+ _(' QDF Table'),color=color) if which=='All' or which=='Montana': iMontana = [self.montana.get_instantrain(curdur,T) for curdur in durations] ax.plot(durations,iMontana,label=T + ' Montana',color=color) ax.legend().set_draggable(True) ax.set_xlabel(_('Duration [min]')) ax.set_ylabel(_('Intensity [mm/h]')) ax.set_xticks(durations) ax.set_xticklabels(durationstext,rotation=45) ax.set_title(self._name + ' - code : ' + str(self._code)) return fig,ax
[docs] def plot_qdf(self, T=None, which:Literal['All', 'Montana', 'QDFTable'] = 'All', color=[27./255.,136./255.,245./255.]): """ Plot QDF relations on a new figure :param T : the return period (based on RT constants) :param which : information to plot - 'Montana' - 'QDFTable' - 'All' """ if self.qdf is None and which != 'Montana': logging.error(_("QDF data is not available for this locality.")) return None, None if self.montana is None and which != 'QDFTable': logging.error(_("Montana coefficients are not available for this locality.")) return None, None fig,ax = plt.subplots(1,1,figsize=(12,8), tight_layout=True) ax.set_xscale('log') if T is None: for k in range(len(RT)): pond = .3+.7*float(k/len(RT)) mycolor = color+[pond] if which=='All' or which=='QDFTable': ax.scatter(durations,self.qdf[RT[k]],label=RT[k] + _(' QDF Table'),color=mycolor) if which=='All' or which=='Montana': QMontana = [self.montana.get_quantityrain(curdur,RT[k]) for curdur in durations] ax.plot(durations,QMontana,label=RT[k] + ' Montana',color=mycolor) else: assert T in RT, _('Bad return period ! - Retry') if which=='All' or which=='QDFTable': ax.scatter(durations,self.qdf[T],label=T+ _(' QDF Table'),color=color) if which=='All' or which=='Montana': QMontana = [self.montana.get_quantityrain(curdur,T) for curdur in durations] ax.plot(durations,QMontana,label=T + ' Montana',color=color) ax.grid(True, which='both', linestyle='--', linewidth=0.5) ax.legend().set_draggable(True) ax.set_xlabel(_('Duration [min]')) ax.set_ylabel(_('Quantity [mm]')) ax.set_xticks(durations) ax.set_xticklabels(durationstext,rotation=45) ax.set_title(self._name + ' - code : ' + str(self._code)) return fig,ax
[docs] def plot_cdf(self, dur=None): """ Plot the cdf of the QDF data for a given duration """ if self.qdf is None: logging.error(_("QDF data is not available for this locality.")) return None, None fig,ax = plt.subplots(1,1,figsize=(10,10), tight_layout=True) if dur is None: for k in range(len(durations)): pond = .3+.7*float(k/len(durations)) mycolor = (27./255.,136./255.,245./255.,pond) ax.scatter(self.qdf.loc[durationstext[k]],freqndep,marker='o',label=durationstext[k],color=mycolor) else: assert dur in durationstext, _('Bad duration - Retry !') ax.scatter(self.qdf.loc[dur],freqndep,marker='o',label=dur,color=(0,0,1)) ax.legend().set_draggable(True) ax.set_ylabel(_('Cumulative distribution function (cdf)')) ax.set_xlabel(_('Quantity [mm]')) ax.set_title(self._name + ' - code : ' + str(self._code)) return fig,ax
[docs] def fit_all(self): """ Fit all durations with a Generalized Extreme Value distribution """ self.load_fits_json() if self.popt_all == {}: for curdur in durationstext: fig,ax,popt,pcov = self.fit_cdf(curdur) self.popt_all[curdur]=popt # self.pcov_all[curdur]=pcov self.save_fits_json()
[docs] def save_fits_json(self): """ Save the fits in a csv file """ with open(self.store / (str(self._code) + '_fits.json'), 'w') as f: df = pd.DataFrame(self.popt_all) df.to_json(f)
# with open(path.join(self.store, str(self.code) + '_fits_cov.json'), 'w') as f: # df = pd.DataFrame(self.pcov_all) # df.to_json(f)
[docs] def load_fits_json(self): """ Load the fits from a json file """ import json filename = self.store / (str(self._code) + '_fits.json') if filename.exists(): with open(filename, 'r') as f: self.popt_all = json.load(f) for key in self.popt_all.keys(): self.popt_all[key] = np.array([val for val in self.popt_all[key].values()]) else: self.popt_all = {}
# filename = path.join(self.store, str(self.code) + '_fits_cov.json') # if path.exists(filename): # with open(filename, 'r') as f: # self.pcov_all = json.load(f) # else: # self.pcov_all = {}
[docs] def fit_cdf(self, dur=None, plot=False): """ Fit the cdf of the QDF data with a Generalized Extreme Value distribution :param dur: the duration to fit :param plot: if True, will plot the cdf with the fit """ if dur is None: return _('Bad duration - Retry !') if dur not in durationstext: return _('Bad duration - Retry !') x=np.asarray(self.qdf.loc[dur], dtype=np.float64) def locextreme(x,a,b,c): return genextreme.cdf(x, a, loc=b, scale=c) def locextreme2(a): LL = -np.sum(genextreme.logpdf(x,a[0],loc=a[1],scale=a[2])) return LL popt = genextreme.fit(x) popt, pcov = curve_fit(locextreme, x, freqndep, p0=popt) #ptest = minimize(locextreme2,popt,bounds=[[-10.,0.],[0.,100.],[0.,100.]]) # perr = np.sqrt(np.diag(pcov)) fig=ax=None if plot: fig,ax=self.plot_cdf(dur) ax.plot(x,genextreme.cdf(x,popt[0],loc=popt[1],scale=popt[2]),label='fit') # ax.plot(x,genextreme.cdf(x,ptest.x[0],loc=ptest.x[1],scale=ptest.x[2]),label='fit_MLE') ax.legend().set_draggable(True) self.stat = genextreme return fig,ax,popt,pcov
[docs] def get_Tfromrain(self, Q, dur=dur1h): """ Get the return period for a given quantity of rain :param Q: the quantity of rain :param dur: the duration """ return 1./self.stat.sf(Q, self.popt_all[dur][0], loc=self.popt_all[dur][1], scale=self.popt_all[dur][2])
[docs] def get_rainfromT(self, T, dur= dur1h): """ Get the quantity of rain for a given return period and duration :param T: the return period :param dur: the duration """ return self.stat.isf(1./T,self.popt_all[dur][0],loc=self.popt_all[dur][1],scale=self.popt_all[dur][2])
[docs] def get_MontanacoeffforT(self, return_period): """ Get the Montana coefficients for a given return period :param return_period: the return period """ if return_period in RT: return self.montanacoeff.loc[float(return_period)] else: return _('Bad RT - Retry !')
[docs] def plot_hyeto(self, durmax, T, r=.5): """ Plot the hyetogram for a given return period :param durmax: the maximum duration of the hyetogram :param T: the return period :param r: the decentration coefficient """ fig,ax = self.montana.plot_hyeto(durmax,T,r) ax.set_title(self._name + ' - code : ' + str(self._code)) return fig
[docs] def plot_hyetos(self, durmax, r=.5): """ Plot the hyetograms for all return periods :param durmax: the maximum duration of the hyetograms :param r: the decentration coefficient """ fig,ax = self.montana.plot_hyetograms_instant(durmax,r) ax.set_title(self._name + ' - code : ' + str(self._code))
def __str__(self) -> str: """ Return the QDF data as a string """ return self.qdf.__str__()
[docs] def make_image_qdf_plot(self, T= None, which:Literal['All', 'Montana', 'QDFTable'] = 'All', color=[27./255.,136./255.,245./255.]): """ Create an image of the QDF plot. We use the `matplotlib` library to create a PNG image of the QDF data. The image will be saved in the store path with the name `<code>_qdf_plot.png`. :param durmax: the maximum duration of the hyetograms :param r: Decentration coefficient :return: a PNG image """ import matplotlib self._qdf_image_plot = self.store / f"{self.code_name}_qdf_plot.png" if self._qdf_image_plot.exists(): return self._qdf_image_plot old_backend = matplotlib.get_backend() matplotlib.use('Agg') # Use a non-interactive backend for saving images fig, ax = self.plot_qdf(T=T, which=which, color=color) fig.savefig(self._qdf_image_plot, dpi=300) plt.close(fig) matplotlib.use(old_backend) # Restore the original backend return self._qdf_image_plot
[docs] def make_image_qdf_table(self): """ Create an image of the QDF data. We use the `dataframe_image` library to create a PNG image of the QDF data. Added style to the DataFrame to make it more readable. :return: a PNG image """ try: import dataframe_image as dfimg except ImportError: logging.error(_("The 'dataframe_image' library is not installed. Please install it to create QDF table images.")) return None if self.qdf is None: logging.error(_("QDF data is not available for this locality.")) return None qdf = self.qdf.copy() # Create a styled DataFrame # Add a caption to the DataFrame qdf.attrs['caption'] = f"QDF data for {self._name} (INS code: {self._code})<br>\ <div style='font-size:8px;'>source : https://www.meteo.be/fr/climat/climat-de-la-belgique/climat-dans-votre-commune<br> \ Data extracted from IRM (Institut Royal Météorologique de Belgique) and processed by Wolf - ULiège" qdf.columns = pd.MultiIndex.from_tuples([(f"{_('Return period')}", str(col)) for col in qdf.columns]) # Style the DataFrame # One line per duration, one column per return period # We will use light colors for the background and borders # to highlight every other line and center the text styled_df = qdf.style.format(precision=1) \ .set_caption(qdf.attrs['caption']) \ .set_properties(**{ 'text-align': 'center', 'font-size': '12px', 'border': '1px solid black', # 'background-color': '#f0f0f0', }).set_table_styles([ { 'selector': 'thead th.row_heading.level0', # 'props': [('text-align', 'center'), ('background-color', '#d9edf7'), ('color', '#31708f'),], 'props': [('color', 'transparent')], }, { 'selector': 'thead th', 'props': [('text-align', 'center'), ('background-color', '#d9edf7'), ('color', '#31708f')], }, ]) # Define the path for the image self._qdf_image_table = self.store / f"{self.code_name}_qdf.png" # Save the styled DataFrame as an image dfimg.export(styled_df, self._qdf_image_table, dpi=300)
[docs] def make_images(self): """ Create all images for the QDF data. """ self.make_image_qdf_table() self.make_image_qdf_plot() return self._qdf_image_table, self._qdf_image_plot
@property
[docs] def path_image_plot(self): """ Get the path for the QDF plot image. """ if self._qdf_image_plot is None: self.make_image_qdf_plot() return self._qdf_image_plot
@property
[docs] def path_image_table(self): """ Get the path for the QDF table image. """ if self._qdf_image_table is None: self.make_image_qdf_table() return self._qdf_image_table
[docs] class QDF_Belgium(): """ Class to manage all QDF data for Belgium """ def __init__(self, store_path= 'irm', ins:Literal['2018', '2019', '2025', 2018, 2019, 2025] = 2018, force_import: bool = False) -> None:
[docs] self.localities = Localities(ins)
[docs] self.store_path = Path(store_path)
if force_import or len(list(self.store_path.glob('*.xlsx'))) == 0: Qdf_IRM.importfromwebsite(store_path=str(self.store_path), verbose=True, ins=ins) if len(list(self.store_path.rglob('*.csv'))) == 0: Qdf_IRM.convert_xls2csv(store_path=str(self.store_path), ins=ins)
[docs] self.all:dict[int, Qdf_IRM] = {}
for loc_ins in tqdm(self.localities.get_allcodes()): loc = Qdf_IRM(store_path=str(self.store_path), code=loc_ins, localities=self.localities) if loc.qdf is not None: self.all[loc_ins] = loc
[docs] def make_images(self): """ Create all images for all QDF data. """ for loc in self.all.values(): loc.make_images()
def __getitem__(self, key) -> Qdf_IRM: if isinstance(key, int): if key in self.all: return self.all[key] else: logging.error(f"INS code {key} not found in the data") return None elif isinstance(key, str): key = self.localities.get_INSfromname(key) if key is not None: if key in self.all: return self.all[key] else: logging.error(f"INS code {key} not found in the data") return None else: logging.error(f"Name {key} not found in the data") return None def __iter__(self): """ Iterate over all localities """ for qdf_municip in self.all.values(): yield qdf_municip
[docs] TRANSLATION_HEADER = {'année': 'year', 'janv.': 'January', 'févr.': 'February', 'mars': 'March', 'avr.': 'April', 'mai': 'May', 'juin': 'June', 'juil.': 'July', 'août': 'August', 'sept.': 'September', 'oct.': 'October', 'nov.': 'November', 'déc.': 'December'}
[docs] RE_REFERENCE = re.compile(r"\([0-9]+\)")
[docs] class Climate_IRM(): def __init__(self, store_path= 'irm', ins:Literal['2018', '2019', '2025', 2018, 2019, 2025] = 2018) -> None:
[docs] self.store_path = Path(store_path)
[docs] self.localities = Localities(ins)
[docs] self._climate_data = {}
def __getitem__(self, key): return self._climate_data[key] @classmethod
[docs] def importfromwebsite(cls, store_path= 'irm', verbose:bool= False, waitingtime:float= .01, ins:Literal['2018', '2019', '2025', 2018, 2019, 2025] = 2018, ins_code: int = None, convert=False): """ Import Excel files for one or all municipalities from the IRM website :param store_path: Where to store the downloaded data. Directory will be created if it doesn't exists. :param verbose: If `True`, will print some progress information. If `False`, will do nothing. If a callable, then will call it with a float in [0, 1]. 0 means nothing downloaded, 1 means everything downloaded. :param waitingtime: How long to wait (in seconds) betwenn the download of each station (will make sure we don't overwhelm IRM's website). :param ins: The year of the INS codes to use. :param code: Restricts the data download to a specific NIS code. `None` means full download. :param convert: Converts the downloaded PDF to Excel files. """ import requests myloc = Localities(ins) if ins_code is not None: codes_to_load = [ins_code] else: if not path.exists(store_path): mkdir(store_path) codes_to_load = myloc.inscode2name for key,myins in enumerate(codes_to_load): #chaîne URL du fichier Excel url="https://www.meteo.be//resources//climatology//climateCity//pdf//climate_INS"+str(myins)+"_9120_fr.pdf" #Obtention du fichiers depuis le site web de l'IRM response=requests.get(url) if str(response.content).find("Page not found")==-1 : # Make sure we create the store path only if we have # something to put inside. if ins_code is not None and not path.exists(store_path): mkdir(store_path) pdf_file = path.join(store_path,str(myins)+".pdf") file=open(pdf_file, 'wb') file.write(response.content) file.close() if convert: cls._convert_irm_file(pdf_file) if verbose: if callable(verbose): verbose(key/len(codes_to_load)) else: print(myins) else: #logging.error(response.content) logging.error(f"Failed to load IRM data: {url} --> {response}") if len(codes_to_load) >= 2: sleep(waitingtime)
@classmethod
[docs] def _scrap_table(cls, t): """ Helper method to transform a table represented as a list of list to a pandas DataFrame. """ def fix_cid(strings: list[str]): # The CID thing is a known issue: # https://github.com/euske/pdfminer/issues/122 return [s.replace('(cid:176)C ', '°C').replace('¢', "'") for s in strings] nt = [] row_headers = [] for rndx in range(1, len(t)): # In the row header, we remove the "references" like "(1)". row_headers.append( RE_REFERENCE.sub("", t[rndx][0]) ) # The PDFs use different "minus" glyph instead of an ASCII one, # let's fix it. nt.append( list(map(lambda s:float(s.replace("−","-")), t[rndx][1:]))) columns_headers = map(TRANSLATION_HEADER.get, t[0][1:]) df = pd.DataFrame(nt, columns=fix_cid(columns_headers), index=fix_cid(row_headers)) return df
@classmethod
[docs] def _convert_irm_file(cls, pdf_file: Union[str, Path]): """ Scrap a PDF from IRM into two tables in a single Excel file with two sheets. """ pdf_file = Path(pdf_file) with pdfplumber.open(pdf_file) as pdf: # Rain data df = cls._scrap_table(pdf.pages[1].extract_table()) # Sun data df_sun = cls._scrap_table(pdf.pages[4].extract_table()) dest_file = pdf_file.with_suffix('.xlsx') with pd.ExcelWriter(dest_file) as writer: # doctest: +SKIP df.to_excel(writer, sheet_name='Rain') df_sun.to_excel(writer, sheet_name='Sun')
[docs] PLUVIO_INI = "pluvio.ini"
[docs] MATCH_NUM_ZONE_SHAPEFILE_INS_INDEX = "Match_num_zone_shapefile_INS_index.txt"
[docs] EXTREME_PRECIP_COMMUNES = "Extreme_rain_ins.txt"
[docs] GEOMETRY_MUNICIPALITIES = "PDS__COMMUNES.shp"
[docs] class QDF_Hydrology(): """ Prepare data from IRM website for WOLF hydrology calculations. We need : - pluvio.ini - Match_num_zone_shapefile_INS_index.txt "pluvio.ini" contains the path to the rainfall data files for each locality: - Extreme_precip_communes.txt """ def __init__(self, store_path= DATADIR / 'irm_qdf', ini_file:str = PLUVIO_INI, ins:Literal['2018', 2018] = 2018, geometry:str = GEOMETRY_MUNICIPALITIES) -> None:
[docs] self.store_path = Path(store_path)
[docs] self._data:dict[int, Qdf_IRM] = {}
[docs] self._ins = ins # INS version to use. IRM has updated the QDF data in 2016, so we force the use of the 2018 version.
[docs] self._extreme_file = EXTREME_PRECIP_COMMUNES
[docs] self._nb_lines_extreme_file = 0
[docs] self._nb_cols_extreme_file = 0
[docs] self.localities = Localities(ins)
[docs] self._ini_file = ini_file
[docs] self._geometry = geometry
[docs] self._geometry_df = None
if not self.store_path.exists(): self.store_path.mkdir(parents=True, exist_ok=True) if not self.store_path.joinpath(self._geometry).exists(): # get the municipalities shapefile from HECE Gitlab public repository self.download_municipalities_2018() try: if self.store_path.joinpath(self._geometry).exists(): self._geometry_df = gpd.read_file(self.store_path / self._geometry) else: logging.error(f"Geometry file {self._geometry} not found in {self.store_path}.") logging.error("Please download the 2018 version of municipalities shapefile from HECE or what you want.") return except Exception as e: logging.error(f"Error reading geometry file {self._geometry}: {e}") self._geometry_df = None if not self.store_path.joinpath(ini_file).exists(): self.create_default_data() self.read_data()
[docs] def read_data(self): """ Read the data from the ini file and the extreme precipitation file. """ try: with open(self.store_path / PLUVIO_INI, 'r') as f: lines = f.readlines() self._extreme_file = lines[0].strip() self._nb_lines_extreme_file = int(lines[1].strip()) self._nb_cols_extreme_file = int(lines[2].strip()) _nb_return_periods = int(lines[3].strip()) _nb_values_per_return_period = int(lines[4].strip()) rt =[] for i in range(_nb_return_periods): try: rt.append(int(lines[5 + i].strip())) except ValueError: logging.error(f"Invalid return period value in ini file: {lines[5 + i].strip()}") rt.append(0) if self.store_path.joinpath(self._extreme_file).exists(): df = pd.read_csv(self.store_path / self._extreme_file, sep='\t', header=None) # Set column names based on the number of return periods and values per return period columns = ['INS', 'Duration'] for rt_value in rt: for value_type in ['Q', 'Std', 'Low', 'Up']: columns.append(f"{rt_value}_{value_type}") df.columns = columns all_ins = df['INS'].astype(int).unique() self._data = {ins: Qdf_IRM(store_path=self.store_path, code=ins, localities=self.localities, dataframe=df[df['INS'] == ins]) for ins in all_ins} else: logging.error(f"Extreme precipitation file {self._extreme_file} not found in {self.store_path}.") self._data = {} except: logging.error(f"Error during reading {self._ini_file} in {self.store_path}.") logging.error("Check your data or delete files in the directory to force a new download.") self._data = {} self._extreme_file = None self._nb_lines_extreme_file = 0 self._nb_cols_extreme_file = 0
def __getitem__(self, key) -> Qdf_IRM: """ Get the QDF data for a given INS code. """ if isinstance(key, int): ins_code = key elif isinstance(key, str): ins_code = self.localities.get_INSfromname(key) if ins_code is None: try: # Try to convert the string to an integer (INS code) ins_code = int(key) except ValueError: # If it fails, raise an error raise KeyError(f"Locality {key} not found.") else: raise TypeError("Key must be an integer (INS code) or a string (locality name).") if ins_code in self._data: return self._data[ins_code] else: raise KeyError(f"Data for INS code {ins_code} not found.") def __iter__(self): """ Iterate over all QDF data. """ for ins_code in self._data: yield self._data[ins_code]
[docs] def download_municipalities_2018(self, force:bool = False): """ Download the municipalities shapefile from HECE. :param force: If `True`, will download the file even if it already exists. """ munic = Path(toys_dataset('Communes_Belgique', 'PDS__COMMUNES.zip')) # Unzip the file if it is not already unzipped. if munic.exists(): import zipfile with zipfile.ZipFile(munic, 'r') as zip_ref: zip_ref.extractall(self.store_path) self._geometry = 'PDS__COMMUNES.shp'
[docs] def create_match_num_zone_shapefile(self): """ Create the Match_num_zone_shapefile_INS_index.txt file. This file contains the mapping between the INS codes and the shapefile indices. """ match_file = self.store_path / MATCH_NUM_ZONE_SHAPEFILE_INS_INDEX colname = 'INS' if colname not in self._geometry_df.columns: colname = 'NSI' # Use NSI if INS is not available if colname not in self._geometry_df.columns: logging.error(f"Column {colname} not found in the geometry DataFrame.") return if not match_file.exists(): with open(match_file, 'w') as f: for idx, row in self._geometry_df.iterrows(): f.write(f"{row[colname]}\n")
[docs] def create_default_data(self): """ Create data from scratch for WOLF hydrology calculations. """ self.create_match_num_zone_shapefile() self.create_extreme_precipitation_file() self.create_ini_file()
[docs] def create_extreme_precipitation_file(self): """ Create the extreme precipitation file for all localities. Each line of the file contains the following data: - INS code - Duration in seconds - Quantity for each return period (RT2, RT5, RT10, RT20, RT50, RT100) """ self.extreme_file = self.store_path / EXTREME_PRECIP_COMMUNES if not self.extreme_file.exists(): all_qdf = QDF_Belgium(store_path=self.store_path, ins=self._ins, force_import=False) self._nb_lines_extreme_file = 0 with open(self.extreme_file, 'w') as f: for loc in all_qdf: loc:Qdf_IRM ins = loc.code qdf = loc.qdf low = loc.confintlow up = loc.confintup std = loc.standarddev for (dur_text, dur_s) in zip(durationstext, durations_seconds): data = [ins] data.append(int(dur_s)) for rt in RT: data.append(qdf.loc[dur_text, rt]) data.append(std.loc[dur_text, rt]) data.append(low.loc[dur_text, rt]) data.append(up.loc[dur_text, rt]) f.write("\t".join(map(str, data)) + "\n") self._nb_lines_extreme_file += 1 self._nb_cols_extreme_file = len(data)
[docs] def create_ini_file(self): """ Create a parameter file for the class """ with open(self.store_path / PLUVIO_INI, 'w') as f: f.write(f"{self._extreme_file}\n") # name of the file containing the extreme precipitation data if self._nb_lines_extreme_file == 0 or self._nb_cols_extreme_file == 0: with open(self.store_path / self._extreme_file, 'r') as ef: lines = ef.readlines() self._nb_lines_extreme_file = len(lines) if lines: self._nb_cols_extreme_file = len(lines[0].strip().split('\t')) else: self._nb_cols_extreme_file = 0 f.write(f"{self._nb_lines_extreme_file}\n") # number of lines in the extreme precipitation file f.write(f"{self._nb_cols_extreme_file}\n") # number of columns in the extreme precipitation file f.write(f"{len(RT)}\n") # Number of return periods f.write("4\n") # Number of values par return period (Q, std, low, up) for rt in RT: f.write(f"{rt}\n")
[docs] def get_all_ins(self) -> list[int]: """ Get a list of all INS codes. """ return list(self._data.keys())
[docs] class QDF_Hydrology_Draw(Element_To_Draw): """ Class to draw the QDF hydrology data on a map. This class is used to draw the QDF hydrology data on a map using the WOLF hydrology calculations. """ def __init__(self, store_path= DATADIR / 'irm_qdf', ins:Literal['2018', 2018] = 2018, idx:str = '', plotted:bool = True, mapviewer = None) -> None: super().__init__(idx=idx, plotted=plotted, mapviewer=mapviewer)
[docs] self._qdf_hydrology = QDF_Hydrology(store_path=store_path, ins=ins)
from .PyPictures import PictureCollection
[docs] self._scale_factor = 1.0 # Default scale factor for images
[docs] self._geometry_zones = Zones('', idx= idx+'_zones', plotted=plotted, mapviewer=mapviewer, parent = mapviewer)
[docs] self._geometry_tables = PictureCollection('', idx= idx, plotted=plotted, mapviewer=mapviewer, parent = mapviewer)
[docs] self._geometry_plots = PictureCollection('', idx= idx, plotted=plotted, mapviewer=mapviewer, parent = mapviewer)
self._geometry_zones.import_shapefile(self.store_path / self._qdf_hydrology._geometry, colname='NSI') self._geometry_zones.prep_listogl() self._geometry_tables.import_shapefile(self.store_path / self._qdf_hydrology._geometry, colname='NSI') self._geometry_plots.import_shapefile(self.store_path / self._qdf_hydrology._geometry, colname='NSI') self._prepare_image_location()
[docs] self._centroids = {curzone[0].centroid: curzone.myname for curzone in self._geometry_tables.myzones}
[docs] self._show_table = False
[docs] self._show_plot = False
[docs] self._reload_images = True
[docs] self._current_images = None
def __getitem__(self, key) -> Qdf_IRM: """ Get the QDF data for a given INS code. """ if isinstance(key, int): ins_code = key elif isinstance(key, str): ins_code = self.localities.get_INSfromname(key) if ins_code is None: try: # Try to convert the string to an integer (INS code) ins_code = int(key) except ValueError: # If it fails, raise an error raise KeyError(f"Locality {key} not found.") else: raise TypeError("Key must be an integer (INS code) or a string (locality name).") if ins_code in self._qdf_hydrology._data: return self._qdf_hydrology._data[ins_code] else: raise KeyError(f"Data for INS code {ins_code} not found.")
[docs] def _get_vector_tables(self, ins:str | int) -> vector: """ Get the vector for a given INS code. """ return self._geometry_tables[(str(ins), str(ins))]
[docs] def _get_vector_plots(self, ins:str | int) -> vector: """ Get the vector for a given INS code. """ return self._geometry_plots[(str(ins), str(ins))]
@property
[docs] def store_path(self): """ Get the store path for the QDF hydrology data. """ return self._qdf_hydrology.store_path
[docs] def _prepare_image_location(self): """ Set the default size for the images. """ # plots DEFAULT_SIZE = 2000. * self._scale_factor # Default size for the images RAP_W_H = 3600. / 2400. WIDTH_PLOTS = DEFAULT_SIZE * RAP_W_H HEIGHT_PLOTS = DEFAULT_SIZE for curzone in self._geometry_plots.myzones: vec = curzone[0] vec.myprop.image_attached_pointx = vec.centroid.x vec.myprop.image_attached_pointy = vec.centroid.y vec.myprop.imagevisible = False x, y = vec.centroid.x, vec.centroid.y y -= HEIGHT_PLOTS / 2. vec.myvertices = [wv(x - WIDTH_PLOTS, y - HEIGHT_PLOTS), wv(x + WIDTH_PLOTS, y - HEIGHT_PLOTS), wv(x + WIDTH_PLOTS, y + HEIGHT_PLOTS), wv(x - WIDTH_PLOTS, y + HEIGHT_PLOTS), wv(x - WIDTH_PLOTS, y - HEIGHT_PLOTS)] vec.myprop.color = getIfromRGB([255, 255, 255, 0]) # Transparent color vec.find_minmax() self._geometry_plots.prep_listogl() # tables RAP_W_H = 1730. / 2000. WIDTH_TABLES = DEFAULT_SIZE * RAP_W_H HEIGHT_TABLES = DEFAULT_SIZE for curzone in self._geometry_tables.myzones: vec = curzone[0] vec.myprop.image_attached_pointx = vec.centroid.x vec.myprop.image_attached_pointy = vec.centroid.y vec.myprop.imagevisible = False x, y = vec.centroid.x, vec.centroid.y y += 2. * HEIGHT_TABLES - (HEIGHT_PLOTS / 2.) * 3./4. vec.myvertices = [wv(x - WIDTH_TABLES, y - HEIGHT_TABLES), wv(x + WIDTH_TABLES, y - HEIGHT_TABLES), wv(x + WIDTH_TABLES, y + HEIGHT_TABLES), wv(x - WIDTH_TABLES, y + HEIGHT_TABLES), wv(x - WIDTH_TABLES, y - HEIGHT_TABLES)] vec.myprop.color = getIfromRGB([255, 255, 255, 0]) vec.find_minmax() self._geometry_tables.prep_listogl()
[docs] def set_images_as_legend(self, plot_or_table:Literal['plot', 'table'] = 'plot', which:list = None): """ Set all images in the collection as legend images. """ DEFAULT_SIZE = 2000. * self._scale_factor # Default size for the images if which is None: which = self._qdf_hydrology.get_all_ins() if plot_or_table == 'plot': RAP_W_H = 3600. / 2400. WIDTH = DEFAULT_SIZE * RAP_W_H HEIGHT = DEFAULT_SIZE for loc_ins in which: loc_qdf = self._qdf_hydrology[loc_ins] if loc_qdf.path_image_plot is not None: vec = self._get_vector_plots(loc_qdf.code) vec.myprop.image_path = loc_qdf.path_image_plot centroid = vec.centroid vec.myprop.image_attached_pointx, vec.myprop.image_attached_pointy = centroid.x, centroid.y vec.myprop.imagevisible = True vec.myvertices = [wv(centroid.x - WIDTH, centroid.y - HEIGHT), wv(centroid.x + WIDTH, centroid.y - HEIGHT), wv(centroid.x + WIDTH, centroid.y + HEIGHT), wv(centroid.x - WIDTH, centroid.y + HEIGHT), wv(centroid.x - WIDTH, centroid.y - HEIGHT)] vec.myprop.color = getIfromRGB([255, 255, 255, 0]) vec.find_minmax() self._geometry_plots.reset_listogl() self._geometry_plots.prep_listogl() elif plot_or_table == 'table': RAP_W_H = 1730. / 2000. WIDTH = DEFAULT_SIZE * RAP_W_H HEIGHT = DEFAULT_SIZE for loc_ins in which: loc_qdf = self._qdf_hydrology[loc_ins] if loc_qdf.path_image_table is not None: vec = self._get_vector_tables(loc_qdf.code) vec.myprop.image_path = loc_qdf.path_image_table centroid = vec.centroid vec.myprop.image_attached_pointx, vec.myprop.image_attached_pointy = centroid.x, centroid.y vec.myprop.imagevisible = True vec.myvertices = [wv(centroid.x - WIDTH, centroid.y - HEIGHT), wv(centroid.x + WIDTH, centroid.y - HEIGHT), wv(centroid.x + WIDTH, centroid.y + HEIGHT), wv(centroid.x - WIDTH, centroid.y + HEIGHT), wv(centroid.x - WIDTH, centroid.y - HEIGHT)] vec.myprop.color = getIfromRGB([255, 255, 255, 0]) vec.find_minmax() self._geometry_tables.reset_listogl() self._geometry_tables.prep_listogl()
[docs] def hide_all_images(self): """ Hide all images in the collection. """ for curzone in self._geometry_tables.myzones: curzone[0].myprop.imagevisible = False self._geometry_tables.reset_listogl() for curzone in self._geometry_plots.myzones: curzone[0].myprop.imagevisible = False self._geometry_plots.reset_listogl()
[docs] def check_plot(self): return super().check_plot()
[docs] def find_nearest_centroid(self, x: float, y: float, bounds: tuple[float, float, float, float]): """ Pick the municipality at the given coordinates. :param x: The x coordinate. :param y: The y coordinate. :return: The name of the municipality or an empty string if not found. """ centroids = self.find_centroid_in_bounds(bounds) if not centroids: return '' # Find the centroid closest to the given point closest_centroid = min(centroids, key=lambda c: c[0].distance(Point(x, y))) return closest_centroid[1]
[docs] def pick_municipality(self, x: float, y: float, bounds: tuple[float, float, float, float]): """ Activate plot for the nearest municipality to the given coordinates. """ which = [self.find_nearest_centroid(x, y, bounds)] for loc_ins in which: loc_qdf = self._qdf_hydrology[loc_ins] if loc_qdf.path_image_plot is not None and loc_qdf.path_image_table is not None: vec_plt = self._get_vector_plots(loc_qdf.code) vec_tables = self._get_vector_tables(loc_qdf.code) if vec_plt.myprop.imagevisible or vec_tables.myprop.imagevisible: vec_plt.myprop.imagevisible = False vec_tables.myprop.imagevisible = False else: vec_plt.myprop.image_path = loc_qdf.path_image_plot vec_plt.myprop.imagevisible = True vec_tables.myprop.image_path = loc_qdf.path_image_table vec_tables.myprop.imagevisible = True # Reset the OpenGL lists to reflect the changes self._geometry_plots.reset_listogl() self._geometry_plots.prep_listogl() self._geometry_tables.reset_listogl() self._geometry_tables.prep_listogl()
[docs] def find_centroids_in_polygon(self, polygon: Polygon) -> list[tuple[vector, str]]: """ Find all centroids in a given polygon. :param polygon: A shapely Polygon object defining the area to search. """ centroids = [] for centroid, name in self._centroids.items(): if centroid.within(polygon): centroids.append((centroid, name)) # Sort centroids by distance to the center of the polygon center_x, center_y = polygon.centroid.x, polygon.centroid.y dist_to_center = lambda c: ((c.x - center_x) ** 2 + (c.y - center_y) ** 2) ** 0.5 centroids.sort(key=lambda c: dist_to_center(c[0])) return centroids
[docs] def find_centroid_in_bounds(self, bounds: tuple[float, float, float, float]) -> list[tuple[vector, str]]: """ Find all centroids within the given bounds. :param bounds: A tuple of (minx, miny, maxx, maxy) defining the bounding box. """ minx, miny, maxx, maxy = bounds centroids = [] for centroid, name in self._centroids.items(): if minx <= centroid.x <= maxx and miny <= centroid.y <= maxy: centroids.append((centroid, name)) dist_to_center = lambda c: ((c.x - (minx + maxx) / 2) ** 2 + (c.y - (miny + maxy) / 2) ** 2) ** 0.5 centroids.sort(key=lambda c: dist_to_center(c[0])) return centroids
@property
[docs] def show_plot(self) -> bool: """ Check if the plot is shown. """ return self._show_plot
@show_plot.setter def show_plot(self, value: bool): """ Set whether to show the plot or not. """ self._show_plot = value if not value: self.hide_all_images() self._reload_images = value @property
[docs] def show_table(self) -> bool: """ Check if the table is shown. """ return self._show_table
@show_table.setter def show_table(self, value: bool): """ Set whether to show the table or not. """ self._show_table = value if not value: self.hide_all_images() self._reload_images = value
[docs] def scale_images(self, factor:float = 1.0): """ Scale the images in the collection by a given factor. :param factor: The scaling factor to apply to the images. """ assert isinstance(factor, (int, float)), "Scaling factor must be a number." self._geometry_tables.scale_all_pictures(factor) self._geometry_plots.scale_all_pictures(factor)
[docs] def plot(self, sx=None, sy=None, xmin=None, ymin=None, xmax=None, ymax=None, size=None): """ Plot the QDF hydrology data on the map. """ NB_MAX = 10 # Maximum number of images to display if self.show_plot and self._reload_images: _new_images = self.find_centroid_in_bounds((xmin, ymin, xmax, ymax)) self._reload_images = False if len(_new_images) > NB_MAX: logging.warning(_(f"Too many images to display. Showing only the first {NB_MAX}.")) _new_images = _new_images[:NB_MAX] if self._current_images is None or len(_new_images) != len(self._current_images): self.set_images_as_legend(plot_or_table='plot', which=[img[1] for img in _new_images]) elif self.show_table and self._reload_images: _new_images = self.find_centroid_in_bounds((xmin, ymin, xmax, ymax)) self._reload_images = False if len(_new_images) > NB_MAX: logging.warning(_(f"Too many images to display. Showing only the first {NB_MAX}.")) _new_images = _new_images[:NB_MAX] if self._current_images is None or len(_new_images) != len(self._current_images): self.set_images_as_legend(plot_or_table='table', which=[img[1] for img in _new_images]) self._geometry_tables.plot(sx=sx, sy=sy, xmin=xmin, ymin=ymin, xmax=xmax, ymax=ymax, size=size) self._geometry_plots.plot(sx=sx, sy=sy, xmin=xmin, ymin=ymin, xmax=xmax, ymax=ymax, size=size) self._geometry_zones.plot(sx=sx, sy=sy, xmin=xmin, ymin=ymin, xmax=xmax, ymax=ymax, size=size)