Source code for wolfhece.RatingCurve_xml

"""
Author: HECE - University of Liege, Pierre Archambeau
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from scipy.optimize import curve_fit, root_scalar
import xmltodict
import pandas as pd
from sympy import *
from datetime import datetime, timedelta
import glob
import logging
import os.path as path
from typing import Literal, Union
from pathlib import Path
from itertools import cycle
from enum import Enum

from .PyTranslate import _
try:
    from .hydrometry_hece.kiwis_hece import hydrometry_hece as hydrometry
except:
    logging.debug(_('Hydrometry HECE module not found - Load hydrometry instead of hydrometry_hece'))
    from .hydrometry.kiwis import hydrometry

# FONCTIONS UTILES pour FIT
# -------------------------
[docs] def get_power_law(): return lambda x,a,b,c : a*(x-c)**b
[docs] def get_power_law_(*args): a,b,c = args[0] return lambda x : a*(x-c)**b
[docs] def get_poly3_law(): return lambda x,a,b,c,d : a*x**3.+b*x**2.+c*x+d
[docs] def get_poly3_law_(*args): a,b,c,d = args[0] return lambda x : a*x**3.+b*x**2.+c*x+d
[docs] def get_poly2_law(): return lambda x,a,b,c : a*x**2.+b*x+c
[docs] def get_poly2_law_(*args): a,b,c = args[0] return lambda x : a*x**2.+b*x+c
[docs] class laws(Enum):
[docs] poly2 = (get_poly2_law , get_poly2_law_)
[docs] poly3 = (get_poly3_law, get_poly3_law_)
[docs] power = (get_power_law, get_power_law_)
[docs] class Gauging(): """ Class pour un jaugeage """
[docs] date:datetime
[docs] waterdepth:float
[docs] discharge:float
[docs] width:float
[docs] wet_area:float
[docs] wet_perimeter:float
def __init__(self, date:datetime, waterdepth:float, discharge:float, wet_area:float = np.nan, width:float = np.nan, wet_perimeter:float = np.nan ) -> None: """ Initialisation d'un jaugeage :param date: date du jaugeage :param waterdepth: hauteur d'eau :param discharge: débit :param wet_area: aire mouillée :param width: largeur :param wet_perimeter: périmètre mouillé """ self.date = date self.waterdepth= waterdepth self.discharge = discharge self.wet_area = wet_area self.width = width self.wet_perimeter = wet_perimeter
[docs] class FragmentCurve(): """ Fragment de courbe de tarage """
[docs] expression:str
[docs] h_min:float
[docs] h_max:float
[docs] river:str
def __init__(self, expression:Union[str, Function], h_min:float, h_max:float, river:str=None) -> None: """ Initialisation d'un fragment de courbe de tarage :param expression: expression analytique de la courbe -- chaîne de caractères ou fonction :param h_min: hauteur minimale :param h_max: hauteur maximale :param river: nom de la rivière """ self.h_min = float(h_min) self.h_max = float(h_max) self.river = river if isinstance(expression, str): #test si l'expression analytique a la forme d'une puissance if expression[0:3]=='rem': l=expression.find('=',30) expression=expression[l+1:] self.expression = expression x=symbols('x') string_function=expression.replace("^","**") strfunc = parse_expr(string_function) func = lambdify(x,strfunc) else: func = expression self.expression = None self.func = func
[docs] def plot_frag(self, figax=None, ls='solid', color='k'): """ Graphique du fragment de courbe de tarage :param figax: figure et axes matplotlib (optionel) -- si non fourni, une nouvelle figure est créée :param ls: style de ligne (optionel) -- solide par défaut :param color: couleur de la courbe (optionel) -- noir par défaut """ if figax is None: fig,ax = plt.subplots(1,1) else: fig,ax = figax m=np.linspace(self.h_min,self.h_max,100) y=self.func(m) if isinstance(m, float): pass if isinstance(y, float): pass #plot courbes de tarage ax.plot(m, y, ls=ls, c=color) qhmax = self.func(self.h_max) ax.plot([self.h_max, self.h_max],[min(qhmax*.9,qhmax-5.), max(qhmax*.9,qhmax+5.)], ls='--', color='k') return fig,ax
[docs] class RatingCurve(): """ Class pour les courbes de tarage """
[docs] date_start:datetime
[docs] date_end:datetime
[docs] fragments:list[FragmentCurve]
[docs] active:bool
def __init__(self, date_start:datetime, date_end:datetime, active:bool=True) -> None: """ Initialisation de la courbe de tarage :param date_start: date de début de validité :param date_end: date de fin de validité :param active: courbe active ou non """ self.date_start = date_start self.date_end = date_end self.fragments = [] self.active = active
[docs] def add(self, expression:Union[str, Function], h_min:float, h_max:float): """ Ajout d'un fragment de courbe de tarage :param expression: expression analytique de la courbe -- chaîne de caractères ou fonction :param h_min: hauteur minimale :param h_max: hauteur maximale """ self.fragments.append(FragmentCurve(expression, h_min, h_max))
[docs] def plot_curve(self, figax=None, color='k'): """ Graphique de la courbe de tarage :param figax: figure et axes matplotlib (optionel) -- si non fourni, une nouvelle figure est créée :param color: couleur de la courbe (optionel) -- noir par défaut """ if figax is None: fig,ax = plt.subplots(1,1) else: fig,ax = figax ls = 'solid' if self.active else '--' for cur_frag in self.fragments: cur_frag.plot_frag((fig,ax), ls=ls, color=color) ax.get_lines()[-2].set_label(self.date_start.strftime('%Y-%m-%d') + ' - ' + self.date_end.strftime('%Y-%m-%d')) return fig,ax
[docs] def compute_q_from_h(self, h:pd.Series) -> pd.Series: """ Conversion mesure de hauteur H en débit Q :param h: série Pandas de hauteur H :return: série Pandas de débit Q """ q = h.copy(True) #copir de la série for curfrag in self.fragments: #bouclage sur les fragemets q[(h >= curfrag.h_min) & (h <= curfrag.h_max)] = curfrag.func(h[(h >= curfrag.h_min) & (h <= curfrag.h_max)]) return q
[docs] def compute_h_from_q(self, q:pd.Series) -> pd.Series: """ Conversion mesure de débit Q en Hauteur H :param q: série Pandas de débit Q :return: série Pandas de hauteur H """ h = q.copy(True) for curfrag in self.fragments: #bouclage sur les fragemets #bornes en terme de Q qmin = curfrag.func(curfrag.h_min) qmax = curfrag.func(curfrag.h_max) # fonction à résoudre def hq(h, qobj): return curfrag.func(h)-qobj # résolution inverse du fragment hroots = [root_scalar(hq, args=curq, method='brentq', bracket=[curfrag.h_min, curfrag.h_max]).root for curq in h[(q >= qmin) & (q<=qmax)]] h[(q >= qmin) & (q<=qmax)] = hroots return h
[docs] class StationCurvesGaugings(): """ Une station avec courbes de tarage et jaugeages """
[docs] curves:list[RatingCurve]
[docs] gaugings:list[Gauging]
def __init__(self, filename:str = None) -> None: self.code = None # code de la station self.href = None # altitude de référence self.hydrometry = None # lien vers une instance de la classe hydrometry self.gaugings = [] # liste des jaugeages self.curves = [] # liste des courbes de tarage self.name = '' # nom de la station self.code = '' # code de la station if filename is not None: if filename.endswith('.xml'): # seul les frichiers xml sont acceptés comme données individuelles (extraction de la BDD Kiwis -> DCENN) # le SPW-MI fournit plutôt un Excel regroupant toutes les stations dans un même fichier self._read_xml(filename) def __str__(self) -> str: """ Affichage des infos de la station """ txt = f'Station {self.name} - {self.code}\n\n' txt += f'Courbes de tarage: {len(self.curves)}\n' txt += f'Jaugeages: {len(self.gaugings)}\n' txt += f'Altitude de référence: {self.href}\n' # txt += '\n\n' # txt += 'Courbes de tarage:\n' # for cur_curve in self.curves: # txt += f'- {cur_curve.date_start.strftime("%Y-%m-%d")} - {cur_curve.date_end.strftime("%Y-%m-%d")}\n' # txt += '\nJaugeages:\n' # for cur_gauging in self.gaugings: # txt += f'- {cur_gauging.date.strftime("%Y-%m-%d")} - {cur_gauging.waterdepth} - {cur_gauging.discharge}\n' return txt
[docs] def get_href(self, hydrometry): """ Récupération de l'altitude de référence depuis le site hydrometrie.wallonie.be""" self.hydrometry = hydrometry if self.hydrometry is not None: self.href = self.hydrometry.get_gauge_datum(code = self.code) else: self.href = None
[docs] def add_ratingcurve(self, rc:RatingCurve): """ Ajout d'une courbe de tarage """ self.curves.append(rc)
[docs] def _read_xml(self, file_xml:str): """ Lecture des infos au format XML --> DCENN export KIWIS :remark: Si le format change, il faut adapter cette méthode """ with open(file_xml) as fd: m = xmltodict.parse(fd.read()) idd =m['KISTERSRatingcurve']['Station']['@Number'] name=m['KISTERSRatingcurve']['Station']['@Name'] periods = m['KISTERSRatingcurve']['Station']['Parameter']['Periods']['Period'] rc_list = m['KISTERSRatingcurve']['Station']['Parameter']['Rc'] rc_names = [cur_rc['@Name'] for cur_rc in rc_list] for j in range(len(rc_names)-1): cur_name = rc_names[j] next_name = rc_names[j+1] for cur_period in periods: try: if cur_period['@RatingName'] == cur_name: break except: pass for next_period in periods: try: if next_period['@RatingName'] == next_name: break except: pass date_debut = datetime.strptime(cur_period['@Start'], '%Y-%m-%dT%H:%M:%S') date_fin = datetime.strptime(next_period['@Start'], '%Y-%m-%dT%H:%M:%S') cur_rc = rc_list[rc_names.index(cur_name)] if isinstance(cur_rc['Version'], dict): cur_curve = RatingCurve(date_debut, date_fin) self.add_ratingcurve(cur_curve) parameters = cur_rc['Version']['TableParameterSet']['TableParameters']['TableParameter'] for curparam in range(int(parameters[-1]['@Value'])): expr = parameters[curparam + 1 + (curparam)*4]['@Value'] hmax = parameters[curparam + 1 + (curparam)*4+2]['@Value'] hmin = parameters[curparam + 1 + (curparam)*4+3]['@Value'] cur_curve.add(expr, hmin, hmax) else: for i in range(len(cur_rc['Version'])): cur_curve = RatingCurve(date_debut, date_fin) self.add_ratingcurve(cur_curve) parameters = cur_rc['Version'][i]['TableParameterSet']['TableParameters']['TableParameter'] for curparam in range(int(parameters[-1]['@Value'])): expr = parameters[curparam + 1 + (curparam)*4]['@Value'] hmax = parameters[curparam + 1 + (curparam)*4+2]['@Value'] hmin = parameters[curparam + 1 + (curparam)*4+3]['@Value'] cur_curve.add(expr, hmin, hmax) cur_curve.active = cur_rc['Version'][i]['@Active'] =='true' j = len(rc_names)-1 cur_name = rc_names[j] for cur_period in periods: try: if cur_period['@RatingName'] == cur_name: break except: pass date_debut = datetime.strptime(cur_period['@Start'], '%Y-%m-%dT%H:%M:%S') date_fin = datetime.now() cur_rc = rc_list[rc_names.index(cur_name)] if isinstance(cur_rc['Version'], dict): cur_curve = RatingCurve(date_debut, date_fin) self.curves.append(cur_curve) parameters = cur_rc['Version']['TableParameterSet']['TableParameters']['TableParameter'] for curparam in range(int(parameters[-1]['@Value'])): expr = parameters[curparam + 1 + (curparam)*4]['@Value'] hmax = parameters[curparam + 1 + (curparam)*4+2]['@Value'] hmin = parameters[curparam + 1 + (curparam)*4+3]['@Value'] cur_curve.add(expr, hmin, hmax) else: for i in range(len(cur_rc['Version'])): cur_curve = RatingCurve(date_debut, date_fin) self.curves.append(cur_curve) parameters = cur_rc['Version'][i]['TableParameterSet']['TableParameters']['TableParameter'] for curparam in range(int(parameters[-1]['@Value'])): expr = parameters[curparam + 1 + (curparam)*4]['@Value'] hmax = parameters[curparam + 1 + (curparam)*4+2]['@Value'] hmin = parameters[curparam + 1 + (curparam)*4+3]['@Value'] cur_curve.add(expr, hmin, hmax) cur_curve.active = cur_rc['Version'][i]['@Active'] =='true' self.name = name self.code = idd
[docs] def _read_gaugings(self, fichier_xls:str): """ Récupération des données de jaugeages sur base d'un fichier Excel """ if not path.exists(fichier_xls): return if Path(fichier_xls).name[0]=='~': return doc=pd.read_excel(fichier_xls) for i in range (len(doc[doc.columns[0]])): add_jaugeage=Gauging(doc[doc.columns[0]][i], doc[doc.columns[1]][i], doc[doc.columns[2]][i]) self.gaugings.append(add_jaugeage)
[docs] def get_periods(self): """Récupération des périodes de validité des courbes de tarage""" return [(cur_curve.date_start, cur_curve.date_end) for cur_curve in self.curves]
[docs] def get_curves(self, date_debut:datetime=None, date_fin:datetime=None, only_active=True, only_last = False) -> list[RatingCurve]: """ Récupération des courbes sur base de dates :param date_debut: date de début d'intervalle :param date_fin: date de fin d'intervalle :param only_active: ne prendre que les courbes actives (dépend si lu via xml ou xlsx) :param only_last: ne prendre que la dernière courbe encodée return: liste de courbes de tarage """ if only_last: return [self.curves[-1]] else: if (date_debut is None) and (date_fin is None): if only_active: useful_curves = [cur_curve for cur_curve in self.curves if cur_curve.active] else: useful_curves = self.curves else: if only_active: useful_curves = [cur_curve for cur_curve in self.curves if (cur_curve.date_start>= date_debut) and (cur_curve.date_end<= date_fin) and cur_curve.active] else: useful_curves = [cur_curve for cur_curve in self.curves if (cur_curve.date_start>= date_debut) and (cur_curve.date_end<= date_fin)] return useful_curves
[docs] def get_gaugings(self, date_debut:datetime=None, date_fin:datetime=None) -> list[Gauging]: """ Récupération des jaugeages sur base de dates :param date_debut: date de début d'intervalle :param date_fin: date de fin d'intervalle return: liste de jaugeages """ if (date_debut is None) and (date_fin is None): useful_jau = self.gaugings else: useful_jau = [cur_jau for cur_jau in self.gaugings if (cur_jau.date>= date_debut) and (cur_jau.date<= date_fin)] return useful_jau
[docs] def print_gaugings(self, date_debut:datetime=None, date_fin:datetime=None): """ Affichage des jaugeages sur base de dates """ txt = "date - hauteur - débit\n" gaug = self.get_gaugings(date_debut, date_fin) for curg in gaug: txt += f'{curg.date} - {curg.waterdepth} - {curg.discharge}\n' return txt
[docs] def get_max_gauging(self, date_debut:datetime=None, date_fin:datetime=None): """ Récupération du jaugeage avec le débit le plus élevé """ useful_jau = self.get_gaugings(date_debut, date_fin) if len(useful_jau) == 0: return None else: return max(useful_jau, key=lambda x: x.discharge)
[docs] def plot_curves(self, figax = None, date_debut:datetime = None, date_fin:datetime = None, only_active = True, only_last = False) -> tuple[Figure,Axes]: """ Affichage des courbes de tarage :param figax: figure et axes matplotlib :param date_debut: date de début d'intervalle :param date_fin: date de fin d'intervalle :param only_active: ne prendre que les courbes actives (dépend si lu via xml ou xlsx) :param only_last: ne prendre que la dernière courbe encodée return: figure et axes matplotlib """ if figax is None: fig,ax = plt.subplots(1,1) else: fig,ax = figax useful_curves = self.get_curves(date_debut, date_fin, only_active=only_active, only_last=only_last) colorcycler = cycle(['c', 'm', 'y', 'k']) def next_color(): return next(colorcycler) for cur_curve in useful_curves: cur_curve.plot_curve((fig,ax), color=next_color()) ax.set_title(f'Relation entre la hauteur et le débit pour la station {self.name}') ax.set_xlabel('Hauteur [m]') ax.set_ylabel('Débit [$m^3s^{-1}$]') ax.legend() return fig,ax
[docs] def plot_gaugings(self, figax = None, date_debut:datetime = None, date_fin:datetime = None, pointsize:float = 1., confidence_h:float = None, confidence_q:float = None) -> tuple[Figure,Axes]: """ Affichage des jaugeages :param figax: figure et axes matplotlib :param date_debut: date de début d'intervalle :param date_fin: date de fin d'intervalle :param pointsize: taille des points -- 1 par défaut :param confidence_h: incertitude sur la hauteur [m] :param confidence_q: pourcentage d'incertitude sur le débit [-] -- 0.1 pour 10% return: figure et axes matplotlib """ if figax is None: fig,ax = plt.subplots(1,1) else: fig,ax = figax useful_jau = self.get_gaugings(date_debut, date_fin) x = [jaugeage.waterdepth for jaugeage in useful_jau] y = [jaugeage.discharge for jaugeage in useful_jau] if confidence_h is None and confidence_q is None: ax.scatter(x, y, s=pointsize, label='Jaugeages') elif confidence_h is not None and confidence_q is not None: assert confidence_q < 1 and confidence_q > 0, 'Confidence interval must be between 0 and 1' ax.errorbar(x, y, xerr=confidence_h, yerr= [cury * confidence_q for cury in y], fmt='x', label='Jaugeages') elif confidence_h is not None: ax.errorbar(x, y, xerr=confidence_h, fmt='x', label='Jaugeages') elif confidence_q is not None: assert confidence_q < 1 and confidence_q > 0, 'Confidence interval must be between 0 and 1' ax.errorbar(x, y, yerr= [cury * confidence_q for cury in y], fmt='x', label='Jaugeages') return fig,ax
[docs] def fit(self, date_debut:datetime=None, date_fin:datetime=None, which = laws.power): """ Ajustement d'une courbe de tarage :param date_debut: date de début d'intervalle :param date_fin: date de fin d'intervalle :param which: type de courbe de tarage (puissance, polynome 2 ou 3) --see laws enum """ mygaugings = self.get_gaugings(date_debut, date_fin) h, q, dates = zip(*sorted([(curg.waterdepth, curg.discharge, curg.date) for curg in mygaugings])) func = which.value[0]() ret = curve_fit(func, h, q, maxfev= 100000) curve = RatingCurve(date_start=np.min(np.asarray(dates)), date_end=np.max(np.asarray(dates))) curve.add(which.value[1](ret[0]), np.min(np.asarray(h)), np.max(np.asarray(h))) self.curves.append(curve) fig, ax = self.plot_gaugings() curve.plot_curve((fig,ax))
[docs] class StationsCurvesGaugings(): """ Liste des stations avec courbes de tarage et jaugeages """
[docs] stations:dict[str, StationCurvesGaugings]
def __init__(self, dir:str) -> None: """ Initialisation de la liste des stations :param dir: répertoire contenant les données avec un répertoire DCENN et DGH """ try: self.hydrometry = hydrometry() except: self.hydrometry = None dcenn = path.join(dir, 'DCENN') if path.exists(dcenn): logging.debug('DCENN directory found') self.add_data_SPWDCENN(path.join(dir, 'DCENN')) else: logging.warning('DCENN directory not found') dgh = path.join(dir, 'DGH') if path.exists(dgh): logging.debug('DGH directory found') self.add_data_SPWMI( path.join(dir, 'DGH')) else: logging.warning('DGH directory not found') def __getitem__(self, key:str) -> StationCurvesGaugings: """ Accès à une station par son code """ if key in self.stations.keys(): return self.stations[key] else: logging.warning('Station not found - {}'.format(key)) return None def __str__(self) -> str: """ Affichage des stations """ txt = 'Stations:\n\n' for curstation in self.stations: txt += '- ' + curstation + '\n' return txt
[docs] def add_data_SPWDCENN(self, dir:str): """ Gestion des stations SPW-DCENN 1 station = 1 fichier xml """ #récupération de tous les fichiers xml et excel fichiers_xml = glob.glob(dir + '\\*.xml') fichiers_excel = glob.glob(dir + '\\*_Jaugeages.xlsx') fichiers_xml = [fich.lower() for fich in fichiers_xml] fichiers_excel = [fich.lower() for fich in fichiers_excel] self.stations = {} for r in fichiers_xml: station_id = r.split('_')[0] newstation = StationCurvesGaugings(r) newstation.get_href(self.hydrometry) self.stations[newstation.code.lower()] = newstation for r in fichiers_excel: station_id = r.split('_')[0].split('\\')[-1] if station_id in self.stations.keys(): self.stations[station_id]._read_gaugings(r) else: logging.warning('Existing gaugings but no station associated - {}'.format(station_id))
[docs] def add_data_SPWMI(self, dir:str): """ Gestion des sttaions SPW-MI 1 fichier xlsx = plusieurs stations """ #récupération des données fichiers_stations_curves = glob.glob(path.join(dir,'*.xlsx')) for curfich in fichiers_stations_curves: if Path(curfich).name[0]!='~': # lecture des courbes de tarage dataframe=pd.read_excel(curfich, 'CT', skiprows=2) name='' m=0 i=0 while (i < len(dataframe['Station'])) and (str(dataframe['Code Mesure'].iloc[i]) != 'Nombre total de courbes :'): # parcours de toutes les lignes du fichier Excel if dataframe['Station'].iloc[i]!=name and type(dataframe['Station'].iloc[i])==str: # nouvelle station code = str(dataframe['Code Mesure'].iloc[i]) idd = code[0:4] name = dataframe['Station'].iloc[i] newstation = StationCurvesGaugings() newstation.code = idd newstation.name = name newstation.get_href(self.hydrometry) self.stations[idd.lower()] = newstation #stockage des expressions analytiques z=i while (z<len(dataframe['Station'])) and (dataframe['Station'].iloc[z]==name or type(dataframe['Station'].iloc[z])==float): j=z #conversion de l'heure et de la date en type datetime heure_début = dataframe['Hd'].iloc[z] heure_fin = dataframe['Hf'].iloc[z] delta1 = timedelta(hours=heure_début) delta2 = timedelta(hours=heure_fin) date_debut = dataframe['Début'].iloc[z] + delta1 date_fin = dataframe['Fin'].iloc[z] + delta2 curRC = RatingCurve(date_debut, date_fin) newstation.add_ratingcurve(curRC) while j<len(dataframe['Station']) and (type(dataframe['Début'].iloc[j])!=datetime or j==z) and (not np.isnan(dataframe['A x 3'].iloc[j])): a = dataframe['A x 3'].iloc[j] b = dataframe['B x 2'].iloc[j] c = dataframe['C x'].iloc[j] d = dataframe['D'].iloc[j] # create symbolic function x = symbols('x') expression = a * x**3 + b * x**2 + c * x + d # convert to function # curRC.add(lambdify(x, expression), dataframe['H min'].iloc[j], dataframe['H max'].iloc[j]) if str(expression) == 'nan': pass curRC.add(str(expression), dataframe['H min'].iloc[j], dataframe['H max'].iloc[j]) j=j+1 else : while j<len(dataframe['Station']) and np.isnan(dataframe['A x 3'].iloc[j]): j=j+1 z=j # break i=z # lecture des jaugeages data_jaugeages=pd.read_excel(curfich, 'Jaugeages', skiprows=2) #stockage des points de jaugeages name='' i=0 l=0 while i < len(data_jaugeages['Station']): # parcours de toutes les lignes du fichier Excel if data_jaugeages['Station'].iloc[i]!=name and type(data_jaugeages['Station'].iloc[i])==str: # nouvelle station code = str(data_jaugeages['Stasssat'].iloc[i]) idd = code[0:4].lower() name = data_jaugeages['Station'].iloc[i] if idd in self.stations.keys(): curstation = self.stations[idd] else: curstation = None logging.warning('Existing gaugings but no station associated - {}'.format(idd)) if curstation is not None: l=i while l<len(data_jaugeages['Station']) and ((data_jaugeages['Station'][l]==name and type(data_jaugeages['Station'][l])==str) or (type(data_jaugeages['Station'][l])!=str)): if str(data_jaugeages['hhmm'][l])=='nan': pass else : delta=datetime.strptime(str(data_jaugeages['hhmm'][l]), '%Hh%M') date = datetime.combine(data_jaugeages['Date'][l],delta.time()) add_jaugeage=Gauging(date, data_jaugeages['Hl'][l], data_jaugeages['Débit'][l], data_jaugeages['S'][l], data_jaugeages['L'][l], data_jaugeages['PE'][l]) l=l+1 curstation.gaugings.append(add_jaugeage) i=l else: i+=1
if __name__=='__main__':
[docs] func = get_power_law()
ret = func(1., .5, 2., .25) all_stations = StationsCurvesGaugings('data\\tarage') curstation= all_stations.stations["L5860".lower()] curstation.fit(which=laws.poly3) h = pd.Series(np.asarray([1.,2,3.])) q =curstation.curves[-2].compute_q_from_h(h) h2 =curstation.curves[-2].compute_h_from_q(q) calibT2 = pd.read_csv(r'data\sim1D\Theux\calibration_T-2.csv') calibT25 = pd.read_csv(r'data\sim1D\Theux\calibration_T-25.csv') calibT100 = pd.read_csv(r'data\sim1D\Theux\calibration_T-100.csv') calibT2021 = pd.read_csv(r'data\sim1D\Theux\calibration_T-2021.csv') fig,ax = curstation.plot_curves(only_active=False) curstation.plot_gaugings((fig,ax)) periods = curstation.get_periods() whichone = -2 fig,ax = curstation.plot_curves(date_debut = periods[whichone][0], date_fin = periods[whichone][1]) curstation.plot_gaugings((fig,ax), date_debut = periods[whichone][0], date_fin = periods[whichone][1]) fig,ax = curstation.plot_curves(only_active=False) curstation.plot_gaugings((fig,ax)) ax.scatter(calibT2021['h_T-2021'], calibT2021['q_T-2021'], s=1., label='Y2021') ax.scatter(calibT100['h_T-100'] , calibT100['q_T-100'], s=1., label='T100 ans') ax.scatter(calibT25['h_T-25'] , calibT25['q_T-25'], s=1., label='T25 ans') ax.scatter(calibT2['h_T-2'] , calibT2['q_T-2'], s=1., label='T2 ans') ax.legend() fig.show() periods = curstation.get_periods() whichone = -2 fig,ax = curstation.plot_curves(date_debut = periods[whichone][0], date_fin = periods[whichone][1]) curstation.plot_gaugings((fig,ax), date_debut = periods[whichone][0], date_fin = periods[whichone][1]) ax.scatter(calibT100['h_T-100'] , calibT100['q_T-100'], s=1., label='T100 ans') ax.scatter(calibT25['h_T-25'] , calibT25['q_T-25'], s=1., label='T25 ans') ax.scatter(calibT2['h_T-2'] , calibT2['q_T-2'], s=1., label='T2 ans') ax.scatter(calibT2021['h_T-2021'], calibT2021['q_T-2021'], s=2., label='Y2021') qmax2 = calibT2['q_T-2'].max() qmax25 = calibT25['q_T-25'].max() qmax100 = calibT100['q_T-100'].max() qmax2021 = calibT2021['q_T-2021'].max() hmax2 = calibT2['h_T-2'].max() hmax25 = calibT25['h_T-25'].max() hmax100 = calibT100['h_T-100'].max() hmax2021 = calibT2021['h_T-2021'].max() ax.hlines(qmax2, 0,hmax2, linestyles='--') ax.hlines(qmax25, 0,hmax25, linestyles='--') ax.hlines(qmax100, 0,hmax100, linestyles='--') ax.hlines(qmax2021, 0,hmax2021, linestyles='--') ax.vlines(hmax2, 0,qmax2, linestyles='--') ax.vlines(hmax25, 0,qmax25, linestyles='--') ax.vlines(hmax100, 0,qmax100, linestyles='--') ax.vlines(hmax2021, 0,qmax2021, linestyles='--') ax.scatter(160.62-157.203, 130., marker='o', label='2D T25') ax.scatter(161.10-157.203, 210., marker='o', label='2D T100') ax.scatter(162.-157.203, 380., marker='o', label='2D T1000') # ax.scatter(3.1, 130., marker='+', label='2D T25-H') ax.legend() fig.show() pass