Source code for wolfhece.irm_qdf

"""
Author: HECE - University of Liege, Pierre Archambeau
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

#Code INS des communes belges
import numpy as np
from os import path,mkdir
from time import sleep
import pandas as pd
import matplotlib.pyplot as plt
from scipy.optimize import minimize,curve_fit
from scipy.stats import gumbel_r,genextreme
from typing import Literal

from .ins import Localities
from .PyTranslate import _

[docs] Montana_a1 = 'a1'
[docs] Montana_a2 = 'a2'
[docs] Montana_a3 = 'a3'
[docs] Montana_b1 = 'b1'
[docs] Montana_b2 = 'b2'
[docs] Montana_b3 = 'b3'
[docs] RT2 = '2'
[docs] RT5 = '5'
[docs] RT10 = '10'
[docs] RT15 = '15'
[docs] RT20 ='20'
[docs] RT25 ='25'
[docs] RT30 = '30'
[docs] RT40 ='40'
[docs] RT50 ='50'
[docs] RT75 = '75'
[docs] RT100 ='100'
[docs] RT200 = '200'
[docs] RT = [RT2,RT5,RT10,RT15,RT20,RT25,RT30,RT40,RT50,RT75,RT100,RT200]
[docs] freqdep=np.array([1./float(x) for x in RT])
[docs] freqndep=1.-freqdep
[docs] dur10min = '10 min'
[docs] dur20min = '20 min'
[docs] dur30min = '30 min'
[docs] dur1h = '1 h'
[docs] dur2h = '2 h'
[docs] dur3h = '3 h'
[docs] dur6h = '6 h'
[docs] dur12h = '12 h'
[docs] dur1d = '1 d'
[docs] dur2d = '2 d'
[docs] dur3d = '3 d'
[docs] dur4d = '4 d'
[docs] dur5d = '5 d'
[docs] dur7d = '7 d'
[docs] dur10d = '10 d'
[docs] dur15d = '15 d'
[docs] dur20d = '20 d'
[docs] dur25d = '25 d'
[docs] dur30d = '30 d'
[docs] durationstext=[dur10min,dur20min,dur30min,dur1h,dur2h,dur3h,dur6h,dur12h,dur1d, dur2d,dur3d,dur4d,dur5d,dur7d,dur10d,dur15d,dur20d,dur25d,dur30d]
durations=np.array([10,20,30,60,120,180,360,720],np.float64)
[docs] durationsd=np.array([1,2,3,4,5,7,10,15,20,25,30],np.float64)*24.*60.
[docs] durations = np.concatenate([durations,durationsd])
[docs] class MontanaIRM(): def __init__(self,coeff:pd.DataFrame,time_bounds=None) -> None: if time_bounds is None: self.time_bounds = [25,6000] else: self.time_bounds = time_bounds self.coeff=coeff
[docs] def get_ab(self,dur,T): curcoeff = self.coeff.loc[float(T)] if dur<self.time_bounds[0]: a=curcoeff[Montana_a1] b=curcoeff[Montana_b1] elif dur<=self.time_bounds[1]: a=curcoeff[Montana_a2] b=curcoeff[Montana_b2] else: a=curcoeff[Montana_a3] b=curcoeff[Montana_b3] return a,b
[docs] def get_meanrain(self,dur,T,ab=None): if ab is None: ab = self.get_ab(dur,T) return ab[0]*dur**(-ab[1])
[docs] def get_instantrain(self,dur,T,ab=None): if ab is None: ab = self.get_ab(dur,T) meani=self.get_meanrain(dur,T,ab) return (1.-ab[1])*meani
[docs] def get_Q(self,dur,T): rain = self.get_meanrain(dur,T) return rain*dur/60. #to obtains [mm.h^-1] as dur is in [min]
[docs] def get_hyeto(self,durmax,T,r=0.5): """ :param r: Decentration coefficient """ x = np.arange(10,durmax,1,dtype=np.float64) # y = [self.get_instantrain(curx,T) for curx in x] startpeak=durmax*r-5 endpeak=durmax*r+5 if r==1.: xbeforepeak = np.zeros(1) else: xbeforepeak = np.arange(-float(durmax-10)*(1.-r),0,(1.-r)) if r==0.: xafterpeak = endpeak else: xafterpeak = np.arange(0,float(durmax-10)*r,r) xbeforepeak+= startpeak xafterpeak += endpeak x_hyeto = np.concatenate([xbeforepeak, [startpeak,endpeak], xafterpeak]) y_hyeto = np.zeros(len(x_hyeto)) for k in range(len(x_hyeto)): if x_hyeto[k] <= startpeak: y_hyeto[k] = self.get_instantrain((startpeak-x_hyeto[k])/(1.-r)+10,T) else: y_hyeto[k] = self.get_instantrain((x_hyeto[k]-endpeak)/r+10,T) if r==0.: y_hyeto[-1]=0. elif r==1.: y_hyeto[0]=0. return x_hyeto,y_hyeto
[docs] def plot_hyeto(self,durmax,T,r=0.5): x,y = self.get_hyeto(durmax,T,r) fig,ax = plt.subplots(1,1,figsize=[15,10]) ax.plot(x,y,label=_("Hyetogram")) ax.set_xlabel(_('Time [min]')) ax.set_ylabel(_('Intensity [mm/h]')) ax.legend().set_draggable(True) return fig,ax
[docs] def plot_hyetos(self,durmax,r=0.5): fig,ax = plt.subplots(1,1,figsize=[15,10]) for curT in RT: x,y = self.get_hyeto(durmax,curT,r) ax.plot(x,y,label=curT) ax.set_xlabel(_('Time [min]')) ax.set_ylabel(_('Intensity [mm/h]')) ax.legend().set_draggable(True) return fig,ax
[docs] class Qdf_IRM(): """ Gestion des relations QDF calculées par l'IRM Exemple d'utilisation : Pour importer les fichiers depuis le site web de l'IRM meteo.be from wolfhece.irm_qdf import Qdf_IRM qdf = Qdf_IRM(force_import=True) Il est possible de spécifier le répertoire de stockage des fichiers Excel Par défaut, il s'agit d'un sous-répertoire 'irm' du répertoire courant qui sera créé s'il n'exsiste pas Une fois importé/téléchargé, il est possible de charger une commune sur base de l'INS ou de son nom myqdf = Qdf_IRM(name='Jalhay') Les données sont ensuite disponibles dans les propriétés, qui sont des "dataframes" pandas (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) : - qdf : les relation Quantité/durée/fréquence - standarddev : l'écart-type de l'erreur - confintlow : la valeur inférieure de l'intervalle de confiance (-2*stddev) - confintup : la valeur supérieure de l'intervalle de confiance (+2*stddev) - montanacoeff : les coeffciients de Montana Il est par exemple possible d'accéder aux coefficients de Montana via l'une de ces lignes ou une combinaison : display(myqdf.montanacoeff) rt = myqdf.montanacoeff.index display(myqdf.montanacoeff.loc[rt[0]]) display(myqdf.montanacoeff.iloc[0]) display(myqdf.get_Montanacoeff(qdf.RT2)) """ def __init__(self, store_path= 'irm', code:int= 0, name= '', force_import= False, ins:Literal['2018', '2019', '2025', 2018, 2019, 2025] = 2019) -> None: self.myloc = Localities() self.store = store_path if force_import: self.importfromwebsite(store_path) if code !=0: self.ins_read_excel(code=str(code)) self.fit_all() elif name!='': self.ins_read_excel(name=name) self.fit_all() self.montanacoeff=None pass
[docs] def export_allmontana2xls(self): newdf = [] for curcode in self.myloc.get_allcodes(): self.ins_read_excel(code=curcode) if self.montanacoeff is not None: self.montanacoeff['INS'] = [curcode]*12 self.montanacoeff['Name'] = [self.myloc.get_namefromINS(int(curcode))]*12 newdf.append(self.montanacoeff.copy()) self.montanacoeff=None newdf = pd.concat(newdf) newdf.to_excel("allmontana.xlsx")
[docs] def importfromwebsite(self,store_path='irm',verbose=False,waitingtime=.01): """ Import Excel files for all municipalities from the IRM website :param store_path: Where to store the downloaded data. Directory will be created if it doesn't exists. :param verbose: If `True`, will print some progress information. If `False`, will do nothing. If a callable, then will call it with a float in [0, 1]. 0 means nothing downloaded, 1 means everything downloaded. :param waitingtime: How long to wait (in seconds) betwenn the download of each station (will make sure we don't overwhelm IRM's website). """ import requests if not path.exists(store_path): mkdir(store_path) for key,myins in enumerate(self.myloc.inscode2name): #chaîne URL du fichier Excel url="https://www.meteo.be//resources//climatology//climateCity//xls//IDF_table_INS"+str(myins)+".xlsx" #Obtention du fichiers depuis le site web de l'IRM response=requests.get(url) if str(response.content).find("Page not found")==-1 : file=open(path.join(store_path,str(myins)+".xlsx"), 'wb') file.write(response.content) file.close() if verbose: if callable(verbose): verbose(key/len(self.myloc.inscode2name)) else: print(myins) sleep(waitingtime)
[docs] def ins_read_excel(self,code='',name=''): """Lecture des caractéristiques d'une commune depuis le fichier Excel associé au code INS""" import warnings if code !='': loccode=str(code) name = self.myloc.get_namefromINS(int(loccode)) elif name!='': if not name.lower() in self.myloc.insname2code.keys(): return _('Bad name ! - Retry') loccode=str(self.myloc.insname2code[name.lower()]) self.code = loccode self.name = name pathname = path.join(self.store,loccode+".xlsx") with warnings.catch_warnings(record=True): warnings.simplefilter("always") if path.exists(pathname): self.qdf=pd.read_excel(pathname,"Return level",index_col=0,skiprows=range(7),nrows=19,usecols="A:M") self.standarddev=pd.read_excel(pathname,"Standard deviation",index_col=0,skiprows=range(7),nrows=19,usecols="A:M",engine='openpyxl') self.confintlow=pd.read_excel(pathname,"Conf. interval, lower bound",index_col=0,skiprows=range(7),nrows=19,usecols="A:M",engine='openpyxl') self.confintup=pd.read_excel(pathname,"Conf. interval, upper bound",index_col=0,skiprows=range(7),nrows=19,usecols="A:M",engine='openpyxl') self.montanacoeff=pd.read_excel(pathname,"Montana coefficients",index_col=0,skiprows=range(11),nrows=12,usecols="A:G",engine='openpyxl') self.montana = MontanaIRM(self.montanacoeff) else: return 'File not present'
[docs] def plot_idf(self,T=None,which='All',color=[27./255.,136./255.,245./255.]): """ Plot IDF relations on a new figure :param T : the return period (based on RT constants) :param which : information to plot - 'Montana' - 'QDFTable' - 'All' """ fig,ax = plt.subplots(1,1,figsize=(15,10)) ax.set_xscale('log') ax.set_yscale('log') if T is None: for k in range(len(RT)): pond = .3+.7*float(k/len(RT)) mycolor = color+[pond] if which=='All' or which=='QDFTable': ax.scatter(durations,self.qdf[RT[k]]/durations*60.,label=RT[k] + _(' QDF Table'),color=mycolor) if which=='All' or which=='Montana': iMontana = [self.montana.get_meanrain(curdur,RT[k]) for curdur in durations] ax.plot(durations,iMontana,label=RT[k] + ' Montana',color=mycolor) else: if which=='All' or which=='QDFTable': ax.scatter(durations,self.qdf[T],label=T+ _(' QDF Table'),color=color) if which=='All' or which=='Montana': iMontana = [self.montana.get_instantrain(curdur,T) for curdur in durations] ax.plot(durations,iMontana,label=T + ' Montana',color=color) ax.legend().set_draggable(True) ax.set_xlabel(_('Duration [min]')) ax.set_ylabel(_('Intensity [mm/h]')) ax.set_xticks(durations) ax.set_xticklabels(durationstext,rotation=45) ax.set_title(self.name + ' - code : ' + str(self.code)) return fig,ax
[docs] def plot_qdf(self,T=None,which='All',color=[27./255.,136./255.,245./255.]): """ Plot QDF relations on a new figure :param T : the return period (based on RT constants) :param which : information to plot - 'Montana' - 'QDFTable' - 'All' """ fig,ax = plt.subplots(1,1,figsize=(15,10)) ax.set_xscale('log') if T is None: for k in range(len(RT)): pond = .3+.7*float(k/len(RT)) mycolor = color+[pond] if which=='All' or which=='QDFTable': ax.scatter(durations,self.qdf[RT[k]],label=RT[k] + _(' QDF Table'),color=mycolor) if which=='All' or which=='Montana': QMontana = [self.montana.get_Q(curdur,RT[k]) for curdur in durations] ax.plot(durations,QMontana,label=RT[k] + ' Montana',color=mycolor) else: if which=='All' or which=='QDFTable': ax.scatter(durations,self.qdf[T],label=T+ _(' QDF Table'),color=color) if which=='All' or which=='Montana': QMontana = [self.montana.get_Q(curdur,T) for curdur in durations] ax.plot(durations,QMontana,label=T + ' Montana',color=color) ax.legend().set_draggable(True) ax.set_xlabel(_('Duration [min]')) ax.set_ylabel(_('Quantity [mm]')) ax.set_xticks(durations) ax.set_xticklabels(durationstext,rotation=45) ax.set_title(self.name + ' - code : ' + str(self.code)) return fig,ax
[docs] def plot_cdf(self,dur=None): fig,ax = plt.subplots(1,1,figsize=(10,10)) if dur is None: for k in range(len(durations)): pond = .3+.7*float(k/len(durations)) mycolor = (27./255.,136./255.,245./255.,pond) ax.scatter(self.qdf.loc[durationstext[k]],freqndep,marker='o',label=durationstext[k],color=mycolor) else: ax.scatter(self.qdf.loc[dur],freqndep,marker='o',label=dur,color=(0,0,1)) ax.legend().set_draggable(True) ax.set_ylabel(_('Cumulative distribution function (cdf)')) ax.set_xlabel(_('Quantity [mm]')) ax.set_title(self.name + ' - code : ' + str(self.code)) return fig,ax
[docs] def fit_all(self): self.popt_all={} self.pcov_all={} for curdur in durationstext: fig,ax,popt,pcov = self.fit_cdf(curdur) self.popt_all[curdur]=popt self.pcov_all[curdur]=pcov
[docs] def fit_cdf(self,dur=None,plot=False): x=np.asarray(self.qdf.loc[dur],dtype=np.float64) def locextreme(x,a,b,c): return genextreme.cdf(x,a,loc=b,scale=c) def locextreme2(a): LL = -np.sum(genextreme.logpdf(x,a[0],loc=a[1],scale=a[2])) return LL popt = genextreme.fit(x) popt,pcov=curve_fit(locextreme,x,freqndep,p0=popt) #ptest = minimize(locextreme2,popt,bounds=[[-10.,0.],[0.,100.],[0.,100.]]) perr = np.sqrt(np.diag(pcov)) fig=ax=None if plot: fig,ax=self.plot_cdf(dur) ax.plot(x,genextreme.cdf(x,popt[0],loc=popt[1],scale=popt[2]),label='fit') # ax.plot(x,genextreme.cdf(x,ptest.x[0],loc=ptest.x[1],scale=ptest.x[2]),label='fit_MLE') ax.legend().set_draggable(True) self.stat = genextreme return fig,ax,popt,pcov
[docs] def get_Tfromrain(self,Q,dur=dur1h): return 1./self.stat.sf(Q,self.popt_all[dur][0],loc=self.popt_all[dur][1],scale=self.popt_all[dur][2])
[docs] def get_rainfromT(self,T,dur=dur1h): return self.stat.isf(1./T,self.popt_all[dur][0],loc=self.popt_all[dur][1],scale=self.popt_all[dur][2])
[docs] def get_MontanacoeffforT(self,return_period): if return_period in RT: return self.montanacoeff.loc[float(return_period)] else: return _('Bad RT - Retry !')
[docs] def plot_hyeto(self,durmax,T,r=.5): fig,ax = self.montana.plot_hyeto(durmax,T,r) ax.set_title(self.name + ' - code : ' + str(self.code)) return fig
[docs] def plot_hyetos(self,durmax,r=.5): fig,ax = self.montana.plot_hyetos(durmax,r) ax.set_title(self.name + ' - code : ' + str(self.code))