"""
EXTREME VALUES ANALYSIS - EVA
@author : Pierre Archambeau - ULiege - HECE
@date : 2023
"""
# extra modules
try:
import numpy as np
import pandas as pd
from scipy import stats
from scipy.interpolate import interp1d
import os
from datetime import datetime as dt, timedelta
import matplotlib.pyplot as plt
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from matplotlib.patches import Rectangle
import matplotlib.colors as mcolors
from distfit import distfit
import typing
except:
raise Exception('Please install "numpy, pandas, scipy, matplotlib, distfit" module via "pip"')
# own modules
try:
from . import bootstrap
from .mixture_models import SeasonMixtureModel
from .joint_models import JointModel
from .hydrogramme_mono import Hydro_HSMF
except:
import bootstrap
from joint_models import JointModel
from mixture_models import SeasonMixtureModel
from hydrogramme_mono import Hydro_HSMF
# global fucntions
[docs]
def change_font_size(ax:Axes, fontsize):
"""Adapte la taille de police d'un graphique"""
for item in ([ax.title, ax.xaxis.label, ax.yaxis.label] +
ax.get_xticklabels() + ax.get_yticklabels()):
item.set_fontsize(fontsize)
[docs]
def set_style(color:typing.Literal['b','g','r','c','m','y','k'],
linestyle:typing.Literal['solid', 'dotted', 'deshed', 'dashdot']='solid',
linewidth=1) -> dict['color':str,'linestyle':str,'linewidth':int]:
return {'color':color, 'linestyle':linestyle, 'linewidth':linewidth}
[docs]
def get_style(style:dict['color':str,'linestyle':str,'linewidth':int]):
try:
color = style['color']
except:
color = 'k'
try:
linestyle = style['linestyle']
except:
linestyle = 'solid'
try:
linewidth = style['linewidth']
except:
linewidth = 2
return color, linestyle, linewidth
[docs]
def cunnane(r,n):
"""
Calcul de la fréquence empirique selon Cunnane(1979)
MAJ de la formulation de Weibull a priori mieux adaptée aux problèmes hydrologiques
r : rang de la donnée 1<=r<=n
n : nombre total de données dans l'échantillon
"""
return ((r-0.4)/(n+0.2))
[docs]
def weibull(r,n):
"""
Calcul de la fréquence empirique selon Weibull - loi "uniforme"
r : rang de la donnée 1<=r<=n
n : nombre total de données dans l'échantillon
"""
return r/(n+1)
[docs]
def filliben(r,n):
"""
Calcul de la fréquence empirique selon Filliben
r : rang de la donnée 1<=r<=n
n : nombre total de données dans l'échantillon
"""
if r==n:
return 0.5**(1/n)
elif r==1:
return 1 - 0.5**(1/n)
else:
return (r - 0.3175) / (n + 0.365)
[docs]
def aic(mle, n, k=2, corrected=True):
"""
The Akaike information criterion (AIC) is a metric that is used to compare the fit of different regression models.
It is calculated as:
AIC = 2K – 2ln(L)
where:
K: The number of model parameters (including loc and scale)
mle = ln(L): The log-likelihood of the model. This tells us how likely the model is, given the data.
https://fr.wikipedia.org/wiki/Crit%C3%A8re_d%27information_d%27Akaike
https://www.statology.org/aic-in-python/
"""
aic = 2*k-2*mle
if corrected:
aic += (2*k*(k+1))/(n-k-1)
return aic
[docs]
def bic(mle, n, k=2):
"""
the Bayesian information criterion (BIC) or Schwarz information criterion (also SIC, SBC, SBIC) is a criterion for model selection among a finite set of models
Models with lower BIC are generally preferred. It is based, in part, on the likelihood function and it is closely related to the Akaike information criterion (AIC).
https://en.wikipedia.org/wiki/Bayesian_information_criterion
https://fr.wikipedia.org/wiki/Crit%C3%A8re_d%27information_bay%C3%A9sien
"""
bic = k * np.log(n) - 2 * mle
return bic
[docs]
def _get_hist_params(X, bins, mhist='numpy'):
"""
Get histogram of original data
"""
histvals, binedges = np.histogram(X, bins=bins, density=True,)
bincenters = (binedges + np.roll(binedges, -1))[:-1] / 2.0
return(bincenters, histvals)
[docs]
def _generate_clusters(exceedances: pd.Series, r: typing.Union[pd.Timedelta, typing.Any]) -> typing.Generator[pd.Series, None, None]:
if not isinstance(r, pd.Timedelta):
try:
r = pd.to_timedelta(r)
except Exception as error:
raise ValueError(f"invalid value in {r} for the 'r' argument") from error
# Locate clusters separated by gaps not smaller than `r`
gap_indices = np.argwhere(
(exceedances.index[1:] - exceedances.index[:-1]) > r
).flatten()
if len(gap_indices) == 0:
# All exceedances fall within the same cluster
yield exceedances
else:
for i, gap_index in enumerate(gap_indices):
if i == 0:
# First cluster contains all values left from the gap
yield exceedances.iloc[: gap_index + 1]
else:
# Other clusters contain values between previous and current gaps
yield exceedances.iloc[gap_indices[i - 1] + 1 : gap_index + 1]
# Last cluster contains all values right from the last gap
yield exceedances.iloc[gap_indices[-1] + 1 :]
# Constants
[docs]
MONTHS_WINTER = [10,11,12,1,2,3]
[docs]
MONTS_SUMMER = [4,5,6,7,8,9]
[docs]
SEASONS = ['annual', 'winter', 'summer']
[docs]
SEASONS_MARKERS = ['o', '*', 'x', '2']
[docs]
SEASONS_COLORS = ['r', 'g', 'b', 'c']
[docs]
SEASONS_ALL = SEASONS + ['mixture', 'joint']
[docs]
RETURN_PERIOD_ESTIMATE = np.asarray([1.005, 1.0101, 1.0204, 1.111, 1.25, 1.4286, 2, 3, 5, 10, 15, 20, 25, 30, 40, 50, 60, 70, 80, 90, 100, 125, 150, 200, 300, 400, 500, 600, 750, 1000], dtype=np.float64)
[docs]
INTERV_CI = [60, 75, 90, 95]
[docs]
def sanitize_seasons(seasons = None, all=False):
if all:
def_values = SEASONS_ALL
else:
def_values = SEASONS
if seasons is None:
seasons = def_values
else:
if isinstance(seasons,list):
pass
elif seasons in def_values:
seasons = [seasons]
else:
raise Exception('Bad season ! - retry')
return seasons
# Liste des lois à tester
# 2, 3 ou 4 paramétres
# fonction, couleur et type de trait, nombre de points de départ différents à tester
[docs]
dists_4params = {'Johnsonsu':[stats.johnsonsu,'k--',-1]}
[docs]
dists_3params = {'GEV':[stats.genextreme,'c',-1], #contains Gumbel, Weibull, Fréchet
'Weibull':[stats.weibull_min,'r',0], #special case of GEV (if c==1 --> exponential, c==2 --> Rayleigh)
'Fréchet':[stats.invweibull,'r--',0], #special case of GEV
# 'Weibull inverse cdf':[stats.invweibull,'k--',0] #== Fréchet
'Log normale':[stats.lognorm,'k',-1],
'Gamma':[stats.gamma,'y',0],
# 'GenGamma cdf':[stats.gengamma,'c--',0],
'Loggamma':[stats.loggamma,'y--',0],
'Pearson3':[stats.pearson3,'m--',-1], #== Gamma
'Gamma inverse':[stats.invgamma,'m',-1],
}
[docs]
dists_2params = {'Gumbel':[stats.gumbel_r,'b',-1], #special case of GEV with c=0
'Normal':[stats.norm,'g',0],
'Gibrat':[stats.gibrat,'orange',0], #special case of lognorm with s=1
'Exponentielle':[stats.expon,'b--',0]} #special case of Weibull with c==1 or genpareto with c==0
[docs]
LAWS_FULL = list(dists_2params.keys()) +list(dists_3params.keys()) + list(dists_4params.keys())
[docs]
LAWS_POPULAR = ['GEV', 'Log normale', 'Gumbel', 'Exponentielle', 'Pearson3', 'Gamma inverse', 'Weibull', 'Gamma', 'Johnsonsu']
[docs]
class EVA_Serie:
"""
One time serie
"""
def __init__(self, data, datetime_index=None, datetime_format="%Y%m%d%H%M%S", data_headers=("#DateHeure", "Debit"), duration=1, startmonth_winter = 10, startmonth_summer=4, hydrol=True, verbose=True) -> None:
self.duration = duration
self.startmonth_winter = startmonth_winter
self.startmonth_summer = startmonth_summer
self.hydrological_year = hydrol
self.best = None
head_date, head_q = data_headers
self.index_NaN = []
self.index_NA = []
if isinstance(data,pd.Series):
self.data = pd.to_numeric(data, 'coerce')
data.dropna(how='all', inplace=True)
elif isinstance(data,np.ndarray):
self.data = pd.to_numeric(pd.Series(data), 'coerce')
self.data.index=datetime_index
self.data.dropna(how='all', inplace=True)
elif isinstance(data,str):
# récupération du fichier CSV
# ouverture du fichier en entrée et création de la matrice de base --> Datetime, Debit
# soit :
# custom_date_parser = lambda x: datetime.strptime(x, "%Y%m%d%H%M%S")
# df=pd.read_csv(fichier_in,sep=";",header=0, parse_dates=['#DateHeure'],date_parser=custom_date_parser,na_values=-777.) # Note PA : Normalement plus rapide à lire que l'Excel
# soit : (un peu plus rapide !)
df = pd.read_csv(data,sep=";",header=0) # Note PA : Normalement plus rapide à lire que l'Excel
df.dropna(how='all', inplace=True)
if head_date in df.columns and head_q in df.columns:
df[head_date] = pd.to_datetime(df[head_date],format=datetime_format)
elif 'Timestamp' in df.columns and 'Value' in df.columns:
head_date, head_q = ('Timestamp','Value')
df[head_date] = pd.to_datetime(df[head_date],format=datetime_format)
else:
raise RuntimeError('Please check your headers {}, {}'.format(head_date,head_q))
#Suppression des dates incorrectes
df.drop(index=np.where(pd.isnull(df[head_date]))[0], inplace=True)
# conversion en time series pandas
self.data = df.set_index(head_date)[head_q]
self.index_NaN = np.where(self.data=="#VALEUR!")[0]
self.index_NA = np.where(self.data!=self.data)[0]
self.data = pd.to_numeric(df.set_index(head_date)[head_q], 'coerce')
# Aprés lecture, le champ d'index '#DateHeure'est un objet datetime assez facile à manipuler --> .year, .day, .month, .hour, ...
# ou à filtrer!
# exemple : self.data[self.data.index >= '2020-09-01'] --> Tous les débits au-delà du 2020/09/01
# exemple : self.data[(self.data.index >= '2020-09-01') & (self.data.index < '2021-01-01')] --> Tous les débits entre 2 dates
else:
raise Exception('Format non supported -- Please code your importing filter')
self.check_dupliactes()
self.nb = self.data.shape[0]
self.index_isnan = np.where(np.isnan(self.data))[0]
self.index_neg = np.where(self.data<0.)[0]
self.index_zero = np.where(self.data==0.)[0]
self.nodata = {'negative':self.index_neg, 'nan': self.index_isnan, "null" : self.index_zero, 'nan_excel': self.index_NaN, 'na_excel': self.index_NA}
self.nbneg = self.index_neg.shape[0]
self.nbzero = self.index_zero.shape[0]
try:
# gestion de ce cas spécial d'un NaN d'Excel
self.nbnan_excel = self.index_NaN.shape[0]
except:
self.nbnan_excel = 0
try:
# gestion de ce cas spécial d'un #N/A d'Excel
self.nbNA_excel = self.index_NA.shape[0]
except:
self.nbNA_excel = 0
self.nbnan = self.index_isnan.shape[0] - self.nbnan_excel - self.nbNA_excel
self.years_bounds = [np.min(self.data.index.year), np.max(self.data.index.year)]
if verbose:
print(self.duration,'heures')
print('How many zero values ? {}'.format(self.nbzero))
print('How many negative values ? {}'.format(self.nbneg))
print('How many NaN values ? {}'.format(self.nbnan))
print('How many #VALEUR! values ? {}'.format(self.nbnan_excel))
print('How many #N/A values ? {}'.format(self.nbNA_excel))
print()
print('Starting year : {}'.format(self.years_bounds[0]))
print('Ending year : {}'.format(self.years_bounds[-1]))
self.filter()
self.maxima={}
self.pot={}
self.tz_info = self.data.index[0].tz
[docs]
def check_dupliactes(self):
"""
Check if there are duplicates in the data
"""
if not self.data.index.is_unique:
print('****\n')
print('Non unique index detected - removing duplicates - keep first value\n')
print(self.data.index[self.data.index.duplicated()])
print('****\n\n')
self.data = self.data[~self.data.index.duplicated(keep='first')]
[docs]
def filter(self):
"""
Set zero, negative, NaN, Nat to np.nan
"""
for curindex in self.nodata.values():
if len(curindex)>0:
self.data.iloc[curindex]=np.nan
[docs]
def get_date_max(self, year, seasons=None) -> list:
"""
Récupère la date du maximum de l'année "year"
Possibilité de traiter plusieurs saisons
"""
seasons = sanitize_seasons(seasons)
resdates = []
for curseason in seasons:
if not 'water_year' in self.maxima[curseason].keys():
self.find_maxima(self.hydrological_year)
idx = self.maxima[curseason]['water_year'].index(dt(year,1,1, tzinfo=self.tz_info))
resdates.append((self.maxima[curseason]['date'][idx], curseason))
return resdates
[docs]
def get_one_maxevent(self, year, seasons=None) -> list:
"""
Récupération de l'événement max pour une année spécifique
Possibilité de traiter plusieurs saisons
"""
seasons = sanitize_seasons(seasons)
resevents = []
for curseason in seasons:
if not 'water_year' in self.maxima[curseason].keys():
self.find_maxima(self.hydrological_year)
if not 'events' in self.maxima[curseason].keys():
self.extract_maxevents()
if dt(year,1,1, tzinfo=self.tz_info) in self.maxima[curseason]['water_year']:
idx = self.maxima[curseason]['water_year'].index(dt(year,1,1, tzinfo=self.tz_info))
curevent = self.maxima[curseason]['events'][year]
date = self.maxima[curseason]['date'][idx]
resevents.append( (date, curevent, idx, curseason))
return resevents
[docs]
def get_nb_maxevents(self, nbevents:int, seasons=None) -> dict[str, pd.DataFrame]:
"""Récupère les nbevents lus grands pics de crue"""
seasons = sanitize_seasons(seasons)
max_nb = {}
for curseason in seasons:
maxima = pd.DataFrame({'date':self.maxima[curseason]['date'].copy(),
'value':self.maxima[curseason]['maxval'].copy()})
maxima.set_index('date')
maxima.sort_values(['value'], inplace=True)
max_nb[curseason] = maxima.iloc[-nbevents:]
return max_nb
[docs]
def get_nb_max_hydro(self, nbevents:int, seasons=None) -> dict[str, pd.DataFrame]:
"""Récupère les nbevents plus grands hydrogrammes de crue"""
seasons = sanitize_seasons(seasons)
ret = {}
for curseason in seasons:
if not 'water_year' in self.maxima[curseason].keys():
self.find_maxima(self.hydrological_year)
if not 'events' in self.maxima[curseason].keys():
self.extract_maxevents()
dates = self.maxima[curseason]['date']
water_year = self.maxima[curseason]['water_year']
curdict = self.maxima[curseason]['events']
hydro = pd.DataFrame({'date':water_year,
'hydro':[val for val in curdict.values()],
'max':[val.max() for val in curdict.values()]})
hydro.sort_values('max', inplace=True)
ret[curseason] = hydro.iloc[-nbevents:]
return ret
[docs]
def save_max_events(self, filename, years_bounds:list=None, seasons=None):
"""
Enregistrement des crues maximales dans un fichier CSV
"""
seasons = sanitize_seasons(seasons)
if years_bounds is None:
year_begin = self.years_bounds[0]
year_end = self.years_bounds[-1]
else:
year_begin = years_bounds[0]
year_end = years_bounds[-1]
# writer = pd.ExcelWriter(filename, engine="xlsxwriter")
for curseason in seasons:
res = {}
for curyear in np.arange(year_begin, year_end+1):
ret = self.get_one_maxevent(curyear, curseason) # récupération du timeseries
if len(ret)>0:
res[curyear] = ret[0][1]
df= pd.DataFrame.from_dict(res) #, 'index')
# df.to_excel(writer, sheet_name=curseason)
df.to_csv(filename, sep=";")
# writer.close()
[docs]
def _select_dict(self,method:typing.Literal['BM', 'POT'] ='BM'):
if method=='BM':
return self.maxima
elif method=='POT':
return self.pot
[docs]
def _get_dates(self, year=None, hydrol=True, season=None):
"""
Compute date for current year and season
Args:
year (integer, optional): If None, the complete interval will be used. Defaults to None.
hydrol (bool, optional): Hydrological year or not. Defaults to True.
season ('annual', 'winter' or 'summer', optional): None == 'annual'. Defaults to None.
"""
if year is None:
start_date = dt(self.years_bounds[0] , 1, 1, tzinfo=self.tz_info)
end_date = dt(self.years_bounds[1]+1, 1, 1, tzinfo=self.tz_info)
else:
if hydrol:
if season is None or season.lower() == 'annual':
start_date = dt(year-1,10,1, tzinfo=self.tz_info)
end_date = dt(year,10,1, tzinfo=self.tz_info)
elif season.lower() == 'winter' or season.lower() == 'hiver':
start_date = dt(year-1,self.startmonth_winter,1, tzinfo=self.tz_info)
end_date = dt(year,self.startmonth_summer,1, tzinfo=self.tz_info)
elif season.lower() == 'summer' or season.lower() == 'ete' or season.lower() == 'été':
start_date = dt(year,self.startmonth_summer,1, tzinfo=self.tz_info)
end_date = dt(year,self.startmonth_winter,1, tzinfo=self.tz_info)
else:
raise Exception('Bad season !! - retry')
else:
start_date = dt(year,1,1, tzinfo=self.tz_info)
end_date = dt(year+1,1,1, tzinfo=self.tz_info)
return start_date, end_date
[docs]
def _get_max(self,
year=None,
hydrol=True,
season=None,
method: typing.Literal['BM', 'POT'] ='BM',
threshold=0.,
r: typing.Union[pd.Timedelta, typing.Any] = "24H"):
"""
Find maxima in specific period
"""
start_date, end_date = self._get_dates(year, hydrol, season)
df = self.data[(self.data.index>=start_date) & (self.data.index<end_date)]
if method=='BM':
if len(df)>0:
idx = df.argmax()
return [df.index[idx], df.iloc[idx]]
else:
return [None, np.nan]
elif method =='POT':
# Get exceedances
exceedances = df.loc[df.values > threshold]
# Locate clusters separated by gaps not smaller than `r`
# and select min or max (depending on `extremes_type`) within each cluster
extreme_dates, extreme_values = [], []
for cluster in _generate_clusters(exceedances=exceedances, r=r):
extreme_dates.append(cluster.idxmax())
extreme_values.append(cluster.loc[cluster.idxmax()])
return [extreme_dates, extreme_values]
[docs]
def get_data_for_one_season(self, method:typing.Literal['BM', 'POT'] ='BM', season=None):
if season is None:
return None
else:
try:
return self._select_dict(method)[season]
except:
return None
[docs]
def get_fit_one_season(self, method:typing.Literal['BM', 'POT'] ='BM', season=None):
if season is None:
return None
else:
try:
return self._select_dict(method)[season]['fit']
except:
return None
[docs]
def save_fit_one_season(self, filename:str, method:typing.Literal['BM', 'POT'] ='BM', season=None, fit_method='MLE', sep=';'):
df = pd.DataFrame.from_dict(self.get_fit_one_season(method='BM', season=season)[fit_method], orient='index')
df.drop('func', inplace=True, axis=1)
df.to_csv(filename, sep=sep)
[docs]
def find_maxima(self, hydrol=True, excluded_years=[], method:typing.Literal['BM', 'POT']='BM', threshold: float=0., r: typing.Union[pd.Timedelta, typing.Any] = "24H", verbose=True):
"""
Find maxima in whole data
"""
self.hydrological_year = hydrol
curdict = self._select_dict(method)
if method=='POT':
self.pot_threshold = threshold
self.pot_r = r
for cur in SEASONS:
curdict[cur]={}
curdict[cur]['date'] = []
curdict[cur]['water_year'] = []
curdict[cur]['maxval'] = []
curdict[cur]['serie'] = None
for cur in SEASONS:
for curyear in range(self.years_bounds[0],self.years_bounds[1]+1):
if not curyear in excluded_years:
date,val = self._get_max(curyear,hydrol,season=cur,method=method , threshold=threshold, r=r)
if method=='BM':
if not np.isnan(val):
curdict[cur]['date'].append(date)
curdict[cur]['water_year'].append(dt(curyear,1,1, tzinfo=self.tz_info))
curdict[cur]['maxval'].append(val)
else:
if verbose:
print('NaN value encountered - season : {} - year : {}'.format(cur,curyear))
elif method=='POT':
if not val==[]:
curdict[cur]['date']+= date
curdict[cur]['water_year']+= [dt(curyear,1,1, tzinfo=self.tz_info)]*len(date)
curdict[cur]['maxval']+=val
else:
if verbose:
print('Void value encountered - season : {} - year : {}'.format(cur,curyear))
for cur in SEASONS:
curdict[cur]['maxval'] = np.asarray(curdict[cur]['maxval'])
df = pd.DataFrame(curdict[cur]['maxval'])
df.index=curdict[cur]['date']
curdict[cur]['serie'] = df.squeeze()
[docs]
def sort_maxima(self, seasons=None):
"""
Sort maxima in increasing order to calculate empirical frequency
"""
seasons = sanitize_seasons(seasons)
for curseason in seasons:
self.maxima[curseason]['sorted_maxval'] = sorted(self.maxima[curseason]['maxval'])
[docs]
def _fit_multistarts(self, data, fdist, nb=0, method='MLE'):
"""
Fit de la loi en adaptant éventuellement le point de départ
Ceci n'est sans doute réellement utile que pour la loi de Weibull
"""
dist:stats.rv_continuous
dist,res=fdist
if res is None:
# Pas de points de départ fournis
if nb==0:
nb=5
if dist is stats.genextreme :
# la loi de Gumbel étant un cas particulier de la GEV, on initialise les paramétres sur base de ce fit
res = stats.gumbel_r.fit(data,method=method)
res = dist.fit(data,0.,loc=res[0],scale=res[1],method=method)
elif dist is stats.weibull_min :
# la loi de Weibull étant un cas particulier de la GEV
res = stats.gumbel_r.fit(data,method=method)
res = dist.fit(data,0.,loc=res[0],scale=res[1],method=method)
res = dist.fit(data,1.,loc=res[0],scale=res[1],method=method)
elif dist is stats.gamma or dist is stats.gengamma:
res = stats.pearson3.fit(data,method=method)
res = dist.fit(data,res[0],loc=res[1],scale=res[2],method=method)
elif dist is stats.lognorm:
res = stats.gibrat.fit(data,method=method)
res = dist.fit(data,0.,loc=res[0],scale=res[1],method=method)
else:
res = dist.fit(data,method=method)
else:
if dist.numargs>0:
res = dist.fit(data,res[0],loc=res[1],scale=res[2],method=method)
else:
res = dist.fit(data,loc=res[0],scale=res[1],method=method)
mle = -dist.nnlf(res,data)
resul=(mle,res)
if nb>0:
mlemax=mle
all=[]
if dist.numargs==1:
loc = np.random.normal(res[1],10,nb)
scale = np.random.normal(res[2],5,nb)
skew = np.random.normal(res[0],1,nb)
for curloc in loc:
for curscal in scale:
for curskew in skew:
try:
res = dist.fit(data,curskew,loc=curloc,scale=curscal,method=method)
mle = dist.nnlf(res,data)
if mle is not np.inf:
all.append((mle,res))
if mle < mlemax:
mlemax=mle
resul=(mle,res)
except:
pass
else:
loc = np.random.normal(res[0],10,nb)
scale = np.random.normal(res[1],5,nb)
for curloc in loc:
for curscal in scale:
res = dist.fit(data,loc=curloc,scale=curscal,method=method)
mle = dist.nnlf(res,data)
if mle is not np.inf:
all.append((mle,res))
if mle < mlemax:
mlemax=mle
resul=(mle,res)
return resul
[docs]
def _evaluate_ci(self, data, fdist, nboot=500, method='MLE'):
"""
Evaluate confidence interval based on bootstrap method
REMARK : CI for MLE can be analyzed by Fisher coefficients
"""
p_cum = 1.-1./RETURN_PERIOD_ESTIMATE
# Intervalles de confiance pour la distribution dist
dist:stats.rv_continuous
dist,res=fdist
# recherche des intervalles de confiance par Bootstraping
data_boot = bootstrap.Bootstrap(data)
data_boot.generate(nboot)
all_mle=[]
all_params=[]
for curdata in data_boot.series:
mle, res = self._fit_multistarts(curdata,fdist,-1,method)
all_mle.append(mle)
all_params.append(res)
ic = {}
ic['mle'] = all_mle
ic['params'] = all_params
ic['sup'] = {}
ic['inf'] = {}
for idx,curinterv in enumerate(INTERV_CI):
ic['inf'][curinterv] = []
ic['sup'][curinterv] = []
for curp in p_cum:
if dist.numargs>0:
ic['funcs'] = [dist(curres[0],loc=curres[1],scale=curres[2]) for curres in all_params]
else:
ic['funcs'] = [dist(loc=curres[0],scale=curres[1]) for curres in all_params]
val = [curfunc.ppf(curp) for curfunc in ic['funcs']]
bounds = [ (100-curinterv)/2./100. for curinterv in INTERV_CI]
bounds = bounds + [ 1.- ((100-curinterv)/2./100.) for curinterv in INTERV_CI]
quant=np.quantile(val,bounds)
nb = len(INTERV_CI)
for idx,curinterv in enumerate(INTERV_CI):
ic['inf'][curinterv].append(quant[idx])
ic['sup'][curinterv].append(quant[idx+nb])
return ic
[docs]
def get_fitted_params(self, season=None, law='GEV', method='MLE'):
if season is None:
return None
try:
return self.maxima[season]['fit'][method][law]['params']
except:
return None
[docs]
def get_T_from_q(self,
q:list,
season:str='best',
law:str='best',
method:str='MLE',
ic=False) -> pd.DataFrame:
try:
if law == 'best':
lawfit = self.best
else:
try:
lawfit = self.maxima[season]['fit'][method][law]['func']
except:
lawfit = None
except:
lawfit = None
if lawfit is None:
raise NameError('Bad law -- retry!')
if not isinstance(q,list):
q=list(q)
rp = {curq : {'mean':1./(1.-lawfit.cdf(curq))} for curq in q}
if ic and law!='best':
dictm = self.maxima[season]['fit'][method]
dictlaw = dictm[law]
if 'ic' in dictlaw.keys():
dictic_sup = dictlaw['ic']['sup']
dictic_inf = dictlaw['ic']['inf']
for idx, curic in enumerate(dictic_inf):
c_inf = dictic_inf[curic]
c_sup = dictic_sup[curic]
finf = interp1d(c_inf, RETURN_PERIOD_ESTIMATE)
fsup = interp1d(c_sup, RETURN_PERIOD_ESTIMATE)
for curq in q:
try:
rp[curq][str(curic)+'sup'] = float(fsup(curq))
except:
rp[curq][str(curic)+'sup'] = None
try:
rp[curq][str(curic)+'inf'] = float(finf(curq))
except:
rp[curq][str(curic)+'inf'] = None
df= pd.DataFrame.from_dict(rp, 'index')
return df
[docs]
def get_q_from_T(self,
return_periods:list=[5,10,15,20,25,50,75,100,200,500,1000],
season:str='best',
law:str='best',
method:str='MLE',
ic=False) -> pd.DataFrame:
try:
if law == 'best':
lawfit = self.best
else:
try:
lawfit = self.maxima[season]['fit'][method][law]['func']
except:
lawfit = None
except:
raise NameError('Bad law -- retry!')
if lawfit is None:
return None
if not isinstance(return_periods,list):
return_periods=list(return_periods)
q = {curT : {'mean_value':lawfit.ppf(1.-1./float(curT))} for curT in return_periods}
if ic and law!='best':
dictm = self.maxima[season]['fit'][method]
dictlaw = dictm[law]
if 'ic' in dictlaw.keys():
dictic_sup = dictlaw['ic']['sup']
dictic_inf = dictlaw['ic']['inf']
for idx, curic in enumerate(dictic_inf):
c_inf = dictic_inf[curic]
c_sup = dictic_sup[curic]
finf = interp1d(RETURN_PERIOD_ESTIMATE, c_inf)
fsup = interp1d(RETURN_PERIOD_ESTIMATE, c_sup)
for curT in return_periods:
# q[curT][str(curic)+'sup'] = float(fsup(curT))
# q[curT][str(curic)+'inf'] = float(finf(curT))
try:
q[curT][str(curic)+'sup'] = float(fsup(curT))
except:
q[curT][str(curic)+'sup'] = None
try:
q[curT][str(curic)+'inf'] = float(finf(curT))
except:
q[curT][str(curic)+'inf'] = None
df= pd.DataFrame.from_dict(q, 'index',)
return df
[docs]
def save_q_from_T(self,
filename:str='',
return_periods=[5,10,15,20,25,50,75,100,200,500,1000],
season='best',
law='best',
method='MLE',
ic=False):
q = self.get_q_from_T(return_periods, season, law, method, ic)
if '.csv' in filename:
q.to_csv(filename, sep=";")
elif '.xlsx' in filename:
q.to_excel(filename)
else:
q.to_excel('q_from_t.xlsx')
[docs]
def save_T_from_q(self,
filename:str='',
q=[5,10,15,20,25,50,75,100,200,500,1000],
season='best',
law='best',
method='MLE',
ic=False):
rp = self.get_T_from_q(q, season, law, method, ic)
if '.csv' in filename:
rp.to_csv(filename)
elif '.xlsx' in filename:
rp.to_excel(filename)
else:
rp.to_excel('q_from_t.xlsx')
[docs]
def fit(self, seasons=None, laws=['GEV'], init_EVA=None, methods=['MLE'], ic=False, nboot=100, verbose=True):
"""
Fitting of selected laws
"""
seasons = sanitize_seasons(seasons)
if laws == 'popular':
laws = LAWS_POPULAR
elif isinstance(laws,str):
laws=[laws]
for curseason in seasons:
data = self.maxima[curseason]['maxval']
if not 'fit' in self.maxima[curseason].keys():
self.maxima[curseason]['fit']={}
curdict = self.maxima[curseason]['fit']
for curmethod in methods:
if not curmethod in curdict.keys():
curdict[curmethod]={}
curdictm = curdict[curmethod]
for curlaw in laws:
if not curlaw in curdictm.keys():
curdictm[curlaw]={}
resdict = curdictm[curlaw]
if curlaw in dists_2params.keys():
val = dists_2params[curlaw]
if verbose:
print(self.duration,curseason,curlaw,curmethod)
dist:stats.rv_continuous
dist, color, nb =val
resini = None
if init_EVA is not None:
resini = init_EVA.get_fitted_params(curseason, curlaw, curmethod)
mle,res = self._fit_multistarts(data,(dist,resini),nb,curmethod)
resdict['mle']=mle
resdict['params']=res
resdict['func']=dist(loc=res[0],scale=res[1])
resdict['aic'] = aic(mle,len(data),2,False)
resdict['aicc'] = aic(mle,len(data),2,True)
resdict['bic'] = bic(mle,len(data),2)
resdict['rss'] = rss(data,resdict['func'].pdf)
resdict['color'] = color
if ic:
resdict['ic'] = self._evaluate_ci(data,(dist,res),nboot,curmethod)
elif curlaw in dists_3params.keys():
val = dists_3params[curlaw]
if verbose:
print(self.duration,curseason,curlaw,curmethod)
dist:stats.rv_continuous
dist, color, nb =val
try:
resini = None
if init_EVA is not None:
resini = init_EVA.get_fitted_params(curseason, curlaw, curmethod)
mle,res = self._fit_multistarts(data,(dist,resini),nb,curmethod)
resdict['mle']=mle
resdict['params']=res
resdict['func']=dist(res[0],loc=res[1],scale=res[2])
resdict['aic'] = aic(mle,len(data),3,False)
resdict['aicc'] = aic(mle,len(data),3,True)
resdict['bic'] = bic(mle,len(data),3)
resdict['rss'] = rss(data,resdict['func'].pdf)
resdict['color'] = color
if ic:
resdict['ic'] = self._evaluate_ci(data,(dist,res),nboot,curmethod)
except:
pass
elif curlaw in dists_4params.keys():
val = dists_4params[curlaw]
if verbose:
print(self.duration,curseason,curlaw,curmethod)
dist:stats.rv_continuous
dist, color, nb =val
try:
resini = None
if init_EVA is not None:
resini = init_EVA.get_fitted_params(curseason, curlaw, curmethod)
mle,res = self._fit_multistarts(data,(dist,resini),nb,curmethod)
resdict['mle']=mle
resdict['params']=res
resdict['func']=dist(loc=res[2],scale=res[3],*res[0:2])
resdict['aic'] = aic(mle,len(data),4,False)
resdict['aicc'] = aic(mle,len(data),4,True)
resdict['bic'] = bic(mle,len(data),4)
resdict['rss'] = rss(data,resdict['func'].pdf)
resdict['color'] = color
if ic:
resdict['ic'] = self._evaluate_ci(data,(dist,res),nboot,curmethod)
except:
pass
[docs]
def select_best_func(self, season, law, method):
"""
Select one function as best fit
"""
try:
self.best = self.maxima[season]['fit'][method][law]['func']
except:
self.best = None
[docs]
def distfit(self, seasons=None, laws='popular', smooth=0, bins=100):
"""
Laws can be 'full', 'popular'
***
Be carefull, fit of law is based on default scipy.stats parames. In our implementation, non-default initial conditions are used for specific laws, including GEV.
***
https://erdogant.github.io/distfit/pages/html/Parametric.html
https://erdogant.github.io/distfit
"""
seasons = sanitize_seasons(seasons)
for curseason in seasons:
data = self.maxima[curseason]['maxval']
if not 'distfit' in self.maxima[curseason].keys():
self.maxima[curseason]['distfit']={}
curdict = self.maxima[curseason]['distfit']
if smooth>0:
curdict['func'] = dist = distfit(distr=laws,smooth=smooth, bins=bins) # Initialize
else:
curdict['func'] = dist = distfit(distr=laws, bins=bins) # Initialize
dist.fit_transform(data) # Fit distributions on empirical data X
[docs]
def set_mixture(self, laws=['GEV'], methods=['MLE']):
"""
Set mixture model based on "50-50" "winter-summer" fitted models
"""
if laws == 'popular':
laws = LAWS_POPULAR
elif isinstance(laws,str):
laws=[laws]
for curmethod in methods:
if ('winter' in self.maxima.keys()) and ('summer' in self.maxima.keys()):
if ('fit' in self.maxima['winter'].keys()) and ('fit' in self.maxima['summer'].keys()):
if (curmethod in self.maxima['winter']['fit'].keys()) and (curmethod in self.maxima['summer']['fit'].keys()):
dict_mix = self.maxima['mixture'] = {}
dict_mixf = dict_mix['fit'] = {}
dict_mixm = dict_mixf[curmethod] = {}
dictm_w = self.maxima['winter']['fit'][curmethod]
dictm_s = self.maxima['summer']['fit'][curmethod]
for curlaw in laws:
if (curlaw in dictm_w.keys()) and (curlaw in dictm_s.keys()):
if ('func' in dictm_w[curlaw].keys()) and ('func' in dictm_s[curlaw].keys()):
func_w = dictm_w[curlaw]['func']
func_s = dictm_s[curlaw]['func']
dict_mixr = dict_mixm[curlaw] = {}
dict_mixr['func'] = SeasonMixtureModel([func_w,func_s],[0.5,0.5])
dict_mixr['color'] = 'k'
else:
print('The desired law {} is not fitted for all seasons'.format(curlaw))
else:
raise Warning('Fit winter and summer before set_mixture')
[docs]
def set_joint(self, laws=['GEV'], methods=['MLE']):
"""
Set joint model based on product probability of fitted models
"""
if laws == 'popular':
laws = LAWS_POPULAR
elif isinstance(laws,str):
laws=[laws]
for curmethod in methods:
if ('winter' in self.maxima.keys()) and ('summer' in self.maxima.keys()):
if ('fit' in self.maxima['winter'].keys()) and ('fit' in self.maxima['summer'].keys()):
if (curmethod in self.maxima['winter']['fit'].keys()) and (curmethod in self.maxima['summer']['fit'].keys()):
dict_mix = self.maxima['joint'] = {}
dict_mixf = dict_mix['fit'] = {}
dict_mixm = dict_mixf[curmethod] = {}
dictm_w = self.maxima['winter']['fit'][curmethod]
dictm_s = self.maxima['summer']['fit'][curmethod]
for curlaw in laws:
if (curlaw in dictm_w.keys()) and (curlaw in dictm_s.keys()):
if ('func' in dictm_w[curlaw].keys()) and ('func' in dictm_s[curlaw].keys()):
func_w = dictm_w[curlaw]['func']
func_s = dictm_s[curlaw]['func']
dict_mixr = dict_mixm[curlaw] = {}
dict_mixr['func'] = JointModel([func_w,func_s])
dict_mixr['color'] = 'k'
else:
print('The desired law {} is not fitted for all seasons'.format(curlaw))
else:
raise Warning('Fit winter and summer before set_joint')
[docs]
def set_empfreq(self, seasons=None):
"""
Compute empirical frequency based on sorted maxima
"""
seasons = sanitize_seasons(seasons)
for curseason in seasons:
if not 'sorted_maxval' in self.maxima[curseason].keys():
self.sort_maxima(curseason)
nb = len(self.maxima[curseason]['maxval'])
curdict=self.maxima[curseason]['Empirical_frequency']={}
curdict['Cunnane'] = np.asarray([cunnane(i+1,nb) for i in range(nb)])
curdict['Weibull'] = np.asarray([weibull(i+1,nb) for i in range(nb)])
curdict['Filliben'] = np.asarray([filliben(i+1,nb) for i in range(nb)])
[docs]
def plot_classified_flowrate_curve(self, fig:Figure=None, ax:Axes=None, label='', textvalues=True, show=False) -> tuple[Figure,Axes]:
if ax is None:
fig,ax = plt.subplots(1,1)
sorteddata = self.data[~(np.isnan(self.data))].copy().sort_values()
nb = len(sorteddata)
f = (np.arange(nb)+1)/nb
resdict = {}
resdict['mean discharge'] = sorteddata.to_numpy(copy=True)
resdict['duration'] = self.duration
resdict['frequency of non exceedance'] = f
resdict['frequency of exceedance'] = 1-f
chardict = resdict['characteristics'] = {}
f = (1-f)*365
curdict = chardict['DCE'] = {}
curdict['help'] = 'Débit journalier dépassé en moyenne 355 jours par an'
curdict['x'] = 355
curdict['value'] = np.quantile(sorteddata, 1-355/365)
curdict = chardict['DC10'] = {}
curdict['help'] = 'Débit journalier dépassé en moyenne 10 mois par an'
curdict['x'] = 300
curdict['value'] = np.quantile(sorteddata, 1-300/365)
curdict = chardict['Q347'] = {}
curdict['help'] = 'Débit atteint ou dépassé, en moyenne, pendant 347 jours par an'
curdict['x'] = 347
curdict['value'] = np.quantile(sorteddata, 1-347/365)
curdict = chardict['DM'] = {}
curdict['help'] = 'Débit atteint ou dépassé, en moyenne, pendant 50% du temps - débit médian'
curdict['x'] = .5*365
curdict['value'] = np.quantile(sorteddata, .5)
curdict = chardict['DCC'] = {}
curdict['x'] = 10
curdict['help'] = 'Débit journalier qui est dépassé 10 jours par an'
curdict['value'] = np.quantile(sorteddata, 1- 10/365)
if label != '':
ax.plot(f,sorteddata, label=label)
ax.legend()
else:
ax.plot(f,sorteddata)
ax.set_xlabel('Number of days with exceedance [days]')
ax.set_ylabel('Discharge [$m^3s^{-1}]')
ax.set_title('Classified flow rates curve')
ax.set_xbound(0,365)
ax.set_ybound(0,sorteddata.max())
# used = ['DCE', 'DC10','Q347', 'DM', 'DCC']
# unused = ['mean discharge','duration', 'frequency of non exceedance','frequency of exceedance','characteristics' ]
if textvalues:
for idx,(key,value) in enumerate(chardict.items()):
ax.plot([value['x'],value['x']], [0, value['value']], 'k--')
ax.text(value['x'], value['value']*1.05, '{0} : {1:.2f}'.format(key,value['value']), rotation=90)
valsx= np.hstack((np.arange(365, step=30), 365))
ax.set_xticks(valsx)
ax.set_xticklabels(valsx)
valsy= np.unique(sorted(np.hstack((np.arange(sorteddata.max(), step=25), sorteddata.max(), [curdict['value'] for curdict in chardict.values()]))))
ax.set_yticks(valsy)
ax.set_yticklabels(valsy)
ax.set_yscale('log')
ax.grid()
if show:
plt.show()
return fig, ax, resdict
[docs]
def plot_distfit(self, seasons=None, n_top=20, show=False):
"""
Plots of distfit module
"""
seasons = sanitize_seasons(seasons)
for curseason in seasons:
if not 'distfit' in self.maxima[curseason].keys():
self.distfit(curseason)
curdict = self.maxima[curseason]['distfit']
dist = curdict['func']
fig,ax=dist.plot() # Plot the best fitted distribution (y is included if prediction is made)
fig,ax=dist.plot_summary(n_top=n_top)
if show:
plt.show()
[docs]
def plot_ci(self, seasons=None, laws=['GEV'], methods=['MLE'], fig:Figure=None, ax:Axes=None, show=False, alpha=.1) -> tuple[Figure,Axes]:
"""
Plotting confidence interval
"""
if laws == 'popular':
laws = LAWS_POPULAR
elif isinstance(laws,str):
laws=[laws]
if fig is None and ax is None:
fig,ax = plt.subplots(1,1)
colors = ['r', 'g', 'b', 'r--', 'g--', 'b--']
seasons = sanitize_seasons(seasons)
ax.set_xscale('log')
ax.set_xlim([1,1000])
ax.set_xticks([1,2,5,10,15,25,50,100,1000])
ax.set_xticklabels([1,2,5,10,15,25,50,100,1000])
ax.set_ylabel('Discharge [$m^3s^{-1}$]')
ax.set_xlabel('Return period [year]')
ax.grid(True)
for curseason in seasons:
for curmethod in methods:
if curmethod in self.maxima[curseason]['fit'].keys():
dictm = self.maxima[curseason]['fit'][curmethod]
for curlaw in laws:
if curlaw in dictm.keys():
dictlaw = dictm[curlaw]
if 'ic' in dictlaw.keys():
dictic_sup = dictlaw['ic']['sup']
dictic_inf = dictlaw['ic']['inf']
for idx, curic in enumerate(dictic_inf):
c_inf = dictic_inf[curic]
c_sup = dictic_sup[curic]
ax.plot(RETURN_PERIOD_ESTIMATE, c_inf, 'k', lw=3, alpha=min(alpha*idx+.1,1), label='c_inf_{}%'.format(curic))
ax.plot(RETURN_PERIOD_ESTIMATE, c_sup, 'k', lw=3, alpha=min(alpha*idx+.1,1), label='c_sup_{}%'.format(curic))
# ax.scatter(RETURN_PERIOD_ESTIMATE, c_inf)
# ax.scatter(RETURN_PERIOD_ESTIMATE, c_sup)
ax.legend()
plt.tight_layout()
if show:
plt.show()
return (fig,ax)
[docs]
def plot_annual(self, years=None, hydrol=True, fig:Figure=None, ax:Axes=None, show=False, withdatemax=True) -> tuple[Figure,Axes]:
"""
Tracé en superposition des différentes années sur l'emprise d'une année civile ou hydrologique
"""
if fig is None and ax is None:
fig,ax = plt.subplots(1,1)
if years is None:
years = range(self.years_bounds[0],self.years_bounds[1]+1)
delta = 0
if hydrol:
delta = (dt(2021,1,1, tzinfo=self.tz_info)-dt(2020,10,1, tzinfo=self.tz_info))
for curyear in years:
start_date, end_date = self._get_dates(curyear, hydrol)
df = self.data[(self.data.index>=start_date) & (self.data.index<end_date)].copy()
if hydrol:
df.index = df.index + delta
ax.step(df.index.day_of_year*24 + df.index.hour,df,where='post',c='black',alpha=.2)
for curdata, curcolor, cursymb in zip(SEASONS,SEASONS_COLORS,SEASONS_MARKERS):
curdate = self.maxima[curdata]['serie'].index.copy() + delta
ax.scatter(curdate.day_of_year*24 + curdate.hour,
self.maxima[curdata]['maxval'],
marker = cursymb,
c=curcolor,label=curdata)
if withdatemax:
curdate = self.maxima['annual']['serie'].index.copy() + delta
txt = [str(cur) for cur in curdate.year]
x = np.asarray(curdate.day_of_year*24 + curdate.hour)
if np.max(self.maxima['annual']['maxval'])>100:
y = self.maxima['annual']['maxval']+5
else:
y = self.maxima['annual']['maxval']+.5
for curx,cury,curtxt in zip(x,y,txt):
ax.text(curx,cury,curtxt,rotation=90)
ax.set_xlabel('Date')
ax.set_ylabel('Discharge [$ m^3s^{-1} $]')
if hydrol:
days = [0,31,30,31,31,28,31,30,31,30,31,31]
ticks_pos = np.cumsum(days)*24
ticks_labels = ['Oct','Nov','Dec','Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep']
else:
days = [0,31,28,31,30,31,30,31,31,30,31,30]
ticks_pos = np.cumsum(days)*24
ticks_labels = ['Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec']
ax.set_xticks(ticks_pos)
ax.set_xticklabels(ticks_labels)
ax.set_title('Duration {} h'.format(self.duration))
ax.legend()
fig.tight_layout()
if show:
plt.show()
return (fig,ax)
[docs]
def plot_serie(self, fig:Figure=None, ax:Axes=None, show=False, background=True, backcolor='lemonchiffon') -> tuple[Figure,Axes]:
"""Tracé de l'ensemble de la série pour toutes les années"""
if fig is None and ax is None:
fig,ax = plt.subplots(1,1)
ax.step(self.data.index,self.data,where='post')
for curdata, curcolor, cursymb in zip(SEASONS,SEASONS_COLORS,SEASONS_MARKERS):
ax.scatter(self.maxima[curdata]['date'],self.maxima[curdata]['maxval'],marker = cursymb,c=curcolor,label=curdata)
ax.set_xlabel('Date')
ax.set_ylabel('Discharge [$ m^3s^{-1} $]')
ax.legend()
ax.set_title('Duration {} h'.format(self.duration))
if background:
for curyear in range(self.years_bounds[0], self.years_bounds[1]+1):
ax.add_patch( Rectangle((dt(curyear,self.startmonth_summer,1),ax.get_ybound()[0]),
width=dt(curyear,self.startmonth_winter,1)-dt(curyear,self.startmonth_summer,1),
height=ax.get_ybound()[1]-ax.get_ybound()[0],
color=backcolor,
zorder=0))
if show:
plt.show()
return (fig,ax)
[docs]
def plot_cdf(self, seasons=None, fig:Figure=None, ax:Axes=None, show=False, n_bins=100) -> tuple[Figure,Axes]:
"""
Plotting empirical cdf
"""
if fig is None and ax is None:
fig,ax = plt.subplots(1,1)
seasons = sanitize_seasons(seasons)
for curseason in seasons:
values = self.maxima[curseason]['maxval'].copy()
n, bins, patches = ax.hist(values, n_bins, density=True, histtype='step',
cumulative=True, label='Empirical - '+curseason)
ax.grid(True)
ax.legend(loc='right')
ax.set_title('Cumulative step histograms')
ax.set_xlabel('Discharge [$m^3s^{-1}$]')
ax.set_ylabel('Likelihood of occurrence')
ax.set_title('Duration {} h'.format(self.duration))
if show:
plt.show()
return (fig,ax)
[docs]
def plot_T_Qmaxima(self, seasons=None, empirical_func='Cunnane', fig:Figure=None, ax:Axes=None, show=False, alpha=1., color_marker_label=None) -> tuple[Figure,Axes]:
"""
Plotting Q vs return period
"""
if fig is None and ax is None:
fig,ax = plt.subplots(1,1)
seasons = sanitize_seasons(seasons, True)
k=0
if seasons == ['mixture'] or seasons == ['joint']:
for curseason in SEASONS:
if not 'sorted_maxval' in self.maxima[curseason].keys():
self.sort_maxima(curseason)
self.set_empfreq(curseason)
p_emp = self.maxima[curseason]['Empirical_frequency'][empirical_func]
y = self.maxima[curseason]['sorted_maxval']
T_emp = 1/(1-p_emp)
if color_marker_label is not None:
curcolor, curmark, curlab = color_marker_label
ax.scatter(T_emp,y,marker=curmark,c=curcolor,label=curlab, alpha=alpha)
else:
ax.scatter(T_emp,y,marker=SEASONS_MARKERS[k],c=SEASONS_COLORS[k],label=curseason +' - '+str(self.duration), alpha=alpha)
k+=1
else:
for curseason in seasons:
if curseason != 'mixture' and curseason != 'joint':
if len(self.maxima.keys())==0:
self.find_maxima()
if not 'sorted_maxval' in self.maxima[curseason].keys():
self.sort_maxima(curseason)
self.set_empfreq(curseason)
p_emp = self.maxima[curseason]['Empirical_frequency'][empirical_func]
y = self.maxima[curseason]['sorted_maxval']
T_emp = 1./(1.-p_emp)
if color_marker_label is not None:
curcolor, curmark, curlab = color_marker_label
ax.scatter(T_emp,y,marker=curmark,c=curcolor,label=curlab, alpha=alpha)
else:
ax.scatter(T_emp,y,marker=SEASONS_MARKERS[k],c=SEASONS_COLORS[k],label=curseason +' - '+str(self.duration), alpha=alpha)
k+=1
ax.set_xscale('log')
ax.set_xticks([1,2,5,10,15,25,50,100,1000])
ax.set_xticklabels([1,2,5,10,15,25,50,100,1000])
ax.set_ylabel('Discharge [$m^3s^{-1}$]')
ax.set_xlabel('Return period [year]')
ax.grid(True)
ax.set_title('T-Q relation (Emp. freq. estimator : {})'.format(empirical_func) )
ax.legend()
if show:
plt.show()
return (fig,ax)
[docs]
def plot_maxevents(self, seasons=None, before=1, after=2, adim=True, fig:Figure=None, ax:Axes=None, show=False, alpha=.2, nbevents=None) -> tuple[Figure,Axes]:
"""
Tracé des événements/hydrogrammes associés aux maxima identifiés
Les ordonnées sont adimensionnaliées sur base de la valeur maximale
"""
seasons = sanitize_seasons(seasons)
colors = ['black'] * len(seasons)
if fig is None and ax is None:
fig,ax = plt.subplots(len(seasons),1)
self.compute_median_event(seasons, before, after)
nbmax=0
k=0
for curseason in seasons:
if not 'events' in self.maxima[curseason].keys():
self.extract_maxevents(before,after)
elif (self.maxima[curseason]['before'] != before) or (self.maxima[curseason]['after'] != after):
self.extract_maxevents(before,after)
if len(seasons)==1:
curax=ax
else:
curax=ax[k]
curdict = self.maxima[curseason]['events']
dates = self.maxima[curseason]['date']
if nbevents is not None:
locmax = self.maxima[curseason]['maxval'].copy()
locmax = np.sort(locmax)
locmax = list(locmax[-nbevents:])
all_adims = {}
for idx in np.arange((before+after)*24+1, dtype=np.int16):
all_adims[idx]=[]
for idx,(key,curevent) in enumerate(curdict.items()):
nb = len(curevent)
nbmax = max(nb,nbmax)
mymax = np.max(curevent)
if adim:
values = curevent.copy()/mymax
else:
values = curevent
if nb != (before+after)*24+1:
x = np.zeros(nb)
for curidx, curdelta in enumerate(values.index):
curhour = int(((curdelta-dates[idx]).days *24)+((curdelta-dates[idx]).seconds)/3600) + before*24
x[curidx] = curhour
all_adims[curhour].append(values[curidx])
else:
x = np.arange((before+after)*24+1, dtype=np.int16)
for curidx, curq in enumerate(values):
all_adims[curidx].append(curq)
if nbevents is None:
curax.step(x,values,where='post',alpha=alpha,c=colors[k], label = dates[idx].strftime("%m/%Y"))
elif mymax in locmax:
curax.step(x,values,where='post',alpha=alpha,c=colors[k], label = dates[idx].strftime("%m/%Y"))
print(dates[idx].strftime("%m/%Y"), mymax)
if adim:
x = self.maxima[curseason]['median_event']['time [hour]']
crue_mediane = self.maxima[curseason]['median_event']['Discharge[m3s-1]']
curax.step(x,crue_mediane,where='post',alpha=1., c=colors[k], lw=2, label = "Median")
curax.set_xticks(np.arange(0, (before+after)*24+1, 6, dtype=np.int16))
curax.set_xticklabels(np.arange(0, (before+after)*24+1, 6, dtype=np.int16))
curax.grid(True)
curax.set_title('Maxima events - duration : {} - {}'.format(self.duration,curseason))
curax.set_xlabel('Time [hour]')
if adim:
curax.set_ylabel('Discharge / DisMax [-]')
else:
curax.set_ylabel('Discharge [$m^3s^{-1}$]')
curax.set_xbound(0,(before+after)*24+1)
k+=1
plt.tight_layout()
if show:
plt.show()
return (fig,ax)
[docs]
def plot_comp_maxevents(self, seasons=None, before=1, after=2, fig:Figure=None, ax:Axes=None, show=False, alpha=.2, nbevents=None) -> tuple[Figure,Axes]:
"""
Tracé des événements médians
Les ordonnées sont adimensionnaliées sur base de la valeur maximale
"""
seasons = sanitize_seasons(seasons)
colors = ['red', 'green', 'blue']
if fig is None and ax is None:
fig,ax = plt.subplots(1,1)
nbmax=0
k=0
for curseason in seasons:
if not 'events' in self.maxima[curseason].keys():
self.extract_maxevents(before,after)
elif (self.maxima[curseason]['before'] != before) or (self.maxima[curseason]['after'] != after):
self.extract_maxevents(before,after)
curax=ax
curdict = self.maxima[curseason]['events']
dates = self.maxima[curseason]['date']
all_adims = {}
for idx in np.arange((before+after)*24+1, dtype=np.int16):
all_adims[idx]=[]
if nbevents is not None:
locmax = self.maxima[curseason]['maxval'].copy()
locmax = np.sort(locmax)
locmax = list(locmax[-nbevents:])
for idx,(key,curevent) in enumerate(curdict.items()):
nb=len(curevent)
nbmax=max(nb,nbmax)
mymax = np.max(curevent)
values = curevent.copy()/mymax
if nb != (before+after)*24+1:
x = np.zeros(nb)
for curidx, curdelta in enumerate(values.index):
curhour = int(((curdelta-dates[idx]).days *24)+((curdelta-dates[idx]).seconds)/3600) + before*24
x[curidx] = curhour
all_adims[curhour].append(values[curidx])
else:
x = np.arange((before+after)*24+1, dtype=np.int16)
for curidx, curq in enumerate(values):
all_adims[curidx].append(curq)
# if nbevents is None:
# curax.step(x,values,where='post',alpha=alpha,c=colors[k], label = dates[idx].strftime("%m/%Y"))
# elif mymax in locmax:
# curax.step(x,values,where='post',alpha=alpha,c=colors[k], label = dates[idx].strftime("%m/%Y"))
x = np.arange((before+after)*24+1, dtype=np.int16)
crue_mediane = [np.median(curval) for curval in all_adims.values()]
curax.step(x,crue_mediane,where='post',alpha=1., c=colors[k], lw=2, label = 'MedianQ {}'.format(curseason))
self.maxima[curseason]['median_event'] = pd.DataFrame({'time [hour]':x, 'Discharge[m3s-1]':crue_mediane})
curax.set_xticks(np.arange(0, (before+after)*24+1, 6, dtype=np.int16))
curax.set_xticklabels(np.arange(0, (before+after)*24+1, 6, dtype=np.int16))
curax.grid(True)
curax.set_title('Maxima events - duration : {} - {}'.format(self.duration,curseason))
curax.set_xlabel('Time [hour]')
curax.set_ylabel('Discharge / DisMax [-]')
curax.set_xbound(0,(before+after)*24+1)
k+=1
plt.tight_layout()
if show:
plt.show()
return (fig,ax)
[docs]
def plot_one_event(self,
datecenter,
before=1,
after=2,
adim=True,
color='k',
fig:Figure=None,
ax:Axes=None,
show=False,
alpha=.2) -> tuple[Figure,Axes]:
"""
Tracé d'un événement quelconque autour d'une date
"""
if fig is None and ax is None:
fig,ax = plt.subplots(1,1)
ax.set_title('Event - duration : {} - {}'.format(self.duration, 'data source'))
curevent = self.extract_oneevent(datecenter,before,after)
nb=len(curevent)
x = np.arange(nb)
if adim:
values = curevent.copy()/np.max(curevent)
ax.set_ylabel('Discharge / DisMax [-]')
else:
values = curevent
ax.set_ylabel('Discharge [$m^3s^{-1}$]')
ax.step(x,values,where='post',alpha=alpha,c=color, label = datecenter.strftime("%m/%Y"))
ax.set_xticks(np.arange(0,nb,6))
ax.set_xticklabels(np.arange(0,nb,6))
ax.grid(True)
ax.legend()
ax.set_xlabel('Time [hour]')
ax.set_xbound(0,nb)
if show:
plt.show()
return (fig,ax)
[docs]
def plot_one_maxevents(self,
season,
year,
before=1,
after=2,
adim=True,
color='k',
fig:Figure=None,
ax:Axes=None,
show=False,
alpha=.2) -> tuple[Figure,Axes]:
"""
Tracé d'un événement spécifique associés au maxima identifié
Les ordonnées sont adimensionnaliées sur base de la valeur maximale
"""
if fig is None and ax is None:
fig,ax = plt.subplots(1,1)
ax.set_title('Events - duration : {} - {}'.format(self.duration,season))
if not 'events' in self.maxima[season].keys():
self.extract_maxevents(before,after)
elif (self.maxima[season]['before'] != before) or (self.maxima[season]['after'] != after):
self.extract_maxevents(before,after)
idx = self.maxima[season]['water_year'].index(dt(year,1,1, tzinfo=self.tz_info))
curevent = self.maxima[season]['events'][year]
date = self.maxima[season]['date'][idx]
nb=len(curevent)
x = np.arange(nb)
if adim:
values = curevent.copy()/np.max(curevent)
ax.set_ylabel('Discharge / DisMax [-]')
else:
values = curevent
ax.set_ylabel('Discharge [$m^3s^{-1}$]')
ax.step(x,values,where='post',alpha=alpha,c=color, label = date.strftime("%m/%Y"))
ax.set_xticks(np.arange(0,nb,6))
ax.set_xticklabels(np.arange(0,nb,6))
ax.grid(True)
ax.legend()
ax.set_xlabel('Time [hour]')
ax.set_xbound(0,nb)
if show:
plt.show()
return (fig,ax)
[docs]
def plot_fit(self,
seasons=None,
laws=['GEV'],
methods=['MLE'],
fig:Figure=None,
ax:Axes=None,
show=False,
alpha=1.,
styles:dict=None) -> tuple[Figure,Axes]:
"""
Plotting fitted models
"""
## Périodes de retour et fréquences
if laws == 'popular':
laws = LAWS_POPULAR
elif isinstance(laws,str):
laws=[laws]
if fig is None and ax is None:
fig,ax = plt.subplots(1,1)
colors = ['r', 'g', 'b']*20
linestyles = ['solid', 'solid', 'solid', 'dashed', 'dashed', 'dashed'] *10
seasons = sanitize_seasons(seasons, True)
# où doit-on estimer la fonction?
data = self.maxima['annual']['maxval']
Q = np.asarray([(x*((np.max(data)*1.5-np.min(data))/100)+np.min(data)) for x in range(100)])
oldcolor = colors[0]
oldstyle = linestyles[0]
for idx,curseason in enumerate(seasons):
for curmethod in methods:
if curmethod in self.maxima[curseason]['fit'].keys():
dictm = self.maxima[curseason]['fit'][curmethod]
for curlaw in laws:
if curlaw in dictm.keys():
dist = dictm[curlaw]['func']
if len(laws)==1:
if styles is not None :
if curseason in styles.keys():
curcolor, curlinestyle, curlinewidth = get_style(styles[curseason])
else:
curcolor, curlinestyle, curlinewidth = get_style(styles)
else:
k=idx
curcolor = colors[k]
curlinestyle = linestyles[k]
while curcolor == oldcolor or curlinestyle == oldstyle:
k+=1
curcolor = colors[k]
curlinestyle = linestyles[k]
oldcolor = curcolor
oldstyle = curlinestyle
curlinewidth = 2
ax.plot(1/(1-dist.cdf(Q)),Q,
curcolor,
linestyle=curlinestyle,
lw=curlinewidth,
alpha=alpha,
label='{} - {} - {}h'.format(curlaw,curseason,self.duration))
else:
if styles is not None :
if curseason in styles.keys():
curcolor, curlinestyle, curlinewidth = get_style(styles[curseason])
else:
curcolor, curlinestyle, curlinewidth = get_style(styles)
else:
curcolor = dictm[curlaw]['color']
curlinewidth = 2
if not '--' in curcolor:
curlinestyle='solid'
else:
curlinestyle='--'
ax.plot(1/(1-dist.cdf(Q)),Q,
curcolor,
linestyle=curlinestyle,
lw=curlinewidth,
alpha=alpha,
label=curlaw+' - '+curseason )
ax.set_xscale('log')
ax.set_xlim([1,1000])
ax.set_xticks([1,2,5,10,15,25,50,100,1000])
ax.set_xticklabels([1,2,5,10,15,25,50,100,1000])
ax.set_ylabel('Discharge [$m^3s^{-1}$]')
ax.set_xlabel('Return period [year]')
ax.grid(True)
ax.legend()
if show:
plt.show()
return (fig,ax)
[docs]
def plot_qq(self, seasons=None, laws=['GEV'], methods=['MLE'], fig:Figure=None, ax:Axes=None) -> tuple[Figure,Axes]:
"""
Q-Q Plot
Comparaison de la valeur théorique avec la mesure (attention scipy.probploit n'utilise pas Cunnane mais Filliben cf https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.probplot.html)
"""
if laws == 'popular':
laws = LAWS_POPULAR
elif isinstance(laws,str):
laws=[laws]
if fig is None and ax is None:
fig,ax = plt.subplots(1,1)
ax.set_title('QQ Plot')
colors = ['r', 'g', 'b', 'r--', 'g--', 'b--']
seasons = sanitize_seasons(seasons)
k=0
for curseason in seasons:
if not 'sorted_maxval' in self.maxima[curseason].keys():
self.sort_maxima(curseason)
self.set_empfreq(curseason)
data = self.maxima[curseason]['sorted_maxval']
p_cum_Cun = self.maxima[curseason]['Empirical_frequency']['Cunnane']
for curmethod in methods:
if curmethod in self.maxima[curseason]['fit'].keys():
dictm = self.maxima[curseason]['fit'][curmethod]
for curlaw in laws:
if curlaw in dictm.keys():
dist = dictm[curlaw]['func']
# avec Cunnane
th_cun = [dist.ppf(curp) for curp in p_cum_Cun]
ax.scatter(th_cun,data,marker='o',label='Cunnane - '+curseason)
ax.plot([0,np.max(data)],[0,np.max(data)],'r')
ax.set_xlabel('Theoretical Quantile')
ax.set_ylabel('Empirical Quantile')
ax.legend()
return(fig, ax)
[docs]
def plot_summary(self, seasons=None, nb_laws=None, forced_laws=[], sort:typing.Literal['RSS', 'AICc', 'AIC', 'BIC']='RSS', fig:Figure=None, ax:Axes=None, show=False) -> tuple[Figure,Axes]:
"""Plot summary results.
Parameters
----------
n_top : int, optional
Show the top number of results. The default is None.
figsize : tuple, optional (default: (10,8))
The figure size.
ylim : Float, optional (default: None)
Limit figure in y-axis.
fig : Figure, optional (default: None)
Matplotlib figure
ax : Axes, optional (default: None)
Matplotlib Axes object
Returns
-------
tuple (fig, ax)
"""
seasons = sanitize_seasons(seasons)
if ax is None:
fig, ax = plt.subplots(len(seasons),1)
nb_seasons = len(seasons)-1
for idx, curseason in enumerate(seasons):
if 'fit' in self.maxima[curseason].keys():
if len(seasons)>1:
curax = ax[idx]
else:
curax = ax
curax2 = curax.twinx()
curdict = self.maxima[curseason]['fit']
for curmethod in curdict.keys():
if curmethod=='MLE':
curdictm = curdict[curmethod]
aic = np.asarray([curdictm[curlaw]['aic'] for curlaw in curdictm.keys()])
aicc = np.asarray([curdictm[curlaw]['aicc'] for curlaw in curdictm.keys()])
bic = np.asarray([curdictm[curlaw]['bic'] for curlaw in curdictm.keys()])
rss = np.asarray([curdictm[curlaw]['rss'] for curlaw in curdictm.keys()] )
labels = np.asarray(list(curdictm.keys()))
if nb_laws is None:
nb_laws=len(aicc)
if nb_laws>len(aicc):
nb_laws=len(aicc)
if sort=='RSS':
#tri selon RSS
index_sort = rss.argsort()[:nb_laws]
elif sort=='AICc':
#tri selon AICc
index_sort = aicc.argsort()[:nb_laws]
elif sort=='AIC':
#tri selon AIC
index_sort = aic.argsort()[:nb_laws]
elif sort=='BIC':
#tri selon BIC
index_sort = bic.argsort()[:nb_laws]
if len(forced_laws)>0:
# on doit ajouter au moins certaines lois
for curlaw in forced_laws:
selected_labels = list(labels[index_sort])
if not (curlaw in selected_labels):
#la loi n'est pas dans les meilleures sélectionnées
# on doit donc l'ajouter
idx = np.where(labels==curlaw)
index_sort = np.append(index_sort, idx)
curax.plot(aic[index_sort], 'b*', label='AIC')
curax.plot(aicc[index_sort], 'r*', label='AICc')
curax.plot(bic[index_sort], 'gd', label='BIC')
curax2.plot(rss[index_sort], 'co', label='RSS')
curax.set_xticks(np.arange(len(index_sort)))
curax.set_xticklabels(labels[index_sort], rotation='vertical')
curax.set_xlabel('Distribution name')
curax.legend()
curax2.legend()
# if idx == nb_seasons:
# curax.set_xticklabels(labels[index_sort], rotation='vertical')
# curax.set_xlabel('Distribution name')
# curax.legend()
# curax2.legend()
# else:
# curax.set_xticklabels([])
# Pad margins so that markers don't get clipped by the axes
curax.set_ymargin(0.2)
curax.grid(True)
curax.set_ylabel('Indicator (AIC, AICc, BIC - lower is better)')
curax2.set_ylabel('Indicator (RSS - lower is better)')
curax.set_title(curseason)
# Tweak spacing to prevent clipping of tick-labels
plt.subplots_adjust(bottom=0.15)
if show:
plt.show()
return fig, ax, list(labels[index_sort][:nb_laws])
[docs]
def summary_max(self, seasons=None):
seasons = sanitize_seasons(seasons)
res = {}
for curseason in seasons:
data_max = self.maxima[curseason]['maxval']
curdic = res[curseason] = {}
curdic['nb values'] = len(data_max)
curdic['max_of_max'] = np.max(data_max)
curdic['min_of_max'] = np.min(data_max)
curdic['median_of_max'] = np.median(data_max)
curdic['maxvals'] = data_max.tolist()
curdic['maxdates'] = [curdate.strftime('%Y/%m/%d %H') for curdate in self.maxima[curseason]['date']]
if curseason == 'annual':
if self.hydrological_year:
decal_start = [curdate - dt(curwy.year-1, self.startmonth_winter,1, tzinfo=self.tz_info) for curdate, curwy in zip(self.maxima[curseason]['date'],self.maxima[curseason]['water_year'])]
decal_end = [curdate - dt(curwy.year , self.startmonth_winter,1, tzinfo=self.tz_info) for curdate, curwy in zip(self.maxima[curseason]['date'],self.maxima[curseason]['water_year'])]
else:
decal_start = [curdate - curwy for curdate, curwy in zip(self.maxima[curseason]['date'],self.maxima[curseason]['water_year'])]
decal_end = [curdate - dt(curwy.year+1, 1,1, tzinfo=self.tz_info) for curdate, curwy in zip(self.maxima[curseason]['date'],self.maxima[curseason]['water_year'])]
elif curseason == 'winter':
if self.hydrological_year:
decal_start = [curdate - dt(curwy.year-1, self.startmonth_winter,1, tzinfo=self.tz_info) for curdate, curwy in zip(self.maxima[curseason]['date'],self.maxima[curseason]['water_year'])]
decal_end = [curdate - dt(curwy.year , self.startmonth_summer,1, tzinfo=self.tz_info) for curdate, curwy in zip(self.maxima[curseason]['date'],self.maxima[curseason]['water_year'])]
else:
decal_start = [curdate - curwy for curdate, curwy in zip(self.maxima[curseason]['date'],self.maxima[curseason]['water_year'])]
decal_end = [curdate - dt(curwy.year, 7,1, tzinfo=self.tz_info) for curdate, curwy in zip(self.maxima[curseason]['date'],self.maxima[curseason]['water_year'])]
elif curseason =='summer':
if self.hydrological_year:
decal_start = [curdate - dt(curwy.year, self.startmonth_summer,1, tzinfo=self.tz_info) for curdate, curwy in zip(self.maxima[curseason]['date'],self.maxima[curseason]['water_year'])]
decal_end = [curdate - dt(curwy.year, self.startmonth_winter,1, tzinfo=self.tz_info) for curdate, curwy in zip(self.maxima[curseason]['date'],self.maxima[curseason]['water_year'])]
else:
decal_start = [curdate - dt(curwy.year, 7,1, tzinfo=self.tz_info) for curdate, curwy in zip(self.maxima[curseason]['date'],self.maxima[curseason]['water_year'])]
decal_end = [curdate - dt(curwy.year+1, 1,1, tzinfo=self.tz_info) for curdate, curwy in zip(self.maxima[curseason]['date'],self.maxima[curseason]['water_year'])]
curdic['earliest_max [days after start]'] = np.min(decal_start).days
curdic['earliest_date'] = self.maxima[curseason]['date'][np.argmin(decal_start)].strftime('%Y/%m/%d')
curdic['latest_max [days before end]'] = np.max(decal_end).days
curdic['lastest_date'] = self.maxima[curseason]['date'][np.argmax(decal_end)].strftime('%Y/%m/%d')
return res
[docs]
class EVA_Series:
def __init__(self, data, datetime_index=None, datetime_format="%Y%m%d%H%M%S", data_headers=("#DateHeure", "Debit"), startmonth_winter = 10, startmonth_summer=4, hydrol=True) -> None:
"""
Initialisation de la classe sur base de la chronique de données
"""
self.base_serie = EVA_Serie(data,
datetime_index,
datetime_format,
data_headers,
startmonth_winter=startmonth_winter,
startmonth_summer=startmonth_summer,
hydrol=hydrol)
self.all_series:dict[str, EVA_Serie] = {}
self.MFSH={}
self.hydrological_year = hydrol
self.startmonth_winter = startmonth_winter
self.startmonth_summer = startmonth_summer
self._current_serie = self.base_serie
[docs]
def activate_serie(self, key:int = 1):
self._current_serie = self.get_serie(key)
[docs]
def plot_T_Qmaxima(self,
seasons=None,
empirical_func='Cunnane',
fig:Figure=None,
ax:Axes=None,
show=False,
alpha=1.,
color_marker_label=None) -> tuple[Figure,Axes]:
return self._current_serie.plot_T_Qmaxima(seasons, empirical_func, fig, ax, show, alpha, color_marker_label)
[docs]
def plot_fit(self,
seasons=None,
laws=['GEV'],
methods=['MLE'],
fig:Figure=None,
ax:Axes=None,
show=False,
alpha=1.,
styles:dict=None) -> tuple[Figure,Axes]:
return self._current_serie.plot_fit(seasons, laws, methods, fig, ax, show, alpha, styles)
[docs]
def get_q_from_T(self,
return_periods:list=[5,10,15,20,25,50,75,100,200,500,1000],
season:str='best',
law:str='best',
method:str='MLE',
ic=False) -> pd.DataFrame:
return self._current_serie.get_q_from_T(return_periods, season, law, method, ic)
[docs]
def get_T_from_q(self,
q:list,
season:str='best',
law:str='best',
method:str='MLE',
ic=False) -> pd.DataFrame:
return self._current_serie.get_T_from_q(q, season, law, method, ic)
[docs]
def save_q_from_T(self,
filename:str='',
return_periods=[5,10,15,20,25,50,75,100,200,500,1000],
season='best',
law='best',
method='MLE',
ic=False):
return self._current_serie.save_q_from_T(filename, return_periods, season, law, method, ic)
[docs]
def save_T_from_q(self,
filename:str='',
q=[5,10,15,20,25,50,75,100,200,500,1000],
season='best',
law='best',
method='MLE',
ic=False):
return self._current_serie.save_T_from_q(filename, q, season, law, method, ic)
[docs]
def get_serie(self, key=1) -> EVA_Serie:
if key in self.all_series.keys():
return self.all_series[key]
else:
raise Exception('Bad key -- try again !')
[docs]
def find_maxima(self,
excluded_years=[],
hydrol=True,
method:typing.Literal['BM', 'POT']='BM',
threshold: float=0.,
r: typing.Union[pd.Timedelta, typing.Any] = "24H",
verbose = True):
"""
Recherche des maxima pour toutes les saisons et toutes les durées
"""
self.hydrological_year = hydrol
for key,curserie in self.all_series.items():
curserie:EVA_Serie
curserie.find_maxima(hydrol=hydrol, excluded_years=excluded_years, method=method, threshold=threshold, r=r, verbose=verbose)
[docs]
def create_all_durations(self, durations=[1], verbose=True):
"""
Création des chroniques pour toutes les durées souhaitées [heures]
"""
# Opération de convolution pour déterminer les valeurs moyennes sur toutes les durées
# !! Un peu plus optimisé que des boucles Python !!
def convolve_data(data,nbsteps):
locsum = np.ones(nbsteps)
return np.convolve(locsum,data,'same')/float(nbsteps)
# bouclage sur toutes le durées et calcul de la donnée moyenne par convolution
# ATTENTION : les NaN vont se propager --> filtrer/traiter la série initiale en cas de besoin
durations.sort()
for curdur in durations:
if verbose:
print( ' **{}**'.format(curdur))
self.all_series[curdur] = EVA_Serie(convolve_data(self.base_serie.data,curdur),
self.base_serie.data.index,
duration=curdur,
startmonth_winter=self.startmonth_winter,
startmonth_summer=self.startmonth_summer,
hydrol=self.hydrological_year,
verbose=verbose)
[docs]
def bake_data(self,
durations=[1],
excluded_years=[],
hydrol=True,
method:typing.Literal['BM', 'POT']='BM',
threshold: float=0.,
r: typing.Union[pd.Timedelta, typing.Any] = "24H",
verbose=True):
self.create_all_durations(durations, verbose)
self.find_maxima(excluded_years, hydrol,method, threshold, r, verbose)
[docs]
def plot_series(self,
durations=None,
oneyear = True,
show=False,
background=True,
backcolor='lemonchiffon'):
"""
Viauslaisation des chroniques
"""
if durations is None:
durations = self.all_series.keys()
figs=[]
axes=[]
for curdur in durations:
curserie:EVA_Serie
curserie = self.all_series[curdur]
if oneyear:
fig,ax = curserie.plot_annual(withdatemax=True)
figs.append(fig)
axes.append(ax)
else:
fig,ax = curserie.plot_serie(background=background,backcolor=backcolor)
figs.append(fig)
axes.append(ax)
if show:
plt.show()
return figs, axes
[docs]
def plots_spw(self, stationcode:str, fontsize:int = 12, width=20, height=10, durations=[1], backcolor='lemonchiffon'):
fig,ax = self.plot_series(durations= durations, oneyear= False, show=True, backcolor=backcolor)
ax[0].set_title('Station '+ stationcode + ' - Durée 1h - Toutes les années')
ax[0].set_ylabel('Débit $m^3s^{-1}$')
change_font_size(ax[0], fontsize)
fig[0].set_size_inches(width, height)
fig[0].tight_layout()
fig2,ax2 = self.plot_series(durations= durations, oneyear= True , show=True)
ax2[0].set_title('Station '+ stationcode + ' - Durée 1h - Années superposées')
ax2[0].set_ylabel('Débit $m^3s^{-1}$')
change_font_size(ax2[0], fontsize)
fig2[0].set_size_inches(width, height)
return ax[0], ax2[0]
[docs]
def eva(self,
durations=[1],
laws='popular',
hydrol=True,
excluded_years=[],
plot=True,
method:typing.Literal['BM', 'POT']='BM',
threshold: float=0.,
r: typing.Union[pd.Timedelta, typing.Any] = "24H",
verbose=True,
show=False) -> dict[str, tuple]:
"""
Extreme Value Analysis
durations : (list) durations in hours - default [1]
laws : (str) laws to fit - default 'GEV'
hydrol : (bool) use hydrological year - default True
excluded_years : (list) years to exclude
plot : (bool) creation of different plots
method : (str) selection of maxima - default 'BM' == Block Maxima
return : dict of tuples (fig,axes) of the plots
"""
self.hydrological_year = hydrol
self.bake_data(durations, excluded_years, hydrol, method, threshold, r, verbose)
plots = {}
if plot:
plots['series_oneyear'] = self.plot_series(oneyear=True)
plots['series_continuous'] = self.plot_series(oneyear=False)
prev_serie = None
for curdur in durations:
locserie:EVA_Serie
locserie = self.all_series[curdur]
# optimisation des paramètres pour toutes les saisons
locserie.fit(laws=laws, init_EVA=prev_serie, verbose=verbose)
# fonction de mélange sur base d'une pondération "50-50" "summer"-"winter"
locserie.set_mixture(laws=laws)
# fonction jointe sur base du produit des probabilités saisonnières
locserie.set_joint(laws=laws)
if plot:
plots['cdf'] = locserie.plot_cdf()
plots['max_events'] = locserie.plot_maxevents()
plots['summary'] = locserie.plot_summary()
plots['qq'] = locserie.plot_qq()
# création d'un graphique pour chaque saison
for curseason in SEASONS:
fig,ax = locserie.plot_T_Qmaxima(curseason)
locserie.plot_fit(curseason, laws, fig=fig, ax=ax)
plots['TQ_' + curseason] = (fig,ax)
# création d'un graphique et supersposition des saiosns pour une seule loi
fig,ax = locserie.plot_T_Qmaxima()
locserie.plot_fit(laws='GEV', fig=fig, ax=ax)
plots['TQ_all_seasons'] = (fig,ax)
prev_serie = locserie
if show:
plt.show()
self.activate_serie(1)
return plots
[docs]
def evaluate_ci(self, seasons=None, durations=[1], laws=['GEV'], nboot=100, show=False) -> dict:
"""
Intervalles de confiance
Retourne une liste contenant des tuples (fig,ax) pour chaque durée et chaque saison
La boucle principale est sur les durées
La boucle interne est sur les saisons
"""
seasons = sanitize_seasons(seasons, False)
figax={}
for curdur in durations:
locserie:EVA_Serie
locserie = self.all_series[curdur]
locserie.fit(laws=laws, seasons=seasons, ic=True, nboot=nboot)
curfigax = figax[curdur] = {}
for curseason in seasons:
fig,ax = locserie.plot_T_Qmaxima(curseason, show=False)
locserie.plot_fit(curseason,laws,fig=fig,ax=ax, show=False)
locserie.plot_ci(curseason,laws, fig=fig, ax=ax, show=False)
curfigax[curseason] = (fig,ax)
if show:
plt.show()
return figax
[docs]
def evaluate_ic(self, seasons=None, durations=[1], laws=['GEV'], nboot=100, show=False) -> dict:
""" alias evaluate_ci """
return self.evaluate_ci(seasons, durations, laws, nboot, show)
[docs]
def test_distfit(self, durations=[1], plot=True):
"""
Ajustement de lois avec la toolbox distfit
"""
for curdur in durations:
locserie:EVA_Serie
locserie = self.all_series[curdur]
locserie.distfit(laws='full',bins=50,smooth=3)
if plot:
locserie.plot_distfit()
[docs]
def plot_qdf(self, seasons=None, durations=None, law='GEV', show = False):
"""Graphique de toutes les relations fittées pour chaque durée sur une même figure"""
colors = ['r', 'g', 'b', 'r--', 'g--', 'b--'] * int(np.ceil(len(self.all_series.keys()) /6))
seasons = sanitize_seasons(seasons,True)
if durations is None:
durations = self.all_series.keys()
resdict = {}
for curseason in seasons:
fig = None
for idx, curdur in enumerate(durations):
locserie:EVA_Serie
if curdur in self.all_series.keys():
locserie = self.all_series[curdur]
if fig is None:
fig,ax = locserie.plot_fit(curseason, law, styles=set_style(colors[idx]))
resdict[curseason] = (fig,ax)
else:
locserie.plot_fit(curseason, law, fig=fig, ax=ax, styles=set_style(colors[idx]))
if show:
plt.show()
return resdict
[docs]
def select_best_func(self, season='mixture', law='GEV', method='MLE'):
"""
Sélection de la fonction à retenir comme meilleur ajustement
"""
for idx, curdur in enumerate(self.all_series.keys()):
# bouclage sur les durées
locserie = self.all_series[curdur]
locserie.select_best_func(season, law, method)
if locserie.best is None:
raise Exception('Bad "best" func selection - retry !')
[docs]
def get_one_MFSH(self, rising_time:float=6., return_period:float=50., deltat:float=1.) -> Hydro_HSMF:
durees = list(self.all_series.keys())
Qp = [d.ppf(1.-1./float(return_period)) for d in self.bests]
# Qp est maintenant une liste qui reprend les valeurs de Q pour la période de retour T pour les différentes durées
# Sur cette base, on lance la génération de l'hydrogramme
res = Hydro_HSMF(Qp, durees, rising_time, deltat, 'HSMF_{}'.format(return_period))
res.opt_hydro()
return res
[docs]
def opti_mfsh(self, label, rising_time=6, return_period = [2, 5, 10, 25, 50, 100, 200, 500, 1000], deltat=1):
"""
#############################
## CALCUL DES HYDROGRAMMES ##
#############################
"""
return_period.sort()
dict = self.MFSH[label]={}
self.bests = [cur.best for cur in self.all_series.values()]
# lancer une boucle 'for' sur les différentes périodes de retour
for curT in return_period:
dict[curT] = self.get_one_MFSH(rising_time, curT, deltat)
[docs]
def plot_msfh(self,
label,
return_period= [2, 5, 10, 25, 50, 100, 200, 500, 1000],
ref_season='annual',
ylim=None,
before = 1,
after = 3,
show=False) -> tuple[Figure, Axes, Axes]:
"""
################
## GRAPHIQUES ##
################
"""
if not label in self.MFSH.keys():
raise Warning('No hydrogram to plot -- retry')
ref_serie = self.all_series[1]
fig, ax = ref_serie.plot_maxevents(ref_season, before, after, alpha=.15)
ax2 = ax.twinx()
for curT in return_period:
curhydro:Hydro_HSMF
curhydro=self.MFSH[label][curT]
curhydro._plot_Q(ax2, before*24 - curhydro.temps_montee, label=str(curT), lw=3, xup_bound=(before+after)*24)
ax.legend().set_visible(False)
ax2.legend()
if ylim is not None:
ax.set_ylim(ylim)
if show:
plt.show()
return fig,ax,ax2
[docs]
def plot_msfh_mainplot(self,
label,
return_period= [2, 5, 10, 25, 50, 100, 200, 500, 1000],
ref_season='annual',
ylim=None,
before = 1,
after=2,
show=False) -> tuple[Figure, Axes]:
"""
################
## GRAPHIQUES ##
################
"""
if not label in self.MFSH.keys():
raise Warning('No hydrogram to plot -- retry')
fig,ax = plt.subplots(1,1)
for curT in return_period:
curhydro:Hydro_HSMF
curhydro=self.MFSH[label][curT]
curhydro._plot_Q(ax, before*24 - curhydro.temps_montee, label=str(curT), lw=3, xup_bound=(before+after)*24)
ax.legend()
if ylim is not None:
ax.set_ylim(ylim)
if show:
plt.show()
return fig,ax
[docs]
def save_max_event(self,
filename:str,
years_bounds:list,
seasons = None):
self._current_serie.save_max_events(filename, years_bounds, seasons)
[docs]
def save_msfh(self, label,
filename,
return_period= [2, 5, 10, 25, 50, 100, 200, 500, 1000]):
"""
################
## SAUVEGARDE ##
################
"""
if not label in self.MFSH.keys():
raise Warning('No hydrogram to plot -- retry')
dicthydro={}
for curT in return_period:
curhydro:Hydro_HSMF
curhydro=self.MFSH[label][curT]
dicthydro['Temps [heure]'] = curhydro.temps
dicthydro['Debit [m3s-1] - {} ans'.format(curT)] = curhydro.hydro
df = pd.DataFrame.from_dict(dicthydro, orient='columns')
df.to_csv(filename, sep=';')
[docs]
def plot_one_fit(self, seasons=None, durations=None, laws='popular',
split_duration=True, split_season=True, xbounds=None,
ybounds=None, show=False) -> tuple[Figure, Axes]:
seasons = sanitize_seasons(seasons)
if durations is None:
durations = list(self.all_series.keys())
elif isinstance(durations,str) or isinstance(durations,int):
durations=[durations]
if laws == 'popular':
laws = LAWS_POPULAR
elif isinstance(laws,str):
laws=[laws]
nbrows = 1
nbcols = 1
if split_duration:
nbrows = len(durations)
if split_season:
nbcols = len(seasons)
fig,ax = plt.subplots(nbrows, nbcols)
for iddur, curdur in enumerate(durations):
locserie:EVA_Serie
locserie = self.all_series[curdur]
for idseas, curseason in enumerate(seasons):
if split_duration and split_season:
curax = ax[iddur, idseas]
elif split_season:
curax = ax[idseas]
elif split_duration:
curax = ax[iddur]
else:
curax = ax
locserie.plot_T_Qmaxima(curseason, fig=fig, ax = curax)
locserie.plot_fit(laws=laws, fig=fig, ax=curax)
if xbounds is not None:
if split_duration and split_season:
for curax in ax:
for curax2 in curax:
curax2.set_xbound(xbounds)
elif split_duration or split_season:
for curax in ax:
curax.set_xbound(xbounds)
else:
ax.set_xbound(xbounds)
if ybounds is not None:
if split_duration and split_season:
for curax in ax:
for curax2 in curax:
curax2.set_ybound(ybounds)
elif split_duration or split_season:
for curax in ax:
curax.set_ybound(ybounds)
else:
ax.set_ybound(ybounds)
if show:
plt.show()
return fig, ax
[docs]
def print_summary(self, seasons=None, durations=None, show=False):
import json
seasons = sanitize_seasons(seasons)
if durations is None:
durations = list(self.all_series.keys())
summary={}
for idx, curdur in enumerate(durations):
locserie:EVA_Serie
locserie = self.all_series[curdur]
summary[str(curdur)] = locserie.summary_max(seasons)
if show:
print(json.dumps(summary, indent=4))
return summary
[docs]
def plot_selected_max(self, seasons=None, durations=None,
split_seasons = False, scaling=False, show=False)-> tuple[Figure, Axes]:
"""
Graphique des dates des événements sélectionnés pour chaque durée (par défaut, toutes les durées)
"""
seasons = sanitize_seasons(seasons)
if durations is None:
durations = list(self.all_series.keys())
locserie = self.base_serie
ticks_year = [ locserie._get_dates(curyear,self.hydrological_year)[0] for curyear in np.arange(locserie.years_bounds[0],locserie.years_bounds[1]+1)]
ticks_season = [ locserie._get_dates(curyear,self.hydrological_year, 'winter')[1] for curyear in np.arange(locserie.years_bounds[0],locserie.years_bounds[1]+1)]
if split_seasons and len(seasons)>1:
fig, ax = plt.subplots(len(seasons),1)
for idx, curdur in enumerate(durations):
locserie:EVA_Serie
locserie = self.all_series[curdur]
k=0
for curseason, curcolor, curmarker in zip(seasons, SEASONS_COLORS, SEASONS_MARKERS):
label=''
if idx==0:
label=curseason
nb = len(locserie.maxima[curseason]['date'])
if scaling:
ax[k].scatter(locserie.maxima[curseason]['date'],[idx+1]*nb, s=locserie.maxima[curseason]['maxval'], marker=curmarker,c=curcolor, label=label)
else:
ax[k].scatter(locserie.maxima[curseason]['date'],[idx+1]*nb, marker=curmarker,c=curcolor, label=label)
k+=1
for curax in ax:
curax.set_yticks(np.arange(int(len(durations)))+1)
curax.set_yticklabels(durations)
curax.set_ylabel('Durations [hours]')
curax.grid(True)
curax.legend()
curax.set_xticks(ticks_year + ticks_season)
curax.set_xticklabels( [curyear for curyear in np.arange(locserie._get_dates(locserie.years_bounds[0],self.hydrological_year)[0].year
,locserie._get_dates(locserie.years_bounds[1]+1,self.hydrological_year, 'winter')[0].year)] + ['w->s']*len(ticks_season), rotation='vertical')
curax = ax[-1]
if self.hydrological_year:
curax.set_xlabel('Date (hydrological year)')
else:
curax.set_xlabel('Date (real calendar)')
else:
fig, ax = plt.subplots(1,1)
for idx, curdur in enumerate(durations):
locserie:EVA_Serie
locserie = self.all_series[curdur]
for curseason, curcolor, curmarker in zip(seasons, SEASONS_COLORS, SEASONS_MARKERS):
label=''
if idx==0:
label=curseason
nb = len(locserie.maxima[curseason]['date'])
ax.scatter(locserie.maxima[curseason]['date'],[idx+1]*nb,marker=curmarker,c=curcolor, label=label)
ax.set_yticks(np.arange(int(len(durations)))+1)
ax.set_yticklabels(durations)
ticks_year = [ locserie._get_dates(curyear,self.hydrological_year)[0] for curyear in np.arange(locserie.years_bounds[0],locserie.years_bounds[1]+1)]
ticks_season = [ locserie._get_dates(curyear,self.hydrological_year, 'winter')[1] for curyear in np.arange(locserie.years_bounds[0],locserie.years_bounds[1]+1)]
ax.set_xticks(ticks_year + ticks_season)
ax.set_xticklabels( [curyear for curyear in np.arange(locserie._get_dates(locserie.years_bounds[0],self.hydrological_year)[0].year
,locserie._get_dates(locserie.years_bounds[1]+1,self.hydrological_year, 'winter')[0].year)] + ['w->s']*len(ticks_season), rotation='vertical')
ax.set_ylabel('Durations [hours]')
if self.hydrological_year:
ax.set_xlabel('Date (hydrological year)')
else:
ax.set_xlabel('Date (real calendar)')
ax.grid(True)
ax.legend()
# plt.tight_layout()
if show:
plt.show()
return fig, ax
[docs]
def example1():
# durees=np.arange(1,24,1,dtype=np.int32)
durees = [1]
myseries = EVA_Series(filepath, data_headers=("#DateHeure", "Debit"))
myseries.eva(durees,plot=True)
myseries.eva(durees, excluded_years=[2021,2022], plot=True)
myseries.evaluate_ci()
[docs]
def example1bis():
# durees=np.arange(1,24,1,dtype=np.int32)
durees = [24]
myseries = EVA_Series(filepath, data_headers=("#DateHeure", "Debit"))
myseries.bake_data(durees)
locserie = myseries.get_serie(24)
fig,ax,Qchar = locserie.plot_classified_flowrate_curve()
[docs]
def example2():
# durees=np.arange(1,24,1,dtype=np.int32)
durees = [1,6,12,24,48]
myseries = EVA_Series(filepath, data_headers=("#DateHeure", "Debit"))
myseries.eva(durees, laws='GEV', excluded_years=[2021,2022], plot=False)
myseries.plot_qdf(show=True)
myseries.select_best_func('mixture')
myseries.opti_mfsh('summer')
myseries.plot_msfh('summer',show=True)
[docs]
def example3():
# durees=np.arange(1,24,1,dtype=np.int32)
durees = np.hstack([np.arange(1,49),72,120])
myseries = EVA_Series(filepath, data_headers=("#DateHeure", "Debit"))
myseries.eva(durees, laws='GEV', excluded_years=[2021,2022], plot=False)
myseries.select_best_func('mixture')
myseries.plot_qdf(show=False)
myseries.opti_mfsh('12',rising_time=12)
myseries.plot_msfh('12',ref_season='annual')
myseries.plot_msfh('12',ref_season='winter')
myseries.plot_msfh('12',ref_season='summer')
myseries.opti_mfsh('6',rising_time=6)
myseries.plot_msfh('6',ref_season='annual')
myseries.plot_msfh('6',ref_season='winter')
myseries.plot_msfh('6',ref_season='summer',show=True)
[docs]
def example4():
durees = np.hstack([np.arange(1,49),72,120])
durees = [48]
filepath = os.path.join(strdir,filename_verviers_naturel_test)
myseries = EVA_Series(filepath, startmonth_winter=10, startmonth_summer=4, hydrol=True)
myseries.bake_data(durees, excluded_years=[2022], verbose=False)
locserie:EVA_Serie
locserie = myseries.all_series[48]
locserie.plot_one_maxevents('annual',1998,show=True)
myseries.print_summary()
fig,ax = myseries.plot_selected_max(split_seasons=True, show=True)
myseries.plot_selected_max(split_seasons=True, scaling=True, show=True)
myseries.plot_selected_max(split_seasons=True, show=True)
myseries.plot_selected_max(split_seasons=False, show=True)
[docs]
def example5():
durees = np.hstack([np.arange(1,49),72,120])
durees = [1,48]
filepath = os.path.join(strdir,filename_verviers_naturel_test)
myseries = EVA_Series(filepath, startmonth_winter=10, startmonth_summer=4, hydrol=True)
myseries.eva([1], laws='GEV', excluded_years=[2021,2022], plot=True)
[docs]
def example6_POT():
durees = [1]
myseries = EVA_Series(filepath, data_headers=("#DateHeure", "Debit"))
myseries.eva(durees,plot=True, method='POT', threshold=10.)
myseries.eva(durees, excluded_years=[2021,2022], plot=True)
myseries.evaluate_ci()
if __name__=='__main__':
[docs]
strPath = os.path.realpath(__file__)
strdir = os.path.join(os.path.dirname(strPath),'..\\..\\data\\stats')
filename_Theux = 'L5860_Qh_Mean.csv'
filename_verviers = 'L7150_Qh_Mean.csv'
filename_verviers_ecrete = 'L7150_Ecrete_Qh_Mean.csv'
filename_verviers_naturel= 'L7150_Naturel_Qh_Mean.csv'
filename_test_nan_excel = 'L7150_Naturel_Qh_Mean_#valeur.csv'
filename_verviers_naturel_test = 'L7150_Naturel_Qh_Mean_#valeur.csv'
filepath = os.path.join(strdir,filename_verviers)
example6_POT()
pass