"""
Author: HECE - University of Liege, Pierre Archambeau
Date: 2024
Copyright (c) 2024 University of Liege. All rights reserved.
This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""
# INS (NIS) codes of the Belgian municipalities
import re
from os import path, mkdir
from pathlib import Path
from time import sleep
from typing import Literal, Union
import logging
import warnings
import matplotlib as mpl
from tqdm import tqdm
import pandas as pd
import geopandas as gpd
import matplotlib.pyplot as plt
from scipy.optimize import minimize,curve_fit
from scipy.stats import gumbel_r,genextreme
import numpy as np
# We have tried pymupdf but its license is AGPL so it's more or less a no-go.
import pdfplumber
from .ins import Localities
from .PyTranslate import _
from .pydownloader import toys_dataset, DATADIR
from .PyVertexvectors import Zones, vector, Point, Polygon, wolfvertex as wv, getIfromRGB
from .drawing_obj import Element_To_Draw
[docs]
# Return-period labels (in years) matching the column headers of the IRM QDF files.
# BUGFIX: these constants were referenced below (RT2 ... RT200, dur10min ... dur30d,
# Montana_a1 ... Montana_b3) but their definitions were missing, which made the module
# fail with NameError at import time.
RT2 = '2'
RT5 = '5'
RT10 = '10'
RT15 = '15'
RT20 = '20'
RT25 = '25'
RT30 = '30'
RT40 = '40'
RT50 = '50'
RT75 = '75'
RT100 = '100'
RT200 = '200'
RT = [RT2, RT5, RT10, RT15, RT20, RT25, RT30, RT40, RT50, RT75, RT100, RT200]

# Annual exceedance frequency associated with each return period (1/T).
freqdep = np.array([1. / float(x) for x in RT])

# Duration labels used as row index of the QDF dataframes.
# NOTE(review): the exact label strings must match the headers of the IRM
# Excel files -- confirm against a downloaded file.
dur10min = '10 min'
dur20min = '20 min'
dur30min = '30 min'
dur1h = '1 h'
dur2h = '2 h'
dur3h = '3 h'
dur6h = '6 h'
dur12h = '12 h'
dur1d = '1 d'
dur2d = '2 d'
dur3d = '3 d'
dur4d = '4 d'
dur5d = '5 d'
dur7d = '7 d'
dur10d = '10 d'
dur15d = '15 d'
dur20d = '20 d'
dur25d = '25 d'
dur30d = '30 d'
durationstext = [dur10min, dur20min, dur30min, dur1h, dur2h, dur3h, dur6h, dur12h, dur1d,
                 dur2d, dur3d, dur4d, dur5d, dur7d, dur10d, dur15d, dur20d, dur25d, dur30d]

# Column labels of the Montana coefficient dataframe (three validity ranges).
# NOTE(review): labels assumed from the a1/b1 ... a3/b3 usage in MontanaIRM.get_ab -- confirm.
Montana_a1 = 'a1'
Montana_b1 = 'b1'
Montana_a2 = 'a2'
Montana_b2 = 'b2'
Montana_a3 = 'a3'
Montana_b3 = 'b3'

# Sub-daily durations in minutes, then daily durations converted to minutes.
durations = np.array([10, 20, 30, 60, 120, 180, 360, 720], np.float64)
durationsd = np.array([1, 2, 3, 4, 5, 7, 10, 15, 20, 25, 30], np.float64) * 24. * 60.
durations = np.concatenate([durations, durationsd])
durations_seconds = durations * 60.  # same durations expressed in seconds
[docs]
class MontanaIRM():
    """ Management of the IRM Montana relationships for rainfall. """

    def __init__(self, coeff: pd.DataFrame, time_bounds=None) -> None:
        """ Store the Montana coefficients and their duration validity bounds.

        :param coeff: Montana coefficients, indexed by return period (years)
        :param time_bounds: [min, max] duration range in minutes separating the
            three sets of coefficients; defaults to [25, 6000]
        """
        # BUGFIX: the coefficients were received but never stored, so every
        # later access to self.coeff (e.g. in get_ab) raised AttributeError.
        self.coeff = coeff

        if time_bounds is None:
            self.time_bounds = [25, 6000]
        else:
            self.time_bounds = time_bounds
[docs]
def get_ab(self, dur, T):
    """ Return the Montana (a, b) pair for a given duration and return period.

    The coefficient set depends on which side of self.time_bounds the
    duration falls.

    :param dur: the duration, in minutes
    :param T: the return period, in years
    :return: tuple (a, b) of Montana coefficients
    """
    row = self.coeff.loc[float(T)]

    if dur < self.time_bounds[0]:
        keys = (Montana_a1, Montana_b1)
    elif dur <= self.time_bounds[1]:
        keys = (Montana_a2, Montana_b2)
    else:
        keys = (Montana_a3, Montana_b3)

    return row[keys[0]], row[keys[1]]
[docs]
def get_meanrain(self, dur, T, ab=None):
    """ Mean rainfall intensity [mm/h] for a duration and a return period.

    Montana formula: i = a * dur**(-b), with dur expressed in minutes.

    :param dur: the duration, in minutes
    :param T: the return period, in years
    :param ab: optional pre-fetched Montana coefficients (a, b)
    :return: the mean rain in [mm/h]
    """
    a, b = self.get_ab(dur, T) if ab is None else (ab[0], ab[1])
    return a * dur ** (-b)
[docs]
def get_instantrain(self, dur, T, ab=None):
    """ Instantaneous rainfall intensity [mm/h] for a duration and a return period.

    Derived from the mean-rain relation: i_inst = (1 - b) * i_mean.

    :param dur: the duration, in minutes
    :param T: the return period, in years
    :param ab: optional pre-fetched Montana coefficients (a, b)
    :return: the instantaneous rain in [mm/h]
    """
    coeffs = self.get_ab(dur, T) if ab is None else ab
    mean_intensity = self.get_meanrain(dur, T, coeffs)
    return (1. - coeffs[1]) * mean_intensity
[docs]
def get_quantityrain(self, dur, T):
    """ Rainfall quantity [mm] for a duration and a return period.

    The mean intensity [mm/h] is multiplied by the duration expressed in
    hours (dur/60) to convert it from [mm/h] to [mm].

    :param dur: the duration, in minutes
    :param T: the return period, in years
    :return: the quantity of rain in [mm]
    """
    hours = dur / 60
    return self.get_meanrain(dur, T) * hours
[docs]
def get_delta_quantityrain(self, dur_start, dur_end, T):
    """ Difference of rainfall quantity [mm] between two durations.

    :param dur_start: the starting duration, in minutes
    :param dur_end: the ending duration, in minutes
    :param T: the return period, in years
    :return: the difference in quantity of rain in [mm]
    """
    q_end = self.get_quantityrain(dur_end, T)
    q_start = self.get_quantityrain(dur_start, T)
    return q_end - q_start
[docs]
def get_delta_meanrain(self, dur_start, dur_end, T):
    """ Mean intensity [mm/h] of the rain falling between two durations.

    The quantity difference is divided by the interval length in hours.

    :param dur_start: the starting duration, in minutes
    :param dur_end: the ending duration, in minutes
    :param T: the return period, in years
    :return: the difference in mean rain in [mm/h]
    """
    delta_hours = (dur_end - dur_start) / 60
    return self.get_delta_quantityrain(dur_start, dur_end, T) / delta_hours
[docs]
def get_hyetogram_Chicago(self, total_duration, T,
                          r:Literal['centered', 'left', 'right'] = 'centered'):
    """ Get the Chicago hyetogram for a given return period, a total duration and a decentration mode.

    The return values are x and y, where x is the starting time of each step of the hyetogram
    in minutes and y is the intensity of rain for each step of the hyetogram in [mm/h].
    x must be interpreted as [x_i, x_{i+1}[ for each step i of the hyetogram, and the intensity
    y_i is the intensity of rain during this step.

    The hyetogram is built as follows:

    - In centered mode, the peak is located at the middle of the hyetogram and has a duration of 10 minutes
    - In left mode, the peak is located at the beginning of the hyetogram and has a duration of 10 minutes
    - In right mode, the peak is located at the end of the hyetogram and has a duration of 10 minutes
    - The intensity is calculated using the Montana coefficients and the mean rain, to ensure the total quantity of rain is correct
    - The time step is 5 minutes

    :param total_duration: the total duration of the hyetogram, in minutes (must be greater than 10 minutes and a multiple of 5 minutes)
    :param T: the return period in years
    :param r: Decentration mode, either 'centered', 'left' or 'right'
    :return: x: the starting time of each step of the hyetogram in minutes, y: the intensity of rain for each step of the hyetogram in [mm/h]
    """
    assert r in ['centered', 'left', 'right'], "Decentration mode r must be either 'centered', 'left' or 'right'"
    assert total_duration > 10, "Total duration must be greater than 10 minutes"
    assert total_duration % 5 == 0, "Total duration must be a multiple of 5 minutes"
    # NOTE(review): this assumes the module-level RT labels are strings -- to confirm.
    assert str(T) in RT, "Return period T must be one of the following: " + ", ".join(RT)

    x = np.arange(0, total_duration, 5, dtype=np.float64)
    y = np.zeros(len(x))

    # Constructing the rainfall left, it is more easy
    # We will then distribute the rain to the right of the peak in order to have
    # the correct total quantity of rain, while respecting the shape of
    # the hyetogram given by the Montana coefficients
    if r in ['left', 'right']:
        y[:2] = self.get_meanrain(10, T) # Peak intensity for the first 10 minutes (two 5-min steps)
        for k in range(2,len(x)):
            # Marginal mean intensity carried by the extra 5-min slice [k*5, (k+1)*5[.
            y[k] = self.get_delta_meanrain(k*5, (k+1)*5, T)
        if r == 'left':
            return x, y
        else: # r == 'right'
            return x, y[::-1] # Reverse the order of the intensities for the right side
    elif r == 'centered':
        start_peak_idx = len(x) // 2 - 1 # Start of the peak (10 minutes)
        end_peak_idx = len(x) // 2 +1 # End of the peak (10 minutes)
        y[start_peak_idx:end_peak_idx] = self.get_meanrain(10, T) # Peak intensity for the 10 minutes around the center
        for k in range(end_peak_idx, len(x)):
            d_loc = (k - end_peak_idx + 1) * 10 # Duration from the end of the peak
            # As we are in centered mode, the distribution of rain is symmetric around the center,
            # so we can calculate the intensity on the right side and then apply it to the left side.
            # As the timestep is fixed to 5 minutes (for each side), we can calculate the intensity at the center
            # of each step of the hyetogram every 10 minutes (5 min for the left-part, 5 min for the right-part).
            y[k] = self.get_delta_meanrain(d_loc, d_loc + 10, T)
            y[start_peak_idx - (k - end_peak_idx + 1)] = y[k] # Symmetric distribution on the left side
        return x, y
[docs]
def get_hyetogram_Chicago_rvar(self, total_duration, T,
                               r:float = 0.5, timestep:float = 5.0,
                               peak_duration:float = 10.0):
    """ Get the Chicago hyetogram for a given return period, a total duration and a decentration coefficient.

    The return values are x and y, where x is the starting time of each step of the hyetogram
    in minutes and y is the intensity of rain for each step of the hyetogram in [mm/h].
    x must be interpreted as [x_i, x_{i+1}[ for each step i of the hyetogram, and the intensity
    y_i is the intensity of rain during this step.

    The hyetogram is built as follows:

    - The peak is located at (total_duration * r) and has a duration of peak_duration minutes
    - The intensity is calculated using the Montana coefficients and the mean rain, to ensure the total quantity of rain is correct
    - The time step is parameterizable but constant for the whole hyetogram, so for example if the time step is 5 minutes, the intensity is calculated every 5 minutes and is constant during these 5 minutes.
    - if r is not in [0, 1, 0.5], the hyetogram is computed step-by-step, by alternating steps on the right and left of the peak, to ensure a good resolution around the peak, as the shape of the hyetogram is more important around the peak.

    :param total_duration: the total duration of the hyetogram, in minutes (must be greater than 10 minutes and a multiple of 5 minutes)
    :param T: the return period in years
    :param r: Decentration coefficient, a value in [0, 1] indicating the position of the peak (0 means the peak is at the end of the hyetogram, 1 means the peak is at the beginning of the hyetogram, 0.5 means the peak is in the middle of the hyetogram)
    :param timestep: the time step for the hyetogram in minutes
    :param peak_duration: the duration of the peak in minutes
    :return: x: the starting time of each step of the hyetogram in minutes, y: the intensity of rain for each step of the hyetogram in [mm/h]
    """
    assert 0 <= r <= 1, "Decentration coefficient r must be a value in [0, 1]"
    assert total_duration > peak_duration, "Total duration must be greater than the peak duration"
    assert total_duration % timestep == 0, "Total duration must be a multiple of the timestep"
    assert peak_duration % timestep == 0, "Peak duration must be a multiple of the timestep"
    assert str(T) in RT, "Return period T must be one of the following: " + ", ".join(RT)

    x = np.arange(0, total_duration, timestep, dtype=np.float64)
    y = np.zeros(len(x))

    size_peak = int(peak_duration / timestep)  # number of timesteps covered by the peak
    if r == 0.5:
        assert size_peak % 2 == 0, "For a centered peak, the peak duration must be an even multiple of the timestep to ensure symmetry"

    # Constructing the rainfall left, it is more easy
    # We will then distribute the rain to the right of the peak in order to have
    # the correct total quantity of rain, while respecting the shape of
    # the hyetogram given by the Montana coefficients
    if r in [0, 1]:
        # NOTE(review): in this branch r == 0 yields the peak at the *beginning* and r == 1
        # at the *end*, which contradicts the docstring above (r = 0 -> peak at the end) and
        # the generic branch below (start_peak_idx = n*(1-r) places the peak near the start
        # when r -> 1). The special cases look swapped -- to confirm with the author.
        y[:size_peak] = self.get_meanrain(peak_duration, T) # Peak intensity for the first peak_duration minutes
        for k in range(size_peak,len(x)):
            y[k] = self.get_delta_meanrain(k*timestep, (k+1)*timestep, T)
        if r == 0:
            return x, y
        else: # r == 1
            return x, y[::-1] # Reverse the order of the intensities for the right side
    elif r == 0.5:
        start_peak_idx = int((len(x) - size_peak) // 2) # Start of the peak
        end_peak_idx = start_peak_idx + size_peak # End of the peak
        y[start_peak_idx:end_peak_idx] = self.get_meanrain(peak_duration, T) # Peak intensity for the peak_duration minutes around the center
        for k in range(end_peak_idx, len(x)):
            # Symmetric construction: each right-hand step and its mirrored left-hand step
            # together carry the marginal rain of a (2 * timestep)-wide duration increment.
            d_loc = (k - end_peak_idx + 1) * timestep * 2 # Duration from the end of the peak
            y[k] = self.get_delta_meanrain(d_loc, d_loc + timestep * 2, T)
            y[start_peak_idx - (k - end_peak_idx + 1)] = y[k] # Symmetric distribution on the left side
    else:
        # other ponderation coefficients
        start_peak_idx = int(len(x) * (1-r) - size_peak // 2) # Start of the peak
        end_peak_idx = start_peak_idx + size_peak # End of the peak
        y[start_peak_idx:end_peak_idx] = self.get_meanrain(peak_duration, T) # Peak intensity for the peak_duration minutes around the center

        # We will progress alternately on the right and left side of the peak, to ensure a good
        # resolution around the peak, as the shape of the hyetogram is more important around the peak.
        if r>0.5:
            nb_right = int(1/(1-r)) - 1 # Number of steps to do on the right side before doing a step on the left side
            d_loc = peak_duration  # running duration already covered by placed steps
            i_right = end_peak_idx
            i_left = start_peak_idx - 1
            nb = 0  # number of non-peak steps placed so far
            while nb < len(x) - size_peak:
                # right-part
                for __ in range(nb_right):
                    if i_right >= len(x):
                        break
                    y[i_right] = self.get_delta_meanrain(d_loc, d_loc + timestep, T)
                    d_loc += timestep # Duration for the right part, starting from the end of the peak
                    i_right += 1
                    nb += 1
                # left-part
                if i_left >= 0:
                    y[i_left] = self.get_delta_meanrain(d_loc, d_loc + timestep, T)
                    d_loc += timestep # Duration for the left part, starting from the start of the peak
                    i_left -= 1
                    nb += 1
        else: # r<0.5
            nb_left = int(1/r) - 1 # Number of steps to do on the left side before doing a step on the right side
            d_loc = peak_duration
            i_right = end_peak_idx
            i_left = start_peak_idx - 1
            nb = 0
            while nb < len(x) - size_peak:
                # left-part
                for __ in range(nb_left):
                    if i_left < 0:
                        break
                    y[i_left] = self.get_delta_meanrain(d_loc, d_loc + timestep, T)
                    d_loc += timestep # Duration for the left part, starting from the start of the peak
                    i_left -= 1
                    nb += 1
                # right-part
                if i_right < len(x):
                    y[i_right] = self.get_delta_meanrain(d_loc, d_loc + timestep, T)
                    d_loc += timestep # Duration for the right part, starting from the end of the peak
                    i_right += 1
                    nb += 1
    return x, y
[docs]
def get_hyetogram_instant(self, durmax, T, r = 0.5, peak_duration = 10):
    """ Get the instantaneous hyetogram for a given return period, a total duration and a decentration coefficient.

    The decentration coefficient r is a value in [0, 1] that indicates the position of the peak:
    0 puts the peak at the end of the hyetogram, 1 at the beginning, 0.5 in the middle.

    The time step of the hyetogram is variable: (1-r) minute before the peak and r minute after
    the peak, to ensure a good resolution around the peak. The intensity is the instantaneous
    rain given by the Montana coefficients, sampled at the center of each step, except at the
    peak step, where the mean rain over peak_duration is used.

    The return values are x and y, where x is the starting time of each step of the hyetogram in
    minutes and y is the intensity of rain for each step in [mm/h]; x must be interpreted as
    [x_i, x_{i+1}[ for each step i.

    peak_duration and durmax are rounded down to whole minutes, so the actual values may be
    slightly different from the inputs.

    :param durmax: the maximum duration of the hyetogram, in minutes
    :param T: the return period
    :param r: Decentration coefficient
    :param peak_duration: the duration of the peak, in minutes
    :return: x_hyeto (start of each step, minutes), y_hyeto (intensity [mm/h])
    """
    assert 0 <= r <= 1, "Decentration coefficient r must be in [0, 1]"
    assert peak_duration > 0, "Peak duration must be positive"
    assert durmax > 0, "Maximum duration must be positive"
    assert durmax > peak_duration, "Maximum duration must be greater than peak duration"
    # assert durmax * r > peak_duration / 2, "Peak must be fully contained in the hyetogram (check durmax, r and peak_duration)"

    # Round down to whole minutes.
    peak_duration = float(int(peak_duration))
    durmax = float(int(durmax))

    # CLEANUP: removed a dead local "x = np.arange(peak_duration, durmax, 1, ...)" that was
    # computed but never read.

    if r==1.:
        # If r is 1, the peak is at the beginning of the hyetogram: no point before the peak.
        startpeak = 0.
        endpeak = peak_duration
        xafterpeak = np.arange(endpeak, durmax, 1.)
        x_hyeto = np.concatenate([[startpeak], xafterpeak])
    elif r==0.:
        # If r is 0, the peak is at the end of the hyetogram: no point after the peak.
        startpeak = durmax - peak_duration
        endpeak = durmax
        xbeforepeak = np.arange(0., startpeak, 1.)
        x_hyeto = np.concatenate([xbeforepeak, [startpeak]])
    else:
        # NOTE(review): here the peak centre sits at durmax * r, i.e. r -> 1 pushes the peak
        # towards the END, which is discontinuous with the r == 1 branch above (peak at the
        # beginning) and with this method's own r semantics. To be confirmed with the author.
        startpeak = durmax * r - peak_duration / 2
        endpeak = startpeak + peak_duration
        xbeforepeak = np.arange(0., startpeak, 1-r)
        xafterpeak = np.arange(endpeak, durmax, r)
        x_hyeto = np.concatenate([xbeforepeak, [startpeak], xafterpeak])

    y_hyeto = np.zeros(len(x_hyeto))
    for k in range(len(x_hyeto)):
        if x_hyeto[k] < startpeak:
            # Duration measured from the peak, evaluated at the center of the step.
            delta_peak = (startpeak - x_hyeto[k]) / (1-r) + peak_duration - (1-r)
            y_hyeto[k] = self.get_instantrain(delta_peak, T)
        elif x_hyeto[k] == startpeak:
            # The peak step carries the mean intensity over the whole peak duration.
            y_hyeto[k] = self.get_meanrain(dur = peak_duration, T = T)
        else:
            delta_peak = (x_hyeto[k] - endpeak) / r + peak_duration + r
            y_hyeto[k] = self.get_instantrain(delta_peak, T)

    return x_hyeto, y_hyeto
[docs]
def plot_hyetogram_instant(self, durmax, T, r= 0.5, figax = None):
    """ Plot the instantaneous hyetogram for a given return period.

    :param durmax: the maximum duration of the hyetogram in minutes
    :param T: the return period
    :param r: Decentration coefficient
    :param figax: optional (figure, axes) pair to draw on; a new figure is created when None
    :return: the (figure, axes) pair used for the plot
    """
    times, intensities = self.get_hyetogram_instant(durmax, T, r)

    if figax is not None:
        fig, ax = figax
    else:
        fig, ax = plt.subplots(1, 1, figsize=[15, 10], tight_layout=True)

    # Close the last step at durmax so stairs() gets len(y)+1 edges.
    bin_edges = np.append(times, durmax)
    ax.stairs(intensities, bin_edges, fill=True, alpha=0.7,
              label=_("Hyetogram (instantaneous) for return period T=")+str(T)+" "+_("years"))

    ax.set_xlabel(_('Time [min]'))
    ax.set_ylabel(_('Intensity [mm/h]'))
    ax.legend().set_draggable(True)

    return fig, ax
[docs]
def plot_hyetogram_Chicago(self, total_duration, T, r = 'centered', figax = None):
    """ Plot the Chicago hyetogram for a given return period, total duration and decentration mode.

    :param total_duration: the total duration of the hyetogram in minutes
    :param T: the return period
    :param r: Decentration mode, either 'centered', 'left' or 'right'
    :param figax: optional (figure, axes) pair to draw on; a new figure is created when None
    :return: the (figure, axes) pair used for the plot
    """
    times, intensities = self.get_hyetogram_Chicago(total_duration, T, r)

    if figax is not None:
        fig, ax = figax
    else:
        fig, ax = plt.subplots(1, 1, figsize=[15, 10], tight_layout=True)

    # Close the last step at total_duration so stairs() gets len(y)+1 edges.
    bin_edges = np.append(times, total_duration)
    ax.stairs(intensities, bin_edges, fill=True, alpha=0.7,
              label=_("Chicago hyetogram for return period T=")+str(T)+" "+_("years"))

    ax.set_xlabel(_('Time [min]'))
    ax.set_ylabel(_('Intensity [mm/h]'))
    ax.legend().set_draggable(True)

    return fig, ax
[docs]
def plot_hyetogram_Chicago_rvar(self, total_duration, T, r = 0.5, peak_duration = 10, timestep = 5, figax = None):
    """ Plot the Chicago hyetogram for a given return period, total duration and decentration coefficient.

    :param total_duration: the total duration of the hyetogram in minutes
    :param T: the return period
    :param r: Decentration coefficient in [0, 1]
    :param peak_duration: the duration of the peak rainfall in minutes
    :param timestep: the time step for the hyetogram in minutes
    :param figax: optional (figure, axes) pair to draw on; a new figure is created when None
    :return: the (figure, axes) pair used for the plot
    """
    times, intensities = self.get_hyetogram_Chicago_rvar(total_duration, T, r, timestep, peak_duration)

    if figax is not None:
        fig, ax = figax
    else:
        fig, ax = plt.subplots(1, 1, figsize=[15, 10], tight_layout=True)

    # Close the last step at total_duration so stairs() gets len(y)+1 edges.
    bin_edges = np.append(times, total_duration)
    ax.stairs(intensities, bin_edges, fill=True, alpha=0.7,
              label=_("Chicago hyetogram for return period T=")+str(T)+" "+_("years"))

    ax.set_xlabel(_('Time [min]'))
    ax.set_ylabel(_('Intensity [mm/h]'))
    ax.legend().set_draggable(True)

    return fig, ax
[docs]
def plot_hyetograms_instant(self, durmax, r= 0.5, figax = None):
    """ Plot the instantaneous hyetograms for every available return period.

    :param durmax: the maximum duration of the hyetograms in minutes
    :param r: Decentration coefficient
    :param figax: optional (figure, axes) pair to draw on; a new figure is created when None
    :return: the (figure, axes) pair used for the plot
    """
    if figax is not None:
        fig, ax = figax
    else:
        fig, ax = plt.subplots(1, 1, figsize=[15, 10], tight_layout=True)

    # Draw the largest return periods first so the smaller ones remain visible on top.
    for curT in reversed(RT):
        times, intensities = self.get_hyetogram_instant(durmax, curT, r)
        bin_edges = np.append(times, durmax)
        ax.stairs(intensities, bin_edges, fill=True, alpha=0.7,
                  label=_("Hyetogram (instantaneous) for return period T=")+str(curT)+" "+_("years"))

    ax.set_xlabel(_('Time [min]'))
    ax.set_ylabel(_('Intensity [mm/h]'))
    ax.legend().set_draggable(True)

    return fig, ax
[docs]
def animate_hyetogram_Chicago(self, total_duration, T,
                              r:float = 0.5, timestep:float = 5.0,
                              peak_duration:float = 10.0,
                              interval:int = 400,
                              figsize:tuple = (18, 5)):
    """ Create a matplotlib animation showing the progressive construction
    of a Chicago hyetogram.

    The animation proceeds as follows:

    1. Frame 0: empty axes with labels and IDF envelope
    2. Frame 1: the peak bar(s) appear
    3. Subsequent frames: bars are added alternating right/left of the peak,
       exactly mirroring the construction logic of ``get_hyetogram_Chicago_rvar``.
    4. Final frame: the complete hyetogram is shown.

    Three panels are displayed:

    - Left: IDF curve (Montana) with current duration highlighted
    - Center: hyetogram under construction
    - Right: QDF curve (Montana) with cumulative volume of placed bars,
      showing that the construction respects the total rainfall volume.

    :param total_duration: the total duration of the hyetogram in minutes
    :param T: the return period in years
    :param r: Decentration coefficient in [0, 1]
    :param timestep: the time step in minutes
    :param peak_duration: the duration of the peak in minutes
    :param interval: delay between frames in ms
    :param figsize: figure size
    :return: matplotlib.animation.FuncAnimation
    """
    from matplotlib.animation import FuncAnimation

    # Pre-compute the full hyetogram
    x, y_full = self.get_hyetogram_Chicago_rvar(total_duration, T, r, timestep, peak_duration)
    n = len(x)
    size_peak = int(peak_duration / timestep)
    edges = np.append(x, total_duration)

    # Build the ordered list of index-groups that mirrors the construction order
    # Each element is a list of indices to reveal in that frame.
    reveal_order: list[list[int]] = []

    if r in (0, 1):
        # Peak at one extremity; the remaining bars are revealed one at a time,
        # walking away from the peak.
        peak_indices = list(range(size_peak))
        rest = list(range(size_peak, n))
        if r == 1:
            peak_indices = list(range(n - size_peak, n))
            rest = list(range(n - size_peak - 1, -1, -1))
        reveal_order.append(peak_indices)
        for idx in rest:
            reveal_order.append([idx])
    elif r == 0.5:
        # Centered peak: each frame reveals one right-hand bar and its mirrored
        # left-hand bar, matching the symmetric construction.
        start_peak_idx = int((n - size_peak) // 2)
        end_peak_idx = start_peak_idx + size_peak
        reveal_order.append(list(range(start_peak_idx, end_peak_idx)))
        for k in range(end_peak_idx, n):
            mirror = start_peak_idx - (k - end_peak_idx + 1)
            group = [k]
            if 0 <= mirror < n:
                group.append(mirror)
            reveal_order.append(group)
    else:
        # Generic decentration: alternate right/left exactly like
        # get_hyetogram_Chicago_rvar does.
        start_peak_idx = int(n * (1 - r) - size_peak // 2)
        end_peak_idx = start_peak_idx + size_peak
        reveal_order.append(list(range(start_peak_idx, end_peak_idx)))
        if r > 0.5:
            nb_right = int(1 / (1 - r)) - 1
            i_right = end_peak_idx
            i_left = start_peak_idx - 1
            remaining = n - size_peak
            while remaining > 0:
                for __ in range(nb_right):
                    if i_right >= n:
                        break
                    reveal_order.append([i_right])
                    i_right += 1
                    remaining -= 1
                if i_left >= 0 and remaining > 0:
                    reveal_order.append([i_left])
                    i_left -= 1
                    remaining -= 1
        else:
            nb_left = int(1 / r) - 1
            i_right = end_peak_idx
            i_left = start_peak_idx - 1
            remaining = n - size_peak
            while remaining > 0:
                for __ in range(nb_left):
                    if i_left < 0:
                        break
                    reveal_order.append([i_left])
                    i_left -= 1
                    remaining -= 1
                if i_right < n and remaining > 0:
                    reveal_order.append([i_right])
                    i_right += 1
                    remaining -= 1

    # --- Set up figure with 3 panels ---
    fig, (ax_idf, ax_hyeto, ax_qdf) = plt.subplots(1, 3, figsize=figsize, tight_layout=True)

    # Left panel: IDF envelope
    d_range = np.linspace(peak_duration, total_duration, 200)
    i_range = np.array([self.get_meanrain(d, T) for d in d_range])
    ax_idf.plot(d_range, i_range, 'k-', lw=2, label=_('IDF curve (Montana)'))
    idf_dot, = ax_idf.plot([], [], 'ro', ms=8, zorder=5)
    idf_dots_placed, = ax_idf.plot([], [], 'o', color='steelblue', ms=5, alpha=0.6, zorder=4)
    ax_idf.set_xlabel(_('Duration [min]'))
    ax_idf.set_ylabel(_('Mean intensity [mm/h]'))
    ax_idf.set_title(_('IDF envelope'))
    ax_idf.legend(loc='upper right')
    ax_idf.set_xlim(0, total_duration * 1.05)
    ax_idf.set_ylim(0, i_range.max() * 1.15)

    # Center panel: hyetogram being built
    y_anim = np.zeros(n)
    ax_hyeto.set_xlim(edges[0], edges[-1])
    ax_hyeto.set_ylim(0, y_full.max() * 1.15)
    ax_hyeto.set_xlabel(_('Time [min]'))
    ax_hyeto.set_ylabel(_('Intensity [mm/h]'))
    ax_hyeto.set_title(_('Chicago hyetogram – T=') + str(T) + ' ' + _('years'))
    stair_plot = ax_hyeto.stairs(y_anim, edges, fill=True, alpha=0.7, color='steelblue')
    step_text = ax_hyeto.text(0.98, 0.95, '', transform=ax_hyeto.transAxes,
                              ha='right', va='top', fontsize=10,
                              bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8))

    # Right panel: QDF volume verification
    q_range = np.array([self.get_quantityrain(d, T) for d in d_range])
    ax_qdf.plot(d_range, q_range, 'k-', lw=2, label=_('QDF curve (Montana)'))
    qdf_dot, = ax_qdf.plot([], [], 'ro', ms=8, zorder=5)
    qdf_dots_placed, = ax_qdf.plot([], [], 'o', color='steelblue', ms=5, alpha=0.6, zorder=4)
    ax_qdf.set_xlabel(_('Duration [min]'))
    ax_qdf.set_ylabel(_('Quantity [mm]'))
    ax_qdf.set_title(_('Volume verification (QDF)'))
    ax_qdf.legend(loc='upper left')
    ax_qdf.set_xlim(0, total_duration * 1.05)
    ax_qdf.set_ylim(0, q_range.max() * 1.15)

    # History of the points already drawn on the IDF/QDF panels.
    placed_idf = []
    placed_qdf = []

    def _init():
        # Reset every artist to its empty state (also called when the animation repeats).
        y_anim[:] = 0.
        stair_plot.set_data(y_anim, edges)
        idf_dot.set_data([], [])
        idf_dots_placed.set_data([], [])
        qdf_dot.set_data([], [])
        qdf_dots_placed.set_data([], [])
        step_text.set_text('')
        placed_idf.clear()
        placed_qdf.clear()
        return stair_plot, idf_dot, idf_dots_placed, qdf_dot, qdf_dots_placed, step_text

    def _update(frame):
        # Reveal the bars of this frame and refresh the three panels.
        if frame >= len(reveal_order):
            return stair_plot, idf_dot, idf_dots_placed, qdf_dot, qdf_dots_placed, step_text
        indices = reveal_order[frame]
        for idx in indices:
            y_anim[idx] = y_full[idx]
        stair_plot.set_data(y_anim, edges)

        # Compute cumulative duration and volume
        nb_revealed = sum(len(g) for g in reveal_order[:frame + 1])
        d_equiv = min(nb_revealed * timestep, total_duration)
        i_equiv = self.get_meanrain(d_equiv, T)
        q_montana = self.get_quantityrain(d_equiv, T)

        # Actual volume from bars placed so far
        bar_widths = np.diff(edges)
        q_actual = np.sum(y_anim * bar_widths / 60.) # mm

        if frame == 0:
            step_text.set_text(
                _('Peak') + f': {peak_duration:.0f} min\n'
                f'V = {q_actual:.1f} mm '
                f'(QDF: {q_montana:.1f} mm)')
        else:
            step_text.set_text(
                f'd = {d_equiv:.0f} min\n'
                f'V = {q_actual:.1f} mm '
                f'(QDF: {q_montana:.1f} mm)')

        # IDF panel
        idf_dot.set_data([d_equiv], [i_equiv])
        placed_idf.append((d_equiv, i_equiv))
        idf_dots_placed.set_data([p[0] for p in placed_idf],
                                 [p[1] for p in placed_idf])

        # QDF panel – plot actual volume (should fall on the curve)
        qdf_dot.set_data([d_equiv], [q_actual])
        placed_qdf.append((d_equiv, q_actual))
        qdf_dots_placed.set_data([p[0] for p in placed_qdf],
                                 [p[1] for p in placed_qdf])

        return stair_plot, idf_dot, idf_dots_placed, qdf_dot, qdf_dots_placed, step_text

    anim = FuncAnimation(fig, _update, init_func=_init,
                         frames=len(reveal_order),
                         interval=interval, blit=False, repeat=True)
    return anim
[docs]
class Qdf_IRM():
"""
Gestion des relations QDF calculées par l'IRM
Exemple d'utilisation :
Pour importer les fichiers depuis le site web de l'IRM meteo.be
from wolfhece.irm_qdf import Qdf_IRM
qdf = Qdf_IRM(force_import=True)
qdf =
Il est possible de spécifier le répertoire de stockage des fichiers Excel
Par défaut, il s'agit d'un sous-répertoire 'irm' du répertoire courant qui sera créé s'il n'exsiste pas
Une fois importé/téléchargé, il est possible de charger une commune sur base de l'INS ou de son nom
myqdf = Qdf_IRM(name='Jalhay')
Les données sont ensuite disponibles dans les propriétés, qui sont des "dataframes" pandas (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) :
- qdf : les relation Quantité/durée/fréquence
- standarddev : l'écart-type de l'erreur
- confintlow : la valeur inférieure de l'intervalle de confiance (-2*stddev)
- confintup : la valeur supérieure de l'intervalle de confiance (+2*stddev)
- montanacoeff : les coeffciients de Montana
L'index est le temps (dur10min, dur30min, dur1h, ... -- durationstext) et les colonnes sont les périodes de retour (RT2, RT5, RT10, ... -- RT).
Il est par exemple possible d'accéder aux coefficients de Montana via l'une de ces lignes ou une combinaison :
display(myqdf.montanacoeff)
rt = myqdf.montanacoeff.index
display(myqdf.montanacoeff.loc[rt[0]])
display(myqdf.montanacoeff.iloc[0])
display(myqdf.get_Montanacoeff(qdf.RT2))
:param force_import: If True, will download all the IRM's QDF files (about 600).
:param import_as_needed: If True and the IRM's QDF file has not yet been downloaded
then the file is downloaded.
"""
def __init__(self, store_path= None,
             code:int= 0, name= '',
             force_import= False,
             ins:Literal['2018', '2019', '2025', 2018, 2019, 2025] = 2018,
             localities:Localities = None,
             dataframe:pd.DataFrame = None,
             import_as_needed = False) -> None:
    """ Load the QDF relationships of one Belgian municipality.

    :param store_path: directory where the IRM Excel files are stored (default: package data dir)
    :param code: INS code of the municipality (0 means "not set")
    :param name: name of the municipality (alternative to code)
    :param force_import: if True, download all the IRM QDF files (about 600)
    :param ins: year of the INS codification to use
    :param localities: optional pre-built Localities instance (overrides ins)
    :param dataframe: optional dataframe already containing the QDF data
    :param import_as_needed: download only the requested municipality file if it is missing
    """
    if localities is None:
        assert int(ins) in [2018, 2019, 2025], _('Bad INS - Retry !')
        self.myloc = Localities(ins)
    else:
        self.myloc = localities

    if store_path is None:
        self.store = Path(__file__).parent / 'data' / 'irm' / 'qdf'
    else:
        self.store = Path(store_path)
    self.store.mkdir(parents=True, exist_ok=True)

    # Data of one locality -- None means "no data has been loaded yet".
    # BUGFIX: self.qdf, self.confintlow and self.confintup were never initialized, so
    # has_data_for_locality() raised AttributeError when no data was loaded.
    self.qdf = None
    self.standarddev = None
    self.montanacoeff = None
    self.confintlow = None
    self.confintup = None
    self._qdf_image_table = None   # cached rendering of the QDF table
    self._qdf_image_plot = None    # cached rendering of the QDF plot
    self._code = 0
    self._name = ''

    if force_import:
        # Import all QDF's from IRM
        Qdf_IRM.importfromwebsite(self.store, ins=ins)
    elif import_as_needed:
        # Import as needed is helpful in the case of unit tests. They
        # usually require only a handful of QDF curves and so downloading
        # only some of them is much faster.
        if name != "":
            code = self.myloc.get_INSfromname(name)
        if not (self.store / f"{code}.xlsx").exists():
            Qdf_IRM.importfromwebsite(self.store, ins=ins, ins_code=code)

    if dataframe is not None:
        # A dataframe was provided: assume it contains the QDF data and use it directly.
        self._code = int(code)
        self._name = self.myloc.get_namefromINS(code)

        def _seconds_to_label(x):
            # Map a duration in seconds onto its text label (e.g. '1 h'); keep unknown values as-is.
            return durationstext[list(durations_seconds).index(x)] if x in durations_seconds else x

        # QDF quantiles: columns containing '_Q'
        qdf_columns = ['Duration'] + [col for col in dataframe.columns if '_Q' in col]
        self.qdf = dataframe[qdf_columns].copy()
        self.qdf['Duration'] = self.qdf['Duration'].apply(_seconds_to_label)
        # Strip the '_Q' suffix, index on duration and keep return periods as string columns.
        self.qdf.columns = [col.replace('_Q', '') for col in self.qdf.columns]
        self.qdf.set_index('Duration', inplace=True)
        self.qdf.index.name = None
        self.qdf.columns = [str(col) for col in self.qdf.columns]

        # Standard deviation of the error: columns containing '_Std'
        std_columns = ['Duration'] + [col for col in dataframe.columns if '_Std' in col]
        self.standarddev = dataframe[std_columns].copy()
        self.standarddev['Duration'] = self.standarddev['Duration'].apply(_seconds_to_label)
        self.standarddev.set_index('Duration', inplace=True)

        # Lower confidence interval: columns containing '_Low'
        confintlow_columns = ['Duration'] + [col for col in dataframe.columns if '_Low' in col]
        self.confintlow = dataframe[confintlow_columns].copy()
        self.confintlow['Duration'] = self.confintlow['Duration'].apply(_seconds_to_label)
        self.confintlow.set_index('Duration', inplace=True)

        # Upper confidence interval: columns containing '_Up'
        confintup_columns = ['Duration'] + [col for col in dataframe.columns if '_Up' in col]
        self.confintup = dataframe[confintup_columns].copy()
        self.confintup['Duration'] = self.confintup['Duration'].apply(_seconds_to_label)
        self.confintup.set_index('Duration', inplace=True)

        self._read_csv_or_excel_Montana_only(code = self._code)
        self.fit_all()
    elif code !=0:
        if self._read_csv_or_excel(code=str(code)):
            self.fit_all()
            self._code = code
            self._name = self.myloc.get_namefromINS(code)
        else:
            logging.debug(f"INS code {code} not found in the store (see {self.store.absolute()})")
    elif name!='':
        if self._read_csv_or_excel(name=name):
            self.fit_all()
            self._name = name
            self._code = self.myloc.get_INSfromname(name)
        else:
            logging.debug(f"Name {name} not found in the store")
def has_data_for_locality(self) -> bool:
    """Tell whether this instance was successfully initialized with QDF data for a locality."""
    return self.qdf is not None
@property
def name(self):
    """Locality (municipality) name."""
    return self._name
@property
def code(self):
    """INS code of the locality."""
    return self._code
@property
def code_name(self):
    """'INS-code - name' identifier string."""
    return f"{self._code}-{self._name}"
@property
def name_code(self):
    """'name - INS-code' identifier string."""
    return f"{self._name}-{self._code}"
def export_allmontana2xls(self):
    """ Export all Montana coefficients to an Excel file ('allmontana.xlsx').

    Iterates over all INS codes known by `self.myloc`, loads the local store
    for each one and concatenates the Montana tables (12 rows per locality,
    tagged with the INS code and the municipality name).
    """
    frames = []
    for curcode in self.myloc.get_allcodes():
        self._read_csv_or_excel(code=curcode)
        if self.montanacoeff is not None:
            # Tag the 12 Montana rows with their municipality.
            self.montanacoeff['INS'] = [curcode]*12
            self.montanacoeff['Name'] = [self.myloc.get_namefromINS(int(curcode))]*12
            frames.append(self.montanacoeff.copy())
            self.montanacoeff = None

    if not frames:
        # pd.concat([]) raises ValueError -- nothing to export.
        logging.warning("No Montana coefficients found -- nothing to export.")
        return

    pd.concat(frames).to_excel("allmontana.xlsx")
@classmethod
def importfromwebsite(cls, store_path:Path = 'irm', verbose:bool= False, waitingtime:float= .01, ins:Literal['2018', '2019', '2025', 2018, 2019, 2025] = 2018, ins_code: int = None):
    """ Import Excel files for one or all municipalities from the IRM website

    :param store_path: Where to store the downloaded data. Directory will be created if it doesn't exist.
    :param verbose: If `True`, will print some progress information.
        If `False`, will do nothing.
        If a callable, then will call it with a float in [0, 1].
        0 means nothing downloaded, 1 means everything downloaded.
    :param waitingtime: How long to wait (in seconds) between the download
        of each station (will make sure we don't overwhelm IRM's website).
    :param ins: The year of the INS codes to use.
    :param ins_code: Restricts the data download to a specific NIS code. `None` means full download.
    """
    import requests

    myloc = Localities(ins)
    store_path = Path(store_path)

    if ins_code is not None:
        codes_to_load = [ins_code]
    else:
        # exist_ok makes a prior exists() check redundant.
        store_path.mkdir(parents=True, exist_ok=True)
        codes_to_load = myloc.inscode2name

    for key, myins in enumerate(codes_to_load):
        # URL of the Excel file on the IRM website
        url = "https://www.meteo.be//resources//climatology//climateCity//xls//IDF_table_INS"+str(myins)+".xlsx"
        # Fetch the file from the IRM website
        response = requests.get(url)
        if str(response.content).find("not found") == -1:
            # Make sure we create the store path only if we have
            # something to put inside.
            if ins_code is not None:
                store_path.mkdir(parents=True, exist_ok=True)

            fname = store_path / (str(myins)+".xlsx")
            # Context manager guarantees the handle is closed even on error
            # (was open/write/close without protection).
            with open(fname, 'wb') as f:
                f.write(response.content)

            # Re-save through openpyxl to avoid a bad default style in the IRM file.
            import warnings
            from openpyxl import load_workbook
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", UserWarning)
                wb = load_workbook(fname)
                wb.save(fname)  # overwrites the file with a default style

            if verbose:
                if callable(verbose):
                    verbose(key/len(codes_to_load))
                else:
                    print(myins)
        else:
            logging.error(f"Failed to load IRM data: {url} --> {response}")

        # Be polite with IRM's server between downloads.
        sleep(waitingtime)
@classmethod
def make(cls, ins_code: int | str,
         store_path:Path = None, verbose:bool= False,
         waitingtime:float= .01,
         ins:Literal['2018', '2019', '2025', 2018, 2019, 2025] = 2018):
    """ Make a Qdf_IRM instance with data from the IRM website

    :param ins_code: INS code (int) or locality name (str) to download data for.
    :param store_path: Where to store the downloaded data. Directory will be created if it doesn't exist.
        Defaults to the package's 'data/irm/qdf' directory.
    :param verbose: If `True`, will print some progress information.
        If `False`, will do nothing.
        If a callable, then will call it with a float in [0, 1].
        0 means nothing downloaded, 1 means everything downloaded.
    :param waitingtime: How long to wait (in seconds) between the download
        of each station (will make sure we don't overwhelm IRM's website).
    :param ins: The year of the INS codes to use.
    :raises ValueError: if `ins_code` is a name that cannot be resolved.
    :return: a new Qdf_IRM instance initialized with the downloaded data.
    """
    if store_path is None:
        store_path = Path(__file__).parent / 'data' / 'irm' / 'qdf'

    if isinstance(ins_code, str):
        # Resolve a locality name to its INS code.
        loc = Localities(ins)
        resolved = loc.get_INSfromname(ins_code)
        if resolved is None:
            # Report the *name* the caller passed (previously the message
            # printed None because ins_code had already been overwritten).
            raise ValueError(f"Invalid locality name: {ins_code}")
        ins_code = resolved

    cls.importfromwebsite(store_path=store_path, verbose=verbose, waitingtime=waitingtime, ins=ins, ins_code=ins_code)

    new_cls = cls(store_path=store_path, code = ins_code, ins=ins)
    return new_cls
def _read_csv_or_excel(self, code='', name=''):
    """ Read the characteristics of a municipality from the CSV files or
    the Excel workbook associated with its INS code.

    Fills `qdf`, `standarddev`, `confintlow`, `confintup`, `montanacoeff`
    and `montana`; all are reset to None when nothing is found.

    :param code: the INS code of the municipality
    :param name: the name of the municipality
    :return: True on success, False otherwise
    """
    import warnings

    if code != '':
        loccode = str(code)
        name = self.myloc.get_namefromINS(int(loccode))
    elif name != '':
        if not name.lower() in self.myloc.insname2code.keys():
            # Previously returned a (truthy) message string, which made
            # callers' `if self._read_csv_or_excel(...)` checks treat a bad
            # name as success.
            logging.error(_('Bad name ! - Retry'))
            return False
        loccode = str(self.myloc.insname2code[name.lower()])
    else:
        # Neither argument given: previously crashed with an unbound local.
        logging.error("No INS code nor name provided")
        return False

    self._code = loccode
    self._name = name

    store = self.store
    pathname_xls = store / (loccode+".xlsx")
    pathname_csv = store / 'csv' / loccode

    if pathname_csv.exists():
        # Preferred source: pre-converted CSV files (see convert_xls2csv).
        self.qdf = pd.read_csv(pathname_csv / 'qdf.csv', index_col=0)
        self.standarddev = pd.read_csv(pathname_csv / 'standarddev.csv', index_col=0)
        self.confintlow = pd.read_csv(pathname_csv / 'confintlow.csv', index_col=0)
        self.confintup = pd.read_csv(pathname_csv / 'confintup.csv', index_col=0)
        self.montanacoeff = pd.read_csv(pathname_csv / 'montanacoeff.csv', index_col=0)
        self.montana = MontanaIRM(self.montanacoeff)
        return True

    if path.exists(pathname_xls):
        # Fallback: parse the IRM Excel workbook directly.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", UserWarning)
            self.qdf=pd.read_excel(pathname_xls,"Return level",index_col=0,skiprows=range(7),nrows=19,usecols="A:M",engine='openpyxl', engine_kwargs={'read_only': True})
            self.standarddev=pd.read_excel(pathname_xls,"Standard deviation",index_col=0,skiprows=range(7),nrows=19,usecols="A:M",engine='openpyxl', engine_kwargs={'read_only': True})
            self.confintlow=pd.read_excel(pathname_xls,"Conf. interval, lower bound",index_col=0,skiprows=range(7),nrows=19,usecols="A:M",engine='openpyxl', engine_kwargs={'read_only': True})
            self.confintup=pd.read_excel(pathname_xls,"Conf. interval, upper bound",index_col=0,skiprows=range(7),nrows=19,usecols="A:M",engine='openpyxl', engine_kwargs={'read_only': True})
            self.montanacoeff=pd.read_excel(pathname_xls,"Montana coefficients",index_col=0,skiprows=range(11),nrows=12,usecols="A:G",engine='openpyxl', engine_kwargs={'read_only': True})
        self.montana = MontanaIRM(self.montanacoeff)
        return True

    logging.warning(f"Can't find montana data. Checked {pathname_csv.absolute()} and {pathname_xls.absolute()}")
    self.qdf=None
    self.standarddev=None
    self.confintlow=None
    self.confintup=None
    self.montanacoeff=None
    self.montana=None
    return False
def _read_csv_or_excel_Montana_only(self, code='', name=''):
    """ Read only the Montana coefficients of a municipality from the CSV
    file or the Excel workbook associated with its INS code.

    :param code: the INS code of the municipality
    :param name: the name of the municipality
    :return: True on success, False otherwise
    """
    import warnings

    if code != '':
        loccode = str(code)
        name = self.myloc.get_namefromINS(int(loccode))
    elif name != '':
        if not name.lower() in self.myloc.insname2code.keys():
            # Previously returned a (truthy) message string -- callers using
            # the result as a boolean saw a bad name as success.
            logging.error(_('Bad name ! - Retry'))
            return False
        loccode = str(self.myloc.insname2code[name.lower()])
    else:
        logging.error("No INS code nor name provided")
        return False

    self._code = loccode
    self._name = name

    store = self.store
    pathname_xls = store / (loccode+".xlsx")
    pathname_csv = store / 'csv' / loccode

    if pathname_csv.exists():
        self.montanacoeff = pd.read_csv(pathname_csv / 'montanacoeff.csv', index_col=0)
        self.montana = MontanaIRM(self.montanacoeff)
        return True

    if path.exists(pathname_xls):
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", UserWarning)
            self.montanacoeff=pd.read_excel(pathname_xls,"Montana coefficients",index_col=0,skiprows=range(11),nrows=12,usecols="A:G",engine='openpyxl', engine_kwargs={'read_only': True})
        self.montana = MontanaIRM(self.montanacoeff)
        return True

    self.montanacoeff=None
    self.montana=None
    return False
@classmethod
def convert_xls2csv(cls, store_path= 'irm', ins:Literal['2018', '2019', '2025', 2018, 2019, 2025] = 2018):
    """ Convert all Excel files to CSV files

    For each municipality, reads the five sheets of '<INS>.xlsx' and writes
    them as individual CSV files under '<store_path>/csv/<INS>/'.

    :param store_path: Where to store the downloaded data. Directory will be created if it doesn't exists.
    :param ins: The year of the INS codes to use.
    """
    myloc = Localities(ins)
    store_path = Path(store_path)

    for key,myins in enumerate(myloc.get_allcodes()):
        pathname = store_path / (str(myins)+".xlsx")
        if pathname.exists():
            try:
                logging.info(f"Converting {pathname} to CSV files")
                # openpyxl warns about the IRM files' styles -- silence it.
                with warnings.catch_warnings():
                    warnings.simplefilter("ignore", UserWarning)
                    # Sheet layout is fixed by IRM: 19 duration rows for the
                    # QDF sheets, 12 return-period rows for Montana.
                    qdf=pd.read_excel(pathname,"Return level",index_col=0,skiprows=range(7),nrows=19,usecols="A:M",engine='openpyxl', engine_kwargs={'read_only': True})
                    standarddev=pd.read_excel(pathname,"Standard deviation",index_col=0,skiprows=range(7),nrows=19,usecols="A:M",engine='openpyxl', engine_kwargs={'read_only': True})
                    confintlow=pd.read_excel(pathname,"Conf. interval, lower bound",index_col=0,skiprows=range(7),nrows=19,usecols="A:M",engine='openpyxl', engine_kwargs={'read_only': True})
                    confintup=pd.read_excel(pathname,"Conf. interval, upper bound",index_col=0,skiprows=range(7),nrows=19,usecols="A:M",engine='openpyxl', engine_kwargs={'read_only': True})
                    montanacoeff=pd.read_excel(pathname,"Montana coefficients",index_col=0,skiprows=range(11),nrows=12,usecols="A:G",engine='openpyxl', engine_kwargs={'read_only': True})

                store_csv = store_path / 'csv' / str(myins)
                store_csv.mkdir(exist_ok=True, parents=True)

                qdf.to_csv(store_csv / 'qdf.csv')
                standarddev.to_csv(store_csv / 'standarddev.csv')
                confintlow.to_csv(store_csv / 'confintlow.csv')
                confintup.to_csv(store_csv / 'confintup.csv')
                montanacoeff.to_csv(store_csv / 'montanacoeff.csv')
            except Exception as e:
                logging.error(f"Error processing {pathname}: {e}")
        else:
            logging.warning(f"File {pathname} does not exist, skipping conversion.")
            logging.info(_("If it is a problem, try to reimport the data from the IRM website."))
def plot_idf(self, T=None, which:Literal['All', 'Montana', 'QDFTable'] = 'All', color=[27./255.,136./255.,245./255.]):
    """
    Plot IDF relations on a new figure

    :param T : the return period (based on RT constants); None plots them all
    :param which : information to plot
        - 'Montana'
        - 'QDFTable'
        - 'All'
    :param color: base RGB color; per-curve transparency encodes the return period
    :return: (fig, ax), or (None, None) when the required data is missing
    """
    if self.montana is None and which != 'QDFTable':
        logging.error(_("Montana coefficients are not available for this locality."))
        return None, None
    if self.qdf is None and which != 'Montana':
        logging.error(_("QDF data is not available for this locality."))
        return None, None

    fig,ax = plt.subplots(1,1,figsize=(15,10), tight_layout=True)

    ax.set_xscale('log')
    ax.set_yscale('log')

    if T is None:
        for k in range(len(RT)):
            # Longer return periods are drawn more opaque.
            pond = .3+.7*float(k/len(RT))
            mycolor = color+[pond]
            if which=='All' or which=='QDFTable':
                # Intensity [mm/h] = quantity [mm] / duration [min] * 60
                ax.scatter(durations,self.qdf[RT[k]]/durations*60.,label=RT[k] + _(' QDF Table'),color=mycolor)
            if which=='All' or which=='Montana':
                iMontana = [self.montana.get_meanrain(curdur,RT[k]) for curdur in durations]
                ax.plot(durations,iMontana,label=RT[k] + ' Montana',color=mycolor)
    else:
        assert T in RT, _('Bad return period ! - Retry')
        if which=='All' or which=='QDFTable':
            # NOTE(review): unlike the all-RT branch, the quantity is NOT
            # divided by the duration here -- confirm which is intended.
            ax.scatter(durations,self.qdf[T],label=T+ _(' QDF Table'),color=color)
        if which=='All' or which=='Montana':
            # NOTE(review): uses get_instantrain while the all-RT branch
            # uses get_meanrain -- confirm the asymmetry is intentional.
            iMontana = [self.montana.get_instantrain(curdur,T) for curdur in durations]
            ax.plot(durations,iMontana,label=T + ' Montana',color=color)

    ax.legend().set_draggable(True)
    ax.set_xlabel(_('Duration [min]'))
    ax.set_ylabel(_('Intensity [mm/h]'))
    ax.set_xticks(durations)
    ax.set_xticklabels(durationstext,rotation=45)
    ax.set_title(self._name + ' - code : ' + str(self._code))
    return fig,ax
def plot_qdf(self, T=None, which:Literal['All', 'Montana', 'QDFTable'] = 'All', color=[27./255.,136./255.,245./255.]):
    """
    Plot QDF relations on a new figure

    :param T : the return period (based on RT constants); None plots them all
    :param which : information to plot
        - 'Montana'
        - 'QDFTable'
        - 'All'
    :param color: base RGB color; per-curve transparency encodes the return period
    :return: (fig, ax), or (None, None) when the required data is missing
    """
    if self.qdf is None and which != 'Montana':
        logging.error(_("QDF data is not available for this locality."))
        return None, None
    if self.montana is None and which != 'QDFTable':
        logging.error(_("Montana coefficients are not available for this locality."))
        return None, None

    fig,ax = plt.subplots(1,1,figsize=(12,8), tight_layout=True)

    ax.set_xscale('log')

    if T is None:
        for k in range(len(RT)):
            # Longer return periods are drawn more opaque.
            pond = .3+.7*float(k/len(RT))
            mycolor = color+[pond]
            if which=='All' or which=='QDFTable':
                ax.scatter(durations,self.qdf[RT[k]],label=RT[k] + _(' QDF Table'),color=mycolor)
            if which=='All' or which=='Montana':
                QMontana = [self.montana.get_quantityrain(curdur,RT[k]) for curdur in durations]
                ax.plot(durations,QMontana,label=RT[k] + ' Montana',color=mycolor)
    else:
        assert T in RT, _('Bad return period ! - Retry')
        if which=='All' or which=='QDFTable':
            ax.scatter(durations,self.qdf[T],label=T+ _(' QDF Table'),color=color)
        if which=='All' or which=='Montana':
            QMontana = [self.montana.get_quantityrain(curdur,T) for curdur in durations]
            ax.plot(durations,QMontana,label=T + ' Montana',color=color)

    ax.grid(True, which='both', linestyle='--', linewidth=0.5)
    ax.legend().set_draggable(True)
    ax.set_xlabel(_('Duration [min]'))
    ax.set_ylabel(_('Quantity [mm]'))
    ax.set_xticks(durations)
    ax.set_xticklabels(durationstext,rotation=45)
    ax.set_title(self._name + ' - code : ' + str(self._code))
    return fig,ax
def plot_cdf(self, dur=None):
    """ Plot the cdf of the QDF data for a given duration

    :param dur: a duration label (one of durationstext); None plots every duration
    :return: (fig, ax), or (None, None) when QDF data is missing
    """
    if self.qdf is None:
        logging.error(_("QDF data is not available for this locality."))
        return None, None

    fig,ax = plt.subplots(1,1,figsize=(10,10), tight_layout=True)

    if dur is None:
        for k in range(len(durations)):
            # Longer durations are drawn more opaque.
            pond = .3+.7*float(k/len(durations))
            mycolor = (27./255.,136./255.,245./255.,pond)
            # freqndep is a module-level array defined elsewhere in this
            # file -- presumably the non-exceedance frequency of each RT
            # (TODO confirm against the module header).
            ax.scatter(self.qdf.loc[durationstext[k]],freqndep,marker='o',label=durationstext[k],color=mycolor)
    else:
        assert dur in durationstext, _('Bad duration - Retry !')
        ax.scatter(self.qdf.loc[dur],freqndep,marker='o',label=dur,color=(0,0,1))

    ax.legend().set_draggable(True)
    ax.set_ylabel(_('Cumulative distribution function (cdf)'))
    ax.set_xlabel(_('Quantity [mm]'))
    ax.set_title(self._name + ' - code : ' + str(self._code))
    return fig,ax
def fit_all(self):
    """ Fit a GEV distribution to every duration, reusing cached fits when available. """
    self.load_fits_json()

    if self.popt_all:
        # Fits were restored from the JSON cache -- nothing to do.
        return

    for curdur in durationstext:
        _fig, _ax, popt, _pcov = self.fit_cdf(curdur)
        self.popt_all[curdur] = popt

    self.save_fits_json()
def save_fits_json(self):
    """ Persist the fitted GEV parameters as '<code>_fits.json' in the store. """
    target = self.store / (str(self._code) + '_fits.json')
    with open(target, 'w') as f:
        pd.DataFrame(self.popt_all).to_json(f)
def load_fits_json(self):
    """ Load previously saved GEV fit parameters from '<code>_fits.json', if present. """
    import json

    filename = self.store / (str(self._code) + '_fits.json')

    if not filename.exists():
        self.popt_all = {}
        return

    with open(filename, 'r') as f:
        raw = json.load(f)

    # The JSON stores each parameter vector as an {index: value} mapping.
    self.popt_all = {key: np.array(list(vals.values())) for key, vals in raw.items()}
def fit_cdf(self, dur=None, plot=False):
    """ Fit the cdf of the QDF data with a Generalized Extreme Value distribution

    :param dur: the duration to fit (one of durationstext)
    :param plot: if True, will plot the cdf with the fit
    :return: (fig, ax, popt, pcov) on success; an error message string for a bad duration
    """
    if dur is None:
        return _('Bad duration - Retry !')
    if dur not in durationstext:
        return _('Bad duration - Retry !')

    # Quantiles for this duration, one per return period.
    x=np.asarray(self.qdf.loc[dur], dtype=np.float64)

    def locextreme(x,a,b,c):
        # GEV cdf with shape a, location b, scale c.
        return genextreme.cdf(x, a, loc=b, scale=c)

    def locextreme2(a):
        # Negative log-likelihood (kept for the disabled MLE path below).
        LL = -np.sum(genextreme.logpdf(x,a[0],loc=a[1],scale=a[2]))
        return LL

    # MLE fit as a starting point, then refine by least squares on the cdf
    # against freqndep (module-level frequencies defined elsewhere in the
    # file -- presumably non-exceedance; TODO confirm).
    popt = genextreme.fit(x)
    popt, pcov = curve_fit(locextreme, x, freqndep, p0=popt)
    #ptest = minimize(locextreme2,popt,bounds=[[-10.,0.],[0.,100.],[0.,100.]])
    # perr = np.sqrt(np.diag(pcov))

    fig=ax=None
    if plot:
        fig,ax=self.plot_cdf(dur)
        ax.plot(x,genextreme.cdf(x,popt[0],loc=popt[1],scale=popt[2]),label='fit')
        # ax.plot(x,genextreme.cdf(x,ptest.x[0],loc=ptest.x[1],scale=ptest.x[2]),label='fit_MLE')
        ax.legend().set_draggable(True)

    self.stat = genextreme
    return fig,ax,popt,pcov
def get_Tfromrain(self, Q, dur=dur1h):
    """ Get the return period for a given quantity of rain

    :param Q: the quantity of rain
    :param dur: the duration (a duration label)
    """
    shape, loc, scale = self.popt_all[dur][:3]
    return 1. / self.stat.sf(Q, shape, loc=loc, scale=scale)
def get_rainfromT(self, T, dur= dur1h):
    """ Get the quantity of rain for a given return period and duration

    :param T: the return period
    :param dur: the duration (a duration label)
    """
    shape, loc, scale = self.popt_all[dur][:3]
    return self.stat.isf(1. / T, shape, loc=loc, scale=scale)
def get_MontanacoeffforT(self, return_period):
    """ Get the Montana coefficients for a given return period

    :param return_period: the return period (must be one of the RT constants)
    :return: the coefficients row, or an error message string otherwise
    """
    if return_period not in RT:
        return _('Bad RT - Retry !')
    return self.montanacoeff.loc[float(return_period)]
def plot_hyeto(self, durmax, T, r=.5):
    """ Plot the hyetogram for a given return period

    :param durmax: the maximum duration of the hyetogram
    :param T: the return period
    :param r: the decentration coefficient
    :return: the matplotlib figure
    """
    fig, ax = self.montana.plot_hyeto(durmax, T, r)
    ax.set_title(f"{self._name} - code : {self._code}")
    return fig
def plot_hyetos(self, durmax, r=.5):
    """ Plot the hyetograms for all return periods

    :param durmax: the maximum duration of the hyetograms
    :param r: the decentration coefficient
    :return: the matplotlib figure
    """
    fig, ax = self.montana.plot_hyetograms_instant(durmax, r)
    ax.set_title(self._name + ' - code : ' + str(self._code))
    # Return the figure for consistency with plot_hyeto() -- previously
    # nothing was returned, so callers could not save/close the figure.
    return fig
def __str__(self) -> str:
    """ Readable representation: the underlying QDF table. """
    return str(self.qdf)
def make_image_qdf_plot(self, T= None, which:Literal['All', 'Montana', 'QDFTable'] = 'All', color=[27./255.,136./255.,245./255.]):
    """ Create an image of the QDF plot.

    We use the `matplotlib` library to create a PNG image of the QDF data.
    The image is saved in the store path as '<code_name>_qdf_plot.png' and
    cached: if it already exists, it is reused.

    :param T: the return period to plot (None means all RT)
    :param which: 'All', 'Montana' or 'QDFTable'
    :param color: base RGB color of the plot
    :return: the path of the PNG image
    """
    import matplotlib

    self._qdf_image_plot = self.store / f"{self.code_name}_qdf_plot.png"
    if self._qdf_image_plot.exists():
        return self._qdf_image_plot

    old_backend = matplotlib.get_backend()
    matplotlib.use('Agg') # Use a non-interactive backend for saving images
    try:
        fig, ax = self.plot_qdf(T=T, which=which, color=color)
        fig.savefig(self._qdf_image_plot, dpi=300)
        plt.close(fig)
    finally:
        # Restore the original backend even if plotting fails (previously a
        # failure left the Agg backend active).
        matplotlib.use(old_backend)

    return self._qdf_image_plot
def make_image_qdf_table(self):
    """ Create an image of the QDF data.

    We use the `dataframe_image` library to create a PNG image of the QDF data,
    saved in the store as '<code_name>_qdf.png'.
    Added style to the DataFrame to make it more readable.

    :return: the path of the PNG image, or None on failure
    """
    try:
        import dataframe_image as dfimg
    except ImportError:
        logging.error(_("The 'dataframe_image' library is not installed. Please install it to create QDF table images."))
        return None

    if self.qdf is None:
        logging.error(_("QDF data is not available for this locality."))
        return None

    qdf = self.qdf.copy()

    # Add a caption to the DataFrame
    qdf.attrs['caption'] = f"QDF data for {self._name} (INS code: {self._code})<br>\
<div style='font-size:8px;'>source : https://www.meteo.be/fr/climat/climat-de-la-belgique/climat-dans-votre-commune<br> \
Data extracted from IRM (Institut Royal Météorologique de Belgique) and processed by Wolf - ULiège"

    qdf.columns = pd.MultiIndex.from_tuples([(f"{_('Return period')}", str(col)) for col in qdf.columns])

    # Style the DataFrame
    # One line per duration, one column per return period
    # We will use light colors for the background and borders
    # to highlight every other line and center the text
    styled_df = qdf.style.format(precision=1) \
        .set_caption(qdf.attrs['caption']) \
        .set_properties(**{
            'text-align': 'center',
            'font-size': '12px',
            'border': '1px solid black',
            # 'background-color': '#f0f0f0',
        }).set_table_styles([
            {
                'selector': 'thead th.row_heading.level0',
                # 'props': [('text-align', 'center'), ('background-color', '#d9edf7'), ('color', '#31708f'),],
                'props': [('color', 'transparent')],
            },
            {
                'selector': 'thead th',
                'props': [('text-align', 'center'), ('background-color', '#d9edf7'), ('color', '#31708f')],
            },
        ])

    # Define the path for the image
    self._qdf_image_table = self.store / f"{self.code_name}_qdf.png"
    # Save the styled DataFrame as an image
    dfimg.export(styled_df, self._qdf_image_table, dpi=300)
    # Return the path, for consistency with make_image_qdf_plot().
    return self._qdf_image_table
def make_images(self):
    """ Create all images (table + plot) for the QDF data.

    :return: (table image path, plot image path)
    """
    self.make_image_qdf_table()
    self.make_image_qdf_plot()
    return self._qdf_image_table, self._qdf_image_plot
@property
def path_image_plot(self):
    """ Path of the QDF plot image (generated on first access). """
    if self._qdf_image_plot is None:
        self.make_image_qdf_plot()
    return self._qdf_image_plot
@property
def path_image_table(self):
    """ Path of the QDF table image (generated on first access). """
    if self._qdf_image_table is None:
        self.make_image_qdf_table()
    return self._qdf_image_table
class QDF_Belgium():
    """ Class to manage all QDF data for Belgium.

    Downloads (if needed) and loads a Qdf_IRM instance for every Belgian
    municipality, keyed by INS code in `self.all`.
    """

    def __init__(self, store_path= 'irm',
                 ins:Literal['2018', '2019', '2025', 2018, 2019, 2025] = 2018,
                 force_import: bool = False) -> None:
        """
        :param store_path: directory holding the downloaded IRM files
        :param ins: the year of the INS codes to use
        :param force_import: if True, re-download everything from the IRM website
        """
        self.localities = Localities(ins)
        self.store_path = Path(store_path)

        # Download the Excel files when the store is empty (or on request).
        if force_import or len(list(self.store_path.glob('*.xlsx'))) == 0:
            Qdf_IRM.importfromwebsite(store_path=str(self.store_path), verbose=True, ins=ins)

        # Convert to CSV once, for faster subsequent loads.
        if len(list(self.store_path.rglob('*.csv'))) == 0:
            Qdf_IRM.convert_xls2csv(store_path=str(self.store_path), ins=ins)

        # Keep only municipalities for which QDF data could be loaded.
        self.all:dict[int, Qdf_IRM] = {}
        for loc_ins in tqdm(self.localities.get_allcodes()):
            loc = Qdf_IRM(store_path=str(self.store_path),
                          code=loc_ins,
                          localities=self.localities)
            if loc.qdf is not None:
                self.all[loc_ins] = loc

    def make_images(self):
        """ Create all images for all QDF data. """
        for loc in self.all.values():
            loc.make_images()

    def __getitem__(self, key) -> Qdf_IRM:
        """ Access a locality by INS code (int) or name (str); returns None (and logs) when not found. """
        if isinstance(key, int):
            if key in self.all:
                return self.all[key]
            else:
                logging.error(f"INS code {key} not found in the data")
                return None
        elif isinstance(key, str):
            # Resolve the name to an INS code first.
            key = self.localities.get_INSfromname(key)
            if key is not None:
                if key in self.all:
                    return self.all[key]
                else:
                    logging.error(f"INS code {key} not found in the data")
                    return None
            else:
                # NOTE(review): `key` was reassigned above, so this logs
                # "Name None ..." rather than the original name -- confirm.
                logging.error(f"Name {key} not found in the data")
                return None

    def __iter__(self):
        """ Iterate over all localities """
        for qdf_municip in self.all.values():
            yield qdf_municip
[docs]
RE_REFERENCE = re.compile(r"\([0-9]+\)")
class Climate_IRM():
    """ Access to per-municipality climate statistics (rain, sun) downloaded
    as PDF files from the IRM website and optionally converted to Excel. """

    def __init__(self, store_path= 'irm', ins:Literal['2018', '2019', '2025', 2018, 2019, 2025] = 2018) -> None:
        """
        :param store_path: directory holding the downloaded PDF/XLSX files
        :param ins: the year of the INS codes to use
        """
        # Directory holding the downloaded files
        self.store_path = Path(store_path)
        # INS code <-> name mapping for the chosen INS year
        self.localities = Localities(ins)
        # Parsed climate tables, keyed externally (filled elsewhere)
        self._climate_data = {}

    def __getitem__(self, key):
        # Raises KeyError when the key has not been loaded.
        return self._climate_data[key]

    @classmethod
    def importfromwebsite(cls, store_path= 'irm', verbose:bool= False, waitingtime:float= .01, ins:Literal['2018', '2019', '2025', 2018, 2019, 2025] = 2018, ins_code: int = None, convert=False):
        """ Import climate PDF files for one or all municipalities from the IRM website

        :param store_path: Where to store the downloaded data. Directory will be created if it doesn't exists.
        :param verbose: If `True`, will print some progress information.
            If `False`, will do nothing.
            If a callable, then will call it with a float in [0, 1].
            0 means nothing downloaded, 1 means everything downloaded.
        :param waitingtime: How long to wait (in seconds) between the download
            of each station (will make sure we don't overwhelm IRM's website).
        :param ins: The year of the INS codes to use.
        :param ins_code: Restricts the data download to a specific NIS code. `None` means full download.
        :param convert: Converts the downloaded PDF to Excel files.
        """
        import requests

        myloc = Localities(ins)

        if ins_code is not None:
            codes_to_load = [ins_code]
        else:
            if not path.exists(store_path):
                mkdir(store_path)
            codes_to_load = myloc.inscode2name

        for key,myins in enumerate(codes_to_load):
            # URL of the climate PDF on the IRM website ("9120" = 1991-2020
            # reference period, presumably -- TODO confirm)
            url="https://www.meteo.be//resources//climatology//climateCity//pdf//climate_INS"+str(myins)+"_9120_fr.pdf"
            # Fetch the file from the IRM website
            response=requests.get(url)
            if str(response.content).find("Page not found")==-1 :
                # Make sure we create the store path only if we have
                # something to put inside.
                if ins_code is not None and not path.exists(store_path):
                    mkdir(store_path)
                pdf_file = path.join(store_path,str(myins)+".pdf")
                file=open(pdf_file, 'wb')
                file.write(response.content)
                file.close()

                if convert:
                    cls._convert_irm_file(pdf_file)

                if verbose:
                    if callable(verbose):
                        verbose(key/len(codes_to_load))
                    else:
                        print(myins)
            else:
                logging.error(f"Failed to load IRM data: {url} --> {response}")

            # Be polite with IRM's server when downloading several files.
            if len(codes_to_load) >= 2:
                sleep(waitingtime)

    @classmethod
    def _scrap_table(cls, t):
        """
        Helper method to transform a table represented as a list of list to a
        pandas DataFrame.
        """
        def fix_cid(strings: list[str]):
            # The CID thing is a known issue:
            # https://github.com/euske/pdfminer/issues/122
            return [s.replace('(cid:176)C ', '°C').replace('¢', "'") for s in strings]

        nt = []
        row_headers = []
        for rndx in range(1, len(t)):
            # In the row header, we remove the "references" like "(1)".
            row_headers.append( RE_REFERENCE.sub("", t[rndx][0]) )
            # The PDFs use different "minus" glyph instead of an ASCII one,
            # let's fix it.
            nt.append( list(map(lambda s:float(s.replace("−","-")), t[rndx][1:])))

        # TRANSLATION_HEADER is a module-level mapping defined elsewhere in
        # this file (presumably French PDF column name -> translated name).
        columns_headers = map(TRANSLATION_HEADER.get, t[0][1:])
        df = pd.DataFrame(nt, columns=fix_cid(columns_headers), index=fix_cid(row_headers))
        return df

    @classmethod
    def _convert_irm_file(cls, pdf_file: Union[str, Path]):
        """
        Scrap a PDF from IRM into two tables in a single Excel file with two
        sheets ('Rain' and 'Sun').
        """
        pdf_file = Path(pdf_file)
        with pdfplumber.open(pdf_file) as pdf:
            # Rain data (page index fixed by the IRM PDF layout)
            df = cls._scrap_table(pdf.pages[1].extract_table())
            # Sun data
            df_sun = cls._scrap_table(pdf.pages[4].extract_table())

        dest_file = pdf_file.with_suffix('.xlsx')
        with pd.ExcelWriter(dest_file) as writer:
            df.to_excel(writer, sheet_name='Rain')
            df_sun.to_excel(writer, sheet_name='Sun')
[docs]
PLUVIO_INI = "pluvio.ini"
[docs]
MATCH_NUM_ZONE_SHAPEFILE_INS_INDEX = "Match_num_zone_shapefile_INS_index.txt"
[docs]
EXTREME_PRECIP_COMMUNES = "Extreme_rain_ins.txt"
[docs]
GEOMETRY_MUNICIPALITIES = "PDS__COMMUNES.shp"
[docs]
class QDF_Hydrology():
""" Prepare data from IRM website for WOLF hydrology calculations.
We need :
- pluvio.ini
- Match_num_zone_shapefile_INS_index.txt
"pluvio.ini" contains the path to the rainfall data files for each locality:
- Extreme_precip_communes.txt
"""
def __init__(self, store_path= DATADIR / 'irm_qdf',
             ini_file:str = PLUVIO_INI,
             ins:Literal['2018', 2018] = 2018,
             geometry:str = GEOMETRY_MUNICIPALITIES) -> None:
    """
    :param store_path: directory holding the QDF/hydrology data files
    :param ini_file: name of the parameter file (see create_ini_file)
    :param ins: the year of the INS codes to use (restricted to 2018)
    :param geometry: filename of the municipalities shapefile
    """
    self.store_path = Path(store_path)
    # QDF data per INS code
    self._data:dict[int, Qdf_IRM] = {}
    # INS version to use. IRM has updated the QDF data in 2016, so we force the use of the 2018 version.
    self._ins = ins
    # Name of the extreme-precipitation file (may be overwritten by read_data)
    self._extreme_file = EXTREME_PRECIP_COMMUNES
    # Size of the extreme-precipitation file, measured while writing/reading it
    self._nb_lines_extreme_file = 0
    self._nb_cols_extreme_file = 0
    self.localities = Localities(ins)
    self._ini_file = ini_file
    self._geometry = geometry
    # GeoDataFrame of the municipalities, filled below
    self._geometry_df = None

    if not self.store_path.exists():
        self.store_path.mkdir(parents=True, exist_ok=True)

    if not self.store_path.joinpath(self._geometry).exists():
        # get the municipalities shapefile from HECE Gitlab public repository
        self.download_municipalities_2018()

    try:
        if self.store_path.joinpath(self._geometry).exists():
            self._geometry_df = gpd.read_file(self.store_path / self._geometry)
        else:
            logging.error(f"Geometry file {self._geometry} not found in {self.store_path}.")
            logging.error("Please download the 2018 version of municipalities shapefile from HECE or what you want.")
            return
    except Exception as e:
        logging.error(f"Error reading geometry file {self._geometry}: {e}")
        self._geometry_df = None

    if not self.store_path.joinpath(ini_file).exists():
        # First run: build pluvio.ini and its companion files from scratch.
        self.create_default_data()

    self.read_data()
def read_data(self):
    """ Read the data from the ini file and the extreme precipitation file.

    The ini file layout is: extreme-file name, number of lines, number of
    columns, number of return periods, number of values per return period,
    then one return period per line (see create_ini_file).
    """
    try:
        with open(self.store_path / PLUVIO_INI, 'r') as f:
            lines = f.readlines()

        self._extreme_file = lines[0].strip()
        self._nb_lines_extreme_file = int(lines[1].strip())
        self._nb_cols_extreme_file = int(lines[2].strip())
        _nb_return_periods = int(lines[3].strip())
        _nb_values_per_return_period = int(lines[4].strip())

        rt = []
        for i in range(_nb_return_periods):
            try:
                rt.append(int(lines[5 + i].strip()))
            except ValueError:
                logging.error(f"Invalid return period value in ini file: {lines[5 + i].strip()}")
                rt.append(0)

        if self.store_path.joinpath(self._extreme_file).exists():
            df = pd.read_csv(self.store_path / self._extreme_file, sep='\t', header=None)
            # Set column names based on the number of return periods and values per return period
            columns = ['INS', 'Duration']
            for rt_value in rt:
                for value_type in ['Q', 'Std', 'Low', 'Up']:
                    columns.append(f"{rt_value}_{value_type}")
            df.columns = columns

            all_ins = df['INS'].astype(int).unique()
            self._data = {ins: Qdf_IRM(store_path=self.store_path, code=ins, localities=self.localities, dataframe=df[df['INS'] == ins]) for ins in all_ins}
        else:
            logging.error(f"Extreme precipitation file {self._extreme_file} not found in {self.store_path}.")
            self._data = {}

    except Exception as e:
        # Was a bare `except:` -- it also swallowed SystemExit and
        # KeyboardInterrupt and hid the actual error.
        logging.error(f"Error during reading {self._ini_file} in {self.store_path}: {e}")
        logging.error("Check your data or delete files in the directory to force a new download.")
        self._data = {}
        self._extreme_file = None
        self._nb_lines_extreme_file = 0
        self._nb_cols_extreme_file = 0
def __getitem__(self, key) -> Qdf_IRM:
    """ Get the QDF data for a given INS code (int) or locality name (str). """
    if isinstance(key, int):
        ins_code = key
    elif isinstance(key, str):
        ins_code = self.localities.get_INSfromname(key)
        if ins_code is None:
            # Maybe the string is itself a numeric INS code.
            try:
                ins_code = int(key)
            except ValueError:
                raise KeyError(f"Locality {key} not found.")
    else:
        raise TypeError("Key must be an integer (INS code) or a string (locality name).")

    if ins_code not in self._data:
        raise KeyError(f"Data for INS code {ins_code} not found.")
    return self._data[ins_code]
def __iter__(self):
    """ Iterate over all QDF data. """
    yield from self._data.values()
def download_municipalities_2018(self, force:bool = False):
    """ Download the municipalities shapefile from HECE.

    :param force: If `True`, will download the file even if it already exists.
        NOTE(review): `force` is currently unused in this method --
        `toys_dataset` decides whether to (re)download; confirm intent.
    """
    munic = Path(toys_dataset('Communes_Belgique', 'PDS__COMMUNES.zip'))

    # Unzip the file if it is not already unzipped.
    if munic.exists():
        import zipfile
        with zipfile.ZipFile(munic, 'r') as zip_ref:
            zip_ref.extractall(self.store_path)
        self._geometry = 'PDS__COMMUNES.shp'
def create_match_num_zone_shapefile(self):
    """ Create the Match_num_zone_shapefile_INS_index.txt file.

    The file lists, in shapefile feature order, the INS code of each
    municipality (one per line), mapping shapefile indices to INS codes.
    """
    match_file = self.store_path / MATCH_NUM_ZONE_SHAPEFILE_INS_INDEX

    # The shapefile stores the INS code under 'INS' or, failing that, 'NSI'.
    colname = 'INS'
    if colname not in self._geometry_df.columns:
        colname = 'NSI'
    if colname not in self._geometry_df.columns:
        logging.error(f"Column {colname} not found in the geometry DataFrame.")
        return

    if match_file.exists():
        return

    with open(match_file, 'w') as f:
        for value in self._geometry_df[colname]:
            f.write(f"{value}\n")
def create_default_data(self):
    """ Create data from scratch for WOLF hydrology calculations. """
    # Order matters: the ini file records sizes measured while the
    # extreme-precipitation file is written.
    self.create_match_num_zone_shapefile()
    self.create_extreme_precipitation_file()
    self.create_ini_file()
[docs]
def create_extreme_precipitation_file(self):
    """ Create the extreme precipitation file for all localities.

    Each line of the tab-separated file contains:

    - INS code
    - Duration in seconds
    - For each return period in ``RT``: quantity, standard deviation,
      lower and upper confidence bounds

    Side effects: sets ``self.extreme_file`` and the counters
    ``self._nb_lines_extreme_file`` / ``self._nb_cols_extreme_file``.
    """
    self.extreme_file = self.store_path / EXTREME_PRECIP_COMMUNES
    # Always initialize both counters: the original code left them unset when
    # the file already existed, breaking create_ini_file() (AttributeError).
    # A value of 0 makes create_ini_file() recount from the file itself.
    self._nb_lines_extreme_file = 0
    self._nb_cols_extreme_file = 0
    if not self.extreme_file.exists():
        all_qdf = QDF_Belgium(store_path=self.store_path, ins=self._ins, force_import=False)
        with open(self.extreme_file, 'w') as f:
            for loc in all_qdf:
                loc: Qdf_IRM
                ins = loc.code
                qdf = loc.qdf
                low = loc.confintlow
                up = loc.confintup
                std = loc.standarddev
                for (dur_text, dur_s) in zip(durationstext, durations_seconds):
                    data = [ins, int(dur_s)]
                    for rt in RT:
                        data.append(qdf.loc[dur_text, rt])
                        data.append(std.loc[dur_text, rt])
                        data.append(low.loc[dur_text, rt])
                        data.append(up.loc[dur_text, rt])
                    f.write("\t".join(map(str, data)) + "\n")
                    self._nb_lines_extreme_file += 1
                    # Track the column count per written line instead of after
                    # the loop: avoids a NameError on 'data' when no locality
                    # yields any row.
                    self._nb_cols_extreme_file = len(data)
[docs]
def create_ini_file(self):
    """
    Create a parameter file for the class.

    File layout (one value per line): name of the extreme-precipitation
    file, its number of lines, its number of columns, the number of
    return periods, the number of values per return period (fixed at 4:
    quantity, standard deviation, lower and upper confidence bounds),
    then each return period.
    """
    with open(self.store_path / PLUVIO_INI, 'w') as f:
        f.write(f"{self._extreme_file}\n") # name of the file containing the extreme precipitation data
        # NOTE(review): create_extreme_precipitation_file() stores the path in
        # 'self.extreme_file' while this reads 'self._extreme_file' -- confirm
        # both attributes exist and are kept in sync.
        if self._nb_lines_extreme_file == 0 or self._nb_cols_extreme_file == 0:
            # Counters unknown (e.g. the extreme file pre-existed) --
            # recount them directly from the file.
            with open(self.store_path / self._extreme_file, 'r') as ef:
                lines = ef.readlines()
                self._nb_lines_extreme_file = len(lines)
                if lines:
                    self._nb_cols_extreme_file = len(lines[0].strip().split('\t'))
                else:
                    self._nb_cols_extreme_file = 0
        f.write(f"{self._nb_lines_extreme_file}\n") # number of lines in the extreme precipitation file
        f.write(f"{self._nb_cols_extreme_file}\n") # number of columns in the extreme precipitation file
        f.write(f"{len(RT)}\n") # Number of return periods
        f.write("4\n") # Number of values per return period (Q, std, low, up)
        for rt in RT:
            f.write(f"{rt}\n")
[docs]
def get_all_ins(self) -> list[int]:
    """ Return every loaded INS code, in insertion order. """
    return [*self._data]
[docs]
class QDF_Hydrology_Draw(Element_To_Draw):
""" Class to draw the QDF hydrology data on a map.
This class is used to draw the QDF hydrology data on a map using the
WOLF hydrology calculations.
"""
def __init__(self, store_path= DATADIR / 'irm_qdf', ins:Literal['2018', 2018] = 2018, idx:str = '', plotted:bool = True, mapviewer = None) -> None:
    """ Build the drawable wrapper around the QDF hydrology data.

    :param store_path: directory holding the QDF data files.
    :param ins: INS nomenclature year.
    :param idx: identifier passed to the drawing framework.
    :param plotted: whether the element starts in the plotted state.
    :param mapviewer: owning map viewer, if any.
    """
    super().__init__(idx=idx, plotted=plotted, mapviewer=mapviewer)
    self._qdf_hydrology = QDF_Hydrology(store_path=store_path, ins=ins)
    # Local import to avoid a circular dependency at module load time.
    from .PyPictures import PictureCollection
    self._scale_factor = 1.0  # Default scale factor for images
    # One Zones for the municipality outlines, two picture collections for
    # the per-municipality table and plot images.
    # NOTE(review): tables and plots share the same 'idx' -- confirm this is
    # intended (the zones collection gets a distinct idx).
    self._geometry_zones = Zones('', idx= idx+'_zones', plotted=plotted, mapviewer=mapviewer, parent = mapviewer)
    self._geometry_tables = PictureCollection('', idx= idx, plotted=plotted, mapviewer=mapviewer, parent = mapviewer)
    self._geometry_plots = PictureCollection('', idx= idx, plotted=plotted, mapviewer=mapviewer, parent = mapviewer)
    shapefile = self.store_path / self._qdf_hydrology._geometry
    self._geometry_zones.import_shapefile(shapefile, colname='NSI')
    self._geometry_zones.prep_listogl()
    self._geometry_tables.import_shapefile(shapefile, colname='NSI')
    self._geometry_plots.import_shapefile(shapefile, colname='NSI')
    self._prepare_image_location()
    # Map each municipality centroid to its (INS) zone name for picking.
    self._centroids = {curzone[0].centroid: curzone.myname for curzone in self._geometry_tables.myzones}
    # Display state flags.
    self._show_table = False
    self._show_plot = False
    self._reload_images = True
    self._current_images = None
def __getitem__(self, key) -> Qdf_IRM:
    """ Get the QDF data for a given INS code or locality name.

    Delegates to the wrapped :class:`QDF_Hydrology` instance, which owns the
    locality index and implements identical key handling (int code, name, or
    numeric string). The previous inline copy referenced ``self.localities``,
    an attribute this class does not define, so every string key raised
    AttributeError instead of resolving the locality name.

    :param key: INS code (int) or locality name (str).
    :raises KeyError: if the locality or its data cannot be found.
    :raises TypeError: if *key* is neither an int nor a str.
    """
    return self._qdf_hydrology[key]
[docs]
def _get_vector_tables(self, ins:str | int) -> vector:
    """ Return the table-anchor vector associated with an INS code. """
    key = str(ins)
    return self._geometry_tables[(key, key)]
[docs]
def _get_vector_plots(self, ins:str | int) -> vector:
    """ Return the plot-anchor vector associated with an INS code. """
    key = str(ins)
    return self._geometry_plots[(key, key)]
@property
def store_path(self):
    """ Root directory of the QDF hydrology data (delegated to the wrapped object). """
    return self._qdf_hydrology.store_path
[docs]
def _prepare_image_location(self):
    """ Pre-compute the transparent anchor rectangles where images are drawn.

    For every municipality, the image attachment point is its centroid;
    plots are anchored half a height below it and tables above the plot.
    All images start hidden.
    """
    base = 2000. * self._scale_factor  # default half-height of the images

    def _anchor(collection, half_width, half_height, dy):
        # Lay a transparent rectangle of 2*half_width x 2*half_height around
        # each centroid, shifted vertically by dy, image initially hidden.
        for curzone in collection.myzones:
            vec = curzone[0]
            cx, cy = vec.centroid.x, vec.centroid.y
            vec.myprop.image_attached_pointx = cx
            vec.myprop.image_attached_pointy = cy
            vec.myprop.imagevisible = False
            y = cy + dy
            vec.myvertices = [wv(cx - half_width, y - half_height),
                              wv(cx + half_width, y - half_height),
                              wv(cx + half_width, y + half_height),
                              wv(cx - half_width, y + half_height),
                              wv(cx - half_width, y - half_height)]
            vec.myprop.color = getIfromRGB([255, 255, 255, 0])  # Transparent color
            vec.find_minmax()
        collection.prep_listogl()

    # Plots: 3600x2400 aspect ratio, shifted half a height downwards.
    _anchor(self._geometry_plots, base * (3600. / 2400.), base, -base / 2.)
    # Tables: 1730x2000 aspect ratio, placed above (offset derived from the
    # plot height, which equals 'base').
    _anchor(self._geometry_tables, base * (1730. / 2000.), base, 2. * base - (base / 2.) * 3. / 4.)
[docs]
def set_images_as_legend(self, plot_or_table:Literal['plot', 'table'] = 'plot', which:list = None):
    """ Attach pre-rendered images as legend pictures on the map.

    :param plot_or_table: which image set to attach ('plot' or 'table');
        any other value is a no-op.
    :param which: INS codes to process; all known codes when None.
    """
    DEFAULT_SIZE = 2000. * self._scale_factor  # Default size for the images
    if which is None:
        which = self._qdf_hydrology.get_all_ins()
    # Select the collection, vector accessor, image attribute and aspect
    # ratio once; the placement logic below is identical for both sets.
    if plot_or_table == 'plot':
        ratio = 3600. / 2400.
        collection = self._geometry_plots
        get_vector = self._get_vector_plots
        get_image = lambda q: q.path_image_plot
    elif plot_or_table == 'table':
        ratio = 1730. / 2000.
        collection = self._geometry_tables
        get_vector = self._get_vector_tables
        get_image = lambda q: q.path_image_table
    else:
        return  # unknown target -- nothing to do
    width = DEFAULT_SIZE * ratio
    height = DEFAULT_SIZE
    for loc_ins in which:
        loc_qdf = self._qdf_hydrology[loc_ins]
        image = get_image(loc_qdf)
        if image is not None:
            vec = get_vector(loc_qdf.code)
            vec.myprop.image_path = image
            centroid = vec.centroid
            vec.myprop.image_attached_pointx, vec.myprop.image_attached_pointy = centroid.x, centroid.y
            vec.myprop.imagevisible = True
            vec.myvertices = [wv(centroid.x - width, centroid.y - height),
                              wv(centroid.x + width, centroid.y - height),
                              wv(centroid.x + width, centroid.y + height),
                              wv(centroid.x - width, centroid.y + height),
                              wv(centroid.x - width, centroid.y - height)]
            vec.myprop.color = getIfromRGB([255, 255, 255, 0])
            vec.find_minmax()
    collection.reset_listogl()
    collection.prep_listogl()
[docs]
def hide_all_images(self):
    """ Turn off image display on every municipality anchor (tables then plots). """
    for collection in (self._geometry_tables, self._geometry_plots):
        for curzone in collection.myzones:
            curzone[0].myprop.imagevisible = False
        collection.reset_listogl()
[docs]
def check_plot(self):
    """ Delegate the plot-readiness check to the Element_To_Draw base class. """
    return super().check_plot()
[docs]
def find_nearest_centroid(self, x: float, y: float, bounds: tuple[float, float, float, float]):
    """ Pick the municipality at the given coordinates.

    :param x: The x coordinate.
    :param y: The y coordinate.
    :param bounds: (minx, miny, maxx, maxy) bounding box used to pre-filter
        the candidate centroids.
    :return: The name of the municipality or an empty string if not found.
    """
    centroids = self.find_centroid_in_bounds(bounds)
    if not centroids:
        return ''
    # Build the target point once instead of once per candidate inside
    # the key function (loop-invariant hoisting).
    target = Point(x, y)
    closest_centroid = min(centroids, key=lambda c: c[0].distance(target))
    return closest_centroid[1]
[docs]
def pick_municipality(self, x: float, y: float, bounds: tuple[float, float, float, float]):
    """ Toggle the plot/table images of the municipality nearest to (x, y).

    :param x: The x coordinate.
    :param y: The y coordinate.
    :param bounds: (minx, miny, maxx, maxy) view bounds used to pre-filter
        the candidate centroids.
    """
    name = self.find_nearest_centroid(x, y, bounds)
    if not name:
        # No centroid in view: the previous code passed '' to __getitem__,
        # which raised an uncaught KeyError.
        return
    loc_qdf = self._qdf_hydrology[name]
    if loc_qdf.path_image_plot is not None and loc_qdf.path_image_table is not None:
        vec_plt = self._get_vector_plots(loc_qdf.code)
        vec_tables = self._get_vector_tables(loc_qdf.code)
        if vec_plt.myprop.imagevisible or vec_tables.myprop.imagevisible:
            # Already displayed -> toggle off.
            vec_plt.myprop.imagevisible = False
            vec_tables.myprop.imagevisible = False
        else:
            vec_plt.myprop.image_path = loc_qdf.path_image_plot
            vec_plt.myprop.imagevisible = True
            vec_tables.myprop.image_path = loc_qdf.path_image_table
            vec_tables.myprop.imagevisible = True
        # Reset the OpenGL lists to reflect the changes
        self._geometry_plots.reset_listogl()
        self._geometry_plots.prep_listogl()
        self._geometry_tables.reset_listogl()
        self._geometry_tables.prep_listogl()
[docs]
def find_centroids_in_polygon(self, polygon: Polygon) -> list[tuple[vector, str]]:
    """ Find all centroids in a given polygon, nearest to its centre first.

    :param polygon: A shapely Polygon object defining the area to search.
    """
    inside = [(pt, name) for pt, name in self._centroids.items() if pt.within(polygon)]
    # Sort by Euclidean distance to the polygon's own centroid.
    ref_x, ref_y = polygon.centroid.x, polygon.centroid.y
    inside.sort(key=lambda item: ((item[0].x - ref_x) ** 2 + (item[0].y - ref_y) ** 2) ** 0.5)
    return inside
[docs]
def find_centroid_in_bounds(self, bounds: tuple[float, float, float, float]) -> list[tuple[vector, str]]:
    """ Find all centroids within the given bounds, nearest to the centre first.

    :param bounds: A tuple of (minx, miny, maxx, maxy) defining the bounding box.
    """
    minx, miny, maxx, maxy = bounds
    mid_x = (minx + maxx) / 2
    mid_y = (miny + maxy) / 2
    inside = [(pt, name) for pt, name in self._centroids.items()
              if minx <= pt.x <= maxx and miny <= pt.y <= maxy]
    # Sort by Euclidean distance to the centre of the bounding box.
    inside.sort(key=lambda item: ((item[0].x - mid_x) ** 2 + (item[0].y - mid_y) ** 2) ** 0.5)
    return inside
@property
def show_plot(self) -> bool:
    """ Whether the plot images are currently displayed. """
    return self._show_plot

@show_plot.setter
def show_plot(self, enabled: bool):
    """ Enable or disable plot display; disabling hides every image. """
    self._show_plot = enabled
    if not enabled:
        self.hide_all_images()
    # An image refresh is requested only when display is (re)enabled.
    self._reload_images = enabled
@property
def show_table(self) -> bool:
    """ Whether the table images are currently displayed. """
    return self._show_table

@show_table.setter
def show_table(self, enabled: bool):
    """ Enable or disable table display; disabling hides every image. """
    self._show_table = enabled
    if not enabled:
        self.hide_all_images()
    # An image refresh is requested only when display is (re)enabled.
    self._reload_images = enabled
[docs]
def scale_images(self, factor:float = 1.0):
    """ Scale the images in the collection by a given factor.

    :param factor: The scaling factor to apply to the images.
    :raises TypeError: if *factor* is not a number.
    """
    # Explicit raise instead of 'assert': assertions are stripped when
    # Python runs with -O, silently skipping the validation.
    if not isinstance(factor, (int, float)):
        raise TypeError("Scaling factor must be a number.")
    # NOTE(review): self._scale_factor is not updated here -- presumably
    # scale_all_pictures() rescales relative to the current size; confirm.
    self._geometry_tables.scale_all_pictures(factor)
    self._geometry_plots.scale_all_pictures(factor)
[docs]
def plot(self, sx=None, sy=None, xmin=None, ymin=None, xmax=None, ymax=None, size=None):
    """ Plot the QDF hydrology data on the map.

    When plot (or table) display is enabled and a refresh was requested,
    attaches as legend images the municipalities whose centroid lies in
    the current view bounds (capped at NB_MAX), then draws the table
    anchors, the plot anchors and the municipality polygons.
    """
    NB_MAX = 10 # Maximum number of images to display
    if self.show_plot and self._reload_images:
        # Refresh the plot images visible in the current view, once.
        _new_images = self.find_centroid_in_bounds((xmin, ymin, xmax, ymax))
        self._reload_images = False
        if len(_new_images) > NB_MAX:
            logging.warning(_(f"Too many images to display. Showing only the first {NB_MAX}."))
            _new_images = _new_images[:NB_MAX]
        # NOTE(review): self._current_images is never assigned after __init__,
        # so the 'is None' test always triggers a reload -- confirm whether it
        # should be updated after set_images_as_legend().
        if self._current_images is None or len(_new_images) != len(self._current_images):
            self.set_images_as_legend(plot_or_table='plot', which=[img[1] for img in _new_images])
    elif self.show_table and self._reload_images:
        # Same refresh logic for the table images.
        _new_images = self.find_centroid_in_bounds((xmin, ymin, xmax, ymax))
        self._reload_images = False
        if len(_new_images) > NB_MAX:
            logging.warning(_(f"Too many images to display. Showing only the first {NB_MAX}."))
            _new_images = _new_images[:NB_MAX]
        if self._current_images is None or len(_new_images) != len(self._current_images):
            self.set_images_as_legend(plot_or_table='table', which=[img[1] for img in _new_images])
    # Draw order: table anchors, plot anchors, then municipality outlines.
    self._geometry_tables.plot(sx=sx, sy=sy, xmin=xmin, ymin=ymin, xmax=xmax, ymax=ymax, size=size)
    self._geometry_plots.plot(sx=sx, sy=sy, xmin=xmin, ymin=ymin, xmax=xmax, ymax=ymax, size=size)
    self._geometry_zones.plot(sx=sx, sy=sy, xmin=xmin, ymin=ymin, xmax=xmax, ymax=ymax, size=size)