Source code for wolfhece.hydrology.Comparison

"""
Author: HECE - University of Liege, Pierre Archambeau, Christophe Dessers
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

import sys                              # module to stop the program when an error is encountered
import os
import matplotlib.pyplot as plt
import numpy as np
from numpy.__config__ import show
from numpy.core.arrayprint import set_string_function
from wx.core import DD_CHANGE_DIR

from ..PyTranslate import _
from .Catchment import *

[docs] class Comparison: """ This class contains several Catchment objects and all the procedures to produce plots that can compare different results of these Catchments. """ ## Constructor def __init__(self, workingDir, dictCatchment, dictToCompar): # =========== # # ## @var workingDir The path of the working directory self.workingDir = workingDir + 'PostProcess_Comparison/' ## Dictionary containing all the objects Catchment: # @var dictCatchment dictionnary containing: 'Title' and 'Object' # 'Title': the name that one wants to give to the element on graphs # 'Object': The Catchment object containing all the information of the Catchment. self.myCatchments = dictCatchment ## @var dictToCompar # Dictionary containing: # - 1: if the plot function is used # - 0: otherwise self.dictToCompar = dictToCompar ## @var plotDict disctionnary containing all basic plot informations self.plotDict = {} # ========================================================================================== # Creation of the PostProcess directory # It will contain all the the saved results. if not os.path.exists(self.workingDir): try: os.mkdir(self.workingDir) except OSError: print ("Creation of the directory %s failed" % self.workingDir) else: print ("Successfully created the directory %s" % self.workingDir) # Verification that the number of Catchments to compare is greater than 1 if(len(self.myCatchments)<=1): print('ERROR: Cannot compare less than 2 Catchments') # sys.exit()
[docs] def compare_now(self): # Check and run all the functions to use if(int(self.dictToCompar['hydro subbasin'][key_Param.VALUE]) == 1): self.hydro_subbasin() if(int(self.dictToCompar['hydro junction'][key_Param.VALUE]) == 1): self.hydro_junction() if(int(self.dictToCompar['hydro final'][key_Param.VALUE]) == 1): self.hydro_final()
[docs] def hydro_subbasin(self): print("Comparison of all subbasin ...") # Test that the number of subbasins are the same nbSubBasins=len(self.myCatchments['Catchment 1']['Object'].subBasinDict) for i in range(2, len(self.myCatchments)+1): txtTmp = 'Catchment ' + str(i) tmpNb = len(self.myCatchments[txtTmp]['Object'].subBasinDict) if(tmpNb!=nbSubBasins): print("ERROR: These Catchments cannot be compared as their number of subbasins are not the same!") sys.exit() for subId in range(1,tmpNb+1): # Figure Rain on a first y axis fig,ax1=plt.subplots() ax1.set_xlabel('Time [years]') ax1.set_ylabel(_('Discharge [mm/s]'),color='k') #Color express in %RGB: (1,1,1) # ax1.set_ylim(0, self.myHydro.max()*2) # ax1.hist(data1,color=(0,0,1),edgecolor='black',linewidth=1.2) max_= 0 for element in self.myCatchments: title = self.myCatchments[element]['Title'] pointer = self.myCatchments[element]['Object'].subBasinDict[subId] x = pointer.time/(3600.0*24*365)+2000 # [years] y1 = pointer.myHydro*pointer.surfaceDrained/3.6 y2 = pointer.rain if(y1.max()>max_): max_ = y1.max() ax1.set_ylim(0, max_*2) ax1.set_xticks([2000, 2001, 2002, 2003]) ax1.plot(x,y1,'--',label=title) ax1.tick_params(axis='y',labelcolor='k') # Figure Hydro on a second y axis ax2=ax1.twinx() ax2.set_ylabel(_('Precipitations [m³/s]'),color='b') ax2.set_ylim(y2.max()*3, 0) ax2.plot(x,y2,color='b') ax2.fill_between(x, y2, 0, color='b') ax2.tick_params(axis='y',labelcolor='b') fig.tight_layout() handles, labels = ax1.get_legend_handles_labels() fig.legend(handles, labels) plt.savefig(self.workingDir+'/Subbasin_'+str(subId)+'_Compar'+'.pdf')
[docs] def hydro_junction(self): print("Comparison of all junctions ...")
# # Test that the number of subbasins are the same # nbJunction=len(self.myCatchments['Catchment 1']['Object'].intersection) # for i in range(2, len(self.myCatchments)+1): # txtTmp = 'Catchment ' + str(i) # tmpNb = len(self.myCatchments[txtTmp]['Object'].intersection) # if(tmpNb!=nbJunction): # print("ERROR: These Catchments cannot be compared as their number of subbasins are not the same!") # sys.exit() # for subId in range(1,tmpNb+1): # # Figure Rain on a first y axis # fig,ax1=plt.subplots() # ax1.set_xlabel('Temps [h]') # ax1.set_ylabel('Débits [mm/s]',color='k') #Color express in %RGB: (1,1,1) # # ax1.set_ylim(0, self.myHydro.max()*2) # # ax1.hist(data1,color=(0,0,1),edgecolor='black',linewidth=1.2) # max_= 0 # for element in self.myCatchments: # title = self.myCatchments[element]['Title'] # pointer = self.myCatchments[element]['Object'].intersection[subId] # x = pointer.time/3600.0 # [h] # y1 = pointer.myHydro # y2 = pointer.rain # if(y1.max()>max_): # max_ = y1.max() # ax1.set_ylim(0, max_*2) # ax1.plot(x,y1,label=title) # ax1.tick_params(axis='y',labelcolor='k') # # Figure Hydro on a second y axis # ax2=ax1.twinx() # ax2.set_ylabel('Précipitations [m³/s]',color='b') # ax2.set_ylim(y2.max()*3, 0) # ax2.plot(x,y2,color='b') # ax2.fill_between(x, y2, 0, color='b') # ax2.tick_params(axis='y',labelcolor='b') # fig.tight_layout() # handles, labels = ax1.get_legend_handles_labels() # fig.legend(handles, labels) # plt.savefig(self.workingDir+'/Subbasin_'+str(subId)+'_Compar'+'.pdf') ## Caution the number of levels are not assumed to be the same here! This function can compare # catchments with completely different topology
[docs] def hyetos(self, catchColors=[], rangeRain=[], writeFile='', myTraits=None, beginDate=None, endDate=None, dt=None, yrangeData=[], \ addData=[], dt_addData=[], beginDates_addData=[], endDates_addData=[],label_addData=[], color_addData=[], typeOfTraits_addData=[],cumulOrigin_addData=[]): """ TO DO : A Généraliser dans le cas où les sous-bassins n'ont pas le même nom et topologie !!!!! """ print("Comparison of the hyetos from all subbasins") if(beginDate is None): beginDate = [] for idCatch in self.myCatchments: beginDate.append(self.myCatchments[idCatch]['Object'].dateBegin) # beginDate = self.myCatchments[idCatch]['Object'].dateBegin # break if(endDate is None): endDate = [] for idCatch in self.myCatchments: endDate.append(self.myCatchments[idCatch]['Object'].dateEnd) # endDate = self.myCatchments[idCatch]['Object'].dateEnd # break if(dt is None): dt=[] for idCatch in self.myCatchments: dt.append(self.myCatchments[idCatch]['Object'].deltaT) basinNames = [] # contains all the names of the subbasin for idCatch in self.myCatchments: curCatch = self.myCatchments[idCatch]['Object'] for nameBasin in curCatch.subBasinDict: if(not(nameBasin in basinNames)): basinNames.append(nameBasin) nbCatchment = len(self.myCatchments) x_title = "Dates" if(catchColors==[]): tmpColors = np.random.rand(nbCatchment) for icolor in range(len(tmpColors)): catchColors.append(tmpColors[icolor]) elif(len(catchColors)!=nbCatchment): print("ERROR: the number of catchments is not the same as the number of colors given") for nameBasin in basinNames: y1 = [] y2 = [] yLabels = [] myTraits = [] for idCatch in self.myCatchments: curCatch = self.myCatchments[idCatch]['Object'] y1.append(curCatch.subBasinDict[nameBasin].myRain[:-1]) dtH = curCatch.subBasinDict[nameBasin].deltaT/3600.0 tmpCumul = datt.cumul_data(curCatch.subBasinDict[nameBasin].myRain[:-1], dtH, dtH) y2.append(tmpCumul) yLabels.append(curCatch.name) graph_title = curCatch.subBasinDict[nameBasin].name myTraits.append('-') nbAddData = 0 if(addData!=[]): nbAddData = len(addData) for i in range(nbAddData): y1.append(addData[i][:-1]) tmpY2 = datt.cumul_data(addData[i][:-1], dt_addData[i]/3600.0, dt_addData[i]/3600.0) if(cumulOrigin_addData!=[]): y2.append(tmpY2-cumulOrigin_addData[i]) else: y2.append(tmpY2) beginDate.append(beginDates_addData[i]) endDate.append(endDates_addData[i]) dt.append(dt_addData[i]) if(label_addData!=[]): yLabels.append(label_addData[i]) if(color_addData!=[]): catchColors.append(color_addData[i]) if(typeOfTraits_addData!=[]): myTraits.append(typeOfTraits_addData[i]) # Plot Rains yTitles = "Pluies [mm/h]" writeFileDef = writeFile + "_Rain_" + graph_title.replace(".","") ph.plot_hydro(nbCatchment+nbAddData, y1, x_title=x_title, y_titles=yTitles, beginDate=beginDate,endDate=endDate,dt=dt,graph_title=graph_title, \ y_labels=yLabels,rangeData=rangeRain,y_data_range=yrangeData,myColors=catchColors,typeOfTraits=myTraits,writeFile=writeFileDef) # Plot Cumulated rains yTitles = "Cumulated Rain [mm]" writeFileDef = writeFile + "_CumulRain_" + graph_title.replace(".","") ph.plot_hydro(nbCatchment+nbAddData, y2, x_title=x_title, y_titles=yTitles, beginDate=beginDate,endDate=endDate, dt=dt,graph_title=graph_title, \ y_labels=yLabels,rangeData=rangeRain,y_data_range=yrangeData,myColors=catchColors,typeOfTraits=myTraits,writeFile=writeFileDef) plt.show()
[docs] def comparison_with_measures(self,addCatchment): print("TO DO") sys.exit() x_range = [x_min,x_max] attemptName = "Comparaison Hydro" for element in testFroudeCatchment.subBasinDict: if(testFroudeCatchment.subBasinDict[element].name == "Chaudfontaine"): stationName = "Chaudfontaine" Qnat = naturalCatchment.subBasinDict[element].myHydro[:] addData = [] addData.append(Qnat[:-1]) dtAddData = [testFroudeCatchment.deltaT] beginDatesAddData = [testFroudeCatchment.dateBegin] endDatesAddData = [testFroudeCatchment.dateEnd] label_addData = ["BV avant modification du Froude"] color_addData = ["y"] testFroudeCatchment.subBasinDict[element].plot_outlet(Measures=RV.MeasuresChau,withEvap=False,rangeData=x_range,graph_title=stationName ,writeFile=writeDir+stationName+attemptName, withDelay=False) elif(testFroudeCatchment.subBasinDict[element].name == "Theux"): stationName = "Theux" Qnat = naturalCatchment.subBasinDict[element].myHydro[:] addData = [] addData.append(Qnat[:-1]) dtAddData = [testFroudeCatchment.deltaT] beginDatesAddData = [testFroudeCatchment.dateBegin] endDatesAddData = [testFroudeCatchment.dateEnd] label_addData = ["BV avant modification du Froude"] color_addData = ["y"] testFroudeCatchment.subBasinDict[element].plot_outlet(Measures=RV.MeasuresTheu,withEvap=False,rangeData=x_range,graph_title=stationName,writeFile=writeDir+stationName+attemptName, withDelay=False) elif(testFroudeCatchment.subBasinDict[element].name == "SPIXHE"): stationName = "SPIXHE" Qnat = naturalCatchment.subBasinDict[element].myHydro[:] addData = [] addData.append(Qnat[:-1]) dtAddData = [testFroudeCatchment.deltaT] beginDatesAddData = [testFroudeCatchment.dateBegin] endDatesAddData = [testFroudeCatchment.dateEnd] label_addData = ["BV avant modification du Froude"] color_addData = ["y"] testFroudeCatchment.subBasinDict[element].plot_outlet(Measures=RV.MeasuresSpix,withEvap=False,rangeData=x_range,graph_title=stationName ,writeFile=writeDir+stationName+attemptName, withDelay=False) elif(testFroudeCatchment.subBasinDict[element].name == "Station Verviers"): stationName = "Station Verviers" Qnat = naturalCatchment.subBasinDict[element].myHydro[:] addData = [] addData.append(Qnat[:-1]) dtAddData = [testFroudeCatchment.deltaT] beginDatesAddData = [testFroudeCatchment.dateBegin] endDatesAddData = [testFroudeCatchment.dateEnd] label_addData = ["BV avant modification du Froude"] color_addData = ["y"] testFroudeCatchment.subBasinDict[element].plot_outlet(Measures=RV.MeasuresVerv,withEvap=False,rangeData=x_range,graph_title=stationName ,writeFile=writeDir+stationName+attemptName, withDelay=False) elif(testFroudeCatchment.subBasinDict[element].name == "Station Foret (Magne)"): stationName = "Station Foret (Magne)" Qnat = naturalCatchment.subBasinDict[element].myHydro[:] addData = [] addData.append(Qnat[:-1]) dtAddData = [testFroudeCatchment.deltaT] beginDatesAddData = [testFroudeCatchment.dateBegin] endDatesAddData = [testFroudeCatchment.dateEnd] label_addData = ["BV avant modification du Froude"] color_addData = ["y"] testFroudeCatchment.subBasinDict[element].plot_outlet(Measures=RV.MeasuresFor,withEvap=False,rangeData=x_range,graph_title=stationName ,writeFile=writeDir+stationName+attemptName, withDelay=False) elif(testFroudeCatchment.subBasinDict[element].name == "Polleur"): stationName = "Polleur" Qnat = naturalCatchment.subBasinDict[element].myHydro[:] addData = [] addData.append(Qnat[:-1]) dtAddData = [testFroudeCatchment.deltaT] beginDatesAddData = [testFroudeCatchment.dateBegin] endDatesAddData = [testFroudeCatchment.dateEnd] label_addData = ["BV avant modification du Froude"] color_addData = ["y"] testFroudeCatchment.subBasinDict[element].plot_outlet(Measures=RV.MeasuresPoll,withEvap=False,rangeData=x_range,graph_title=stationName ,writeFile=writeDir+stationName+attemptName, withDelay=False) elif(testFroudeCatchment.subBasinDict[element].name == "Belleheid"): stationName = "Belleheid" Qnat = naturalCatchment.subBasinDict[element].myHydro[:] addData = [] addData.append(Qnat[:-1]) dtAddData = [testFroudeCatchment.deltaT] beginDatesAddData = [testFroudeCatchment.dateBegin] endDatesAddData = [testFroudeCatchment.dateEnd] label_addData = ["BV avant modification du Froude"] color_addData = ["y"] testFroudeCatchment.subBasinDict[element].plot_outlet(Measures=RV.MeasuresBell,withEvap=False,rangeData=x_range,graph_title=stationName ,writeFile=writeDir+stationName+attemptName, withDelay=False) elif(testFroudeCatchment.subBasinDict[element].name == "Barrage Vesdre"): stationName = "Barrage Vesdre" Qnat = naturalCatchment.subBasinDict[element].myHydro[:] addData = [] addData.append(Qnat[:-1]) dtAddData = [testFroudeCatchment.deltaT] beginDatesAddData = [testFroudeCatchment.dateBegin] endDatesAddData = [testFroudeCatchment.dateEnd] label_addData = ["BV avant modification du Froude"] color_addData = ["y"] testFroudeCatchment.subBasinDict[element].plot_outlet(Measures=RV.MeasuresBVesIn,withEvap=False,rangeData=x_range,graph_title=stationName ,writeFile=writeDir+stationName+attemptName, withDelay=False, yrangeData=yrangeData, yrangeRain=yrangeRain) elif(testFroudeCatchment.subBasinDict[element].name == "BV Barrage Vesdre"): stationName = "BV B Vesdre" Qnat = naturalCatchment.subBasinDict[11].myHydro[:] addData = [] addData.append(QVesdreFroude[:-1]) addData.append(QVesdreBeforeFroude[:-1]) addData.append(RV.MeasuresBVesIn.myHydro[:]) dtAddData = [testFroudeCatchment.deltaT,testFroudeCatchment.deltaT,RV.MeasuresBVesIn.deltaT] beginDatesAddData = [testFroudeCatchment.dateBegin, testFroudeCatchment.dateBegin,RV.MeasuresBVesIn.dateBegin] endDatesAddData = [testFroudeCatchment.dateEnd,testFroudeCatchment.dateEnd,RV.MeasuresBVesIn.dateEnd] label_addData = ["Debits simulés avec apport de la Helle et de la Soor","BV avant modification du Froude", "Hydrogramme entrant reconstruit"] color_addData = ["b","y","k"] testFroudeCatchment.subBasinDict[element].plot_outlet(withEvap=False,rangeData=x_range,graph_title=stationName ,writeFile=writeDir+stationName+attemptName, withDelay=False, \ addData=addData,dt_addData=dtAddData,beginDates_addData=beginDatesAddData,endDates_addData=endDatesAddData,label_addData=label_addData,color_addData=color_addData)
[docs] def outlet_all_basins_same_topo(self, plotDict={}, show=True, envelop=False, refModuleName =""): # Load General characteristics of the dictionnary if(not("Time Zone Plot" in plotDict["General Parameters"])): tzPlot = 0 tzDelta = datetime.timedelta(hours=0) else: tzPlot = plotDict["General Parameters"]["Time Zone Plot"] tzDelta = datetime.timedelta(hours=tzPlot) if(envelop and not("Ref Name" in plotDict["General Parameters"])): refName = "Catchment 1" else: refName = plotDict["General Parameters"]["Ref Name"] if(not("Add Table" in plotDict["General Parameters"])): addTable = False else: addTable = plotDict["General Parameters"]["Add Table"] if(not("Date Begin" in plotDict["General Parameters"])): beginDate = [] for idCatch in self.myCatchments: if(not(envelop)): beginDate.append(self.myCatchments[idCatch]['Object'].dateBegin+tzDelta) elif(idCatch==refName): beginDate.append(self.myCatchments[idCatch]['Object'].dateBegin+tzDelta) else: beginDate = plotDict["General Parameters"]["Date Begin"]+tzDelta if(not("Date End" in plotDict["General Parameters"])): endDate = [] for idCatch in self.myCatchments: if(not(envelop)): endDate.append(self.myCatchments[idCatch]['Object'].dateEnd+tzDelta) elif(idCatch==refName): endDate.append(self.myCatchments[idCatch]['Object'].dateEnd+tzDelta) else: endDate = plotDict["General Parameters"]["Date End"]+tzDelta if(not("Dt" in plotDict["General Parameters"])): dt=[] for idCatch in self.myCatchments: if(not(envelop)): dt.append(self.myCatchments[idCatch]['Object'].deltaT) elif(idCatch==refName): dt.append(self.myCatchments[idCatch]['Object'].deltaT) else: dt = plotDict["General Parameters"]["Dt"] # All the useful junctions will be listed so that all the elements won't be analysed anymore basinId = [] # contains all the names of the subbasin basinNames = {} for idCatch in self.myCatchments: curCatch = self.myCatchments[idCatch]['Object'] for id in curCatch.subBasinDict: curBasin = curCatch.subBasinDict[id] if(not(id in basinId) and (curBasin.name in plotDict)): basinId.append(id) basinNames[id] = (curBasin.name) nbCatchment = len(self.myCatchments) x_title = "Dates " + "(GMT+"+ str(tzPlot) + ")" if(not("Catchment colors" in plotDict["General Parameters"])): tmpColors = np.random.rand(nbCatchment) catchColors = [] for icolor in range(len(tmpColors)): if(not(envelop)): catchColors.append(tmpColors[icolor]) elif(idCatch==refName): catchColors.append(tmpColors[icolor]) else: catchColors = [] catchColors = plotDict["General Parameters"]["Catchment colors"] if("Catchment traits" in plotDict["General Parameters"]): myTraits = [] myTraits = plotDict["General Parameters"]["Catchment traits"] if(len(myTraits)!=nbCatchment): print("ERROR: the number of catchments is not the same as the number of colors given") sys.exit() if(len(catchColors)!=nbCatchment and not(envelop)): print("ERROR: the number of catchments is not the same as the number of colors given") sys.exit() if("Same rain" in plotDict["General Parameters"]): sameRain = plotDict["General Parameters"]["Same rain"] else: sameRain = True if("Display rain" in plotDict["General Parameters"]): displayRain = plotDict["General Parameters"]["Display rain"] else: displayRain = True if(refModuleName!=""): print("To Do !!!!") sys.exit() for id in basinId: y1 = [] yLabels = [] if(not("Catchment traits" in plotDict["General Parameters"])): myTraits = [] rain = [] z = [] nbAddRain=0 y_labelAddRain = [] upperPlot = False allSurfaces = [] mySurf = 0.0 if(basinNames[id]=="BV Barrage Vesdre"): if(envelop): firstKey = list(self.myCatchments.items())[0][0] nbTElements = len(self.myCatchments[firstKey]['Object'].subBasinDict[id].get_outFlow_noDelay()) allHydros = np.zeros((nbTElements,nbCatchment)) counter = 0 for idCatch in self.myCatchments: curCatch = self.myCatchments[idCatch]['Object'] tmp = curCatch.retentionBasinDict["J18"].directFluxInRB tmpHydro = np.zeros(len(tmp)) index = math.floor(curCatch.retentionBasinDict["J18"].timeDelay/curCatch.retentionBasinDict["J18"].deltaT) if(index==0): tmpHydro = tmp elif(index<len(tmp)): tmpHydro[:-index] = tmp[index:] else: print("ERROR: the simulation time is not long enough for this subbasin to be taken into account") sys.exit() allHydros[:,counter] = tmpHydro[:] if(idCatch==refName): y1.append(tmpHydro[:]) yLabels.append(curCatch.name) if("Station Name" in plotDict[basinNames[id]]): graph_title = plotDict[basinNames[id]]["Station Name"] else: graph_title = curCatch.subBasinDict[id].name myTraits.append('-') ymax = np.amax(allHydros, axis=1) ymin = np.amin(allHydros, axis=1) else: for idCatch in self.myCatchments: curCatch = self.myCatchments[idCatch]['Object'] if(curCatch.myModel==cst.tom_UH): # tmp = curCatch.retentionBasinDict["J18"].directFluxInRB # tmpHydro = np.zeros(len(tmp)) # index = math.floor(curCatch.retentionBasinDict["J18"].timeDelay/curCatch.retentionBasinDict["J18"].deltaT) # if(index==0): # tmpHydro = tmp # elif(index<len(tmp)): # tmpHydro[:-index] = tmp[index:] # else: # print("ERROR: the simulation time is not long enough for this subbasin to be taken into account") # sys.exit() cur_module:RetentionBasin = curCatch.retentionBasinDict["J18"] tmpHydro = cur_module.get_direct_insideRB_inlets(unit='m3/s') y1.append(tmpHydro[:]) else: # tmp = curCatch.retentionBasinDict["J18"].directFluxInRB # tmpHydro = np.zeros(len(tmp)) # index = math.floor(curCatch.retentionBasinDict["J18"].timeDelay/curCatch.retentionBasinDict["J18"].deltaT) # if(index==0): # tmpHydro = tmp # elif(index<len(tmp)): # tmpHydro[:-index] = tmp[index:] # else: # print("ERROR: the simulation time is not long enough for this subbasin to be taken into account") # sys.exit() cur_module:RetentionBasin = curCatch.retentionBasinDict["J18"] tmpHydro = cur_module.get_direct_insideRB_inlets(unit='m3/s') y1.append(tmpHydro[:]) yLabels.append(curCatch.name) if("Station Name" in plotDict[basinNames[id]]): graph_title = plotDict[basinNames[id]]["Station Name"] else: graph_title = curCatch.subBasinDict[id].name if(not("Catchment traits" in plotDict["General Parameters"])): myTraits.append('-') elif(basinNames[id]=="BV Barrage Gileppe"): if(envelop): firstKey = list(self.myCatchments.items())[0][0] nbTElements = len(self.myCatchments[firstKey]['Object'].subBasinDict[id].get_outFlow_noDelay()) allHydros = np.zeros((nbTElements,nbCatchment)) counter = 0 for idCatch in self.myCatchments: curCatch = self.myCatchments[idCatch]['Object'] tmp = curCatch.retentionBasinDict["J16"].directFluxInRB tmpHydro = np.zeros(len(tmp)) index = math.floor(curCatch.retentionBasinDict["J16"].timeDelay/curCatch.retentionBasinDict["J18"].deltaT) if(index==0): tmpHydro = tmp elif(index<len(tmp)): tmpHydro[:-index] = tmp[index:] else: print("ERROR: the simulation time is not long enough for this subbasin to be taken into account") sys.exit() allHydros[:,counter] = tmpHydro[:] if(idCatch==refName): y1.append(tmpHydro[:]) yLabels.append(curCatch.name) if("Station Name" in plotDict[basinNames[id]]): graph_title = plotDict[basinNames[id]]["Station Name"] else: graph_title = curCatch.subBasinDict[id].name myTraits.append('-') ymax = np.amax(allHydros, axis=1) ymin = np.amin(allHydros, axis=1) else: for idCatch in self.myCatchments: curCatch = self.myCatchments[idCatch]['Object'] if(curCatch.myModel==cst.tom_UH): # tmp = curCatch.subBasinDict[19].myHydro[:] + curCatch.retentionBasinDict["J19"].get_outFlow_noDelay() # y1.append(tmp[:]) tmp = curCatch.retentionBasinDict["J16"].directFluxInRB tmpHydro = np.zeros(len(tmp)) index = math.floor(curCatch.retentionBasinDict["J16"].timeDelay/curCatch.retentionBasinDict["J18"].deltaT) if(index==0): tmpHydro = tmp elif(index<len(tmp)): tmpHydro[:-index] = tmp[index:] else: print("ERROR: the simulation time is not long enough for this subbasin to be taken into account") sys.exit() y1.append(tmpHydro[:]) else: tmp = curCatch.retentionBasinDict["J16"].directFluxInRB tmpHydro = np.zeros(len(tmp)) index = math.floor(curCatch.retentionBasinDict["J16"].timeDelay/curCatch.retentionBasinDict["J18"].deltaT) if(index==0): tmpHydro = tmp elif(index<len(tmp)): tmpHydro[:-index] = tmp[index:] else: print("ERROR: the simulation time is not long enough for this subbasin to be taken into account") sys.exit() y1.append(tmpHydro[:]) yLabels.append(curCatch.name) if("Station Name" in plotDict[basinNames[id]]): graph_title = plotDict[basinNames[id]]["Station Name"] else: graph_title = curCatch.subBasinDict[id].name if(not("Catchment traits" in plotDict["General Parameters"])): myTraits.append('-') else: # Envelop graph. It will only save the min, max and the ref if(envelop): firstKey = list(self.myCatchments.items())[0][0] nbTElements = len(self.myCatchments[firstKey]['Object'].subBasinDict[id].get_outFlow_noDelay()) allHydros = np.zeros((nbTElements,nbCatchment)) counter = 0 for idCatch in self.myCatchments: curCatch = self.myCatchments[idCatch]['Object'] tmp = curCatch.subBasinDict[id].get_outFlow_noDelay() allHydros[:,counter] = tmp[:] counter += 1 if(idCatch==refName): y1.append(tmp[:]) yLabels.append(curCatch.name) if("Station Name" in plotDict[basinNames[id]]): graph_title = plotDict[basinNames[id]]["Station Name"] else: graph_title = curCatch.subBasinDict[id].name myTraits.append('-') if(sameRain and displayRain): if(rain==[]): rain = curCatch.subBasinDict[id].rain/curCatch.subBasinDict[id].surfaceDrainedHydro*3.6 elif(displayRain): upperPlot = True nbAddRain += 1 z.append(curCatch.subBasinDict[id].rain/curCatch.subBasinDict[id].surfaceDrainedHydro*3.6) y_labelAddRain.append(curCatch.name) ymax = np.amax(allHydros, axis=1) ymin = np.amin(allHydros, axis=1) else: for idCatch in self.myCatchments: curCatch = self.myCatchments[idCatch]['Object'] tmp = curCatch.subBasinDict[id].get_outFlow_noDelay() y1.append(tmp[:]) yLabels.append(curCatch.name) if("Station Name" in plotDict[basinNames[id]]): graph_title = plotDict[basinNames[id]]["Station Name"] else: graph_title = curCatch.subBasinDict[id].name if(not("Catchment traits" in plotDict["General Parameters"])): myTraits.append('-') if(sameRain and displayRain): if(rain==[]): rain = curCatch.subBasinDict[id].rain/curCatch.subBasinDict[id].surfaceDrainedHydro*3.6 elif(displayRain): upperPlot = True nbAddRain += 1 z.append(curCatch.subBasinDict[id].rain/curCatch.subBasinDict[id].surfaceDrainedHydro*3.6) y_labelAddRain.append(curCatch.name) mySurf = curCatch.subBasinDict[id].surfaceDrainedHydro nbAddData = 0 beginDateAddData = [] endDateAddData = [] dtAddData = [] catchColorsAddData = [] if("Add Data" in plotDict[basinNames[id]]): nbAddData = len(plotDict[basinNames[id]]["Add Data"]["Data"]) for i in range(nbAddData): y1.append(plotDict[basinNames[id]]["Add Data"]["Data"][i][:]) if("Date Begin" in plotDict[basinNames[id]]["Add Data"]): beginDateAddData.append(plotDict[basinNames[id]]["Add Data"]["Date Begin"][i]+tzDelta) elif("Date Begin" in plotDict["General Parameters"]["Add Data"]): beginDateAddData.append(plotDict["General Parameters"]["Add Data"]["Date Begin"][i]+tzDelta) if("Date End" in plotDict[basinNames[id]]["Add Data"]): endDateAddData.append(plotDict[basinNames[id]]["Add Data"]["Date End"][i]+tzDelta) elif("Date End" in plotDict["General Parameters"]["Add Data"]): endDateAddData.append(plotDict["General Parameters"]["Add Data"]["Date End"][i]+tzDelta) if("Dt" in plotDict[basinNames[id]]["Add Data"]): dtAddData.append(plotDict[basinNames[id]]["Add Data"]["Dt"][i]) elif("Dt" in plotDict["General Parameters"]["Add Data"]): dtAddData.append(plotDict["General Parameters"]["Add Data"]["Dt"][i]) if("Labels" in plotDict[basinNames[id]]["Add Data"]): yLabels.append(plotDict[basinNames[id]]["Add Data"]["Labels"][i]) elif("Labels" in plotDict["General Parameters"]["Add Data"]): yLabels.append(plotDict["General Parameters"]["Add Data"]["Labels"][i]) if("Colors" in plotDict[basinNames[id]]["Add Data"]): catchColorsAddData.append(plotDict[basinNames[id]]["Add Data"]["Colors"][i]) elif("Colors" in plotDict["General Parameters"]["Add Data"]): catchColorsAddData.append(plotDict["General Parameters"]["Add Data"]["Colors"][i]) if("Type of Traits" in plotDict[basinNames[id]]["Add Data"]): myTraits.append(plotDict[basinNames[id]]["Add Data"]["Type of Traits"][i]) elif("Type of Traits" in plotDict["General Parameters"]["Add Data"]): myTraits.append(plotDict["General Parameters"]["Add Data"]["Type of Traits"][i]) if("Measures" in plotDict[basinNames[id]]): Measures = plotDict[basinNames[id]]["Measures"] myMeasure = Measures.myHydro yLabels.append(_("Measures")) catchColorsAddData.append('k') myTraits.append('-') if(Measures.surfaceDrainedHydro>0.0): surfaceMeasure=Measures.surfaceDrainedHydro elif(mySurf!=0.0): surfaceMeasure = mySurf else: surfaceMeasure = -1.0 else: myMeasure = [] Measures = None if("X Range" in plotDict["General Parameters"]): xRange = plotDict["General Parameters"]["X Range"] else: xRange = [] if("Y Range" in plotDict["General Parameters"]): yRange = plotDict["General Parameters"]["Y Range"] else: yRange = [] if("Writing Directory" in plotDict["General Parameters"]): writeFile = plotDict["General Parameters"]["Writing Directory"] else: writeFile = "" if("Add Measure in table" in plotDict["General Parameters"]): addMeasfInTab = plotDict["General Parameters"]["Add Measure in table"] else: addMeasfInTab = False if(mySurf!=0.0): allSurfaces = [mySurf]*(nbCatchment+nbAddData) # Plot Rains yTitles = _("Discharge [m³/s]") writeFileDef = os.path.join(writeFile, "OutFlow_" + graph_title.replace(".","")) if(Measures is not None): if(envelop): ph.plot_hydro(1+nbAddData,y1,rain=rain,x_title=x_title, y_titles=yTitles, beginDate=beginDate+beginDateAddData,endDate=endDate+endDateAddData,dt=dt+dtAddData,graph_title=graph_title, \ y_labels=yLabels,rangeData=xRange,y_data_range=yRange,myColors=catchColors+catchColorsAddData,typeOfTraits=myTraits,writeFile=writeFileDef,\ measures=myMeasure,beginDateMeasure=Measures.dateBegin+tzDelta, endDateMeasure=Measures.dateEnd+tzDelta, dtMeasure=Measures.deltaT, surfaceMeasure=surfaceMeasure, addMeasfInTab=addMeasfInTab,\ upperPlot=upperPlot,nbAddPlot=nbAddRain,z=z,y_labelAddPlot=y_labelAddRain,deltaMajorTicks=86400/2.0,deltaMinorTicks=3600,\ y_envelop=[ymin,ymax],addTable=addTable, allSurfaces=allSurfaces) else: ph.plot_hydro(nbCatchment+nbAddData,y1,rain=rain,x_title=x_title, y_titles=yTitles, beginDate=beginDate+beginDateAddData,endDate=endDate+endDateAddData,dt=dt+dtAddData,graph_title=graph_title, \ y_labels=yLabels,rangeData=xRange,y_data_range=yRange,myColors=catchColors+catchColorsAddData,typeOfTraits=myTraits,writeFile=writeFileDef,\ measures=myMeasure,beginDateMeasure=Measures.dateBegin+tzDelta, endDateMeasure=Measures.dateEnd+tzDelta, dtMeasure=Measures.deltaT, surfaceMeasure=surfaceMeasure, addMeasfInTab=addMeasfInTab,\ upperPlot=upperPlot,nbAddPlot=nbAddRain,z=z,y_labelAddPlot=y_labelAddRain,deltaMajorTicks=86400/2.0,deltaMinorTicks=3600,addTable=addTable,allSurfaces=allSurfaces) else: if(envelop): ph.plot_hydro(1+nbAddData,y1,rain=rain,x_title=x_title, y_titles=yTitles, beginDate=beginDate+beginDateAddData,endDate=endDate+endDateAddData,dt=dt+dtAddData,graph_title=graph_title, \ y_labels=yLabels,rangeData=xRange,y_data_range=yRange,myColors=catchColors+catchColorsAddData,typeOfTraits=myTraits,writeFile=writeFileDef,\ upperPlot=upperPlot,nbAddPlot=nbAddRain,z=z,y_labelAddPlot=y_labelAddRain,deltaMajorTicks=86400/2.0,deltaMinorTicks=3600,\ y_envelop=[ymin,ymax],addTable=addTable,allSurfaces=allSurfaces) else: ph.plot_hydro(nbCatchment+nbAddData,y1,rain=rain,x_title=x_title, y_titles=yTitles, beginDate=beginDate+beginDateAddData,endDate=endDate+endDateAddData,dt=dt+dtAddData,graph_title=graph_title, \ y_labels=yLabels,rangeData=xRange,y_data_range=yRange,myColors=catchColors+catchColorsAddData,typeOfTraits=myTraits,writeFile=writeFileDef,\ upperPlot=upperPlot,nbAddPlot=nbAddRain,z=z,y_labelAddPlot=y_labelAddRain,deltaMajorTicks=86400/2.0,deltaMinorTicks=3600,addTable=addTable,allSurfaces=allSurfaces) if(show): plt.show()
[docs] def outlet_all_RB_height_same_topo(self, plotDict={}, show=True, envelop=False, displayMax=False): """ Plot the heights of all rentention basin. The measures given should also be heights in [m]. This function considers that the topology in all the catchments is the same. """ if(not("Time Zone Plot" in plotDict["General Parameters"])): tzPlot = 0 tzDelta = datetime.timedelta(hours=0) else: tzPlot = plotDict["General Parameters"]["Time Zone Plot"] tzDelta = datetime.timedelta(hours=tzPlot) if(not("Date Begin" in plotDict["General Parameters"])): beginDate = [] for idCatch in self.myCatchments: beginDate.append(self.myCatchments[idCatch]['Object'].dateBegin+tzDelta) else: beginDate = plotDict["General Parameters"]["Date Begin"]+tzDelta if(not("Date End" in plotDict["General Parameters"])): endDate = [] for idCatch in self.myCatchments: endDate.append(self.myCatchments[idCatch]['Object'].dateEnd+tzDelta) else: endDate = plotDict["General Parameters"]["Date End"]+tzDelta if(not("Dt" in plotDict["General Parameters"])): dt=[] for idCatch in self.myCatchments: dt.append(self.myCatchments[idCatch]['Object'].deltaT) else: dt = plotDict["General Parameters"]["Dt"] # if(envelop and not("Ref Name" in plotDict["General Parameters"])): # refName = "Catchment 1" # else: # refName = plotDict["General Parameters"]["Ref Name"] RBId = [] # contains all the names of the subbasin RBNames = {} for idCatch in self.myCatchments: curCatch = self.myCatchments[idCatch]['Object'] for id in curCatch.retentionBasinDict: curBasin = curCatch.retentionBasinDict[id] if(not(id in RBId) and (curBasin.name in plotDict)): RBId.append(id) RBNames[id] = (curBasin.name) nbCatchment = len(self.myCatchments) x_title = "Dates " + "(GMT+" + str(tzPlot) + ")" if(not("Catchment colors" in plotDict["General Parameters"])): tmpColors = np.random.rand(nbCatchment) catchColors = [] for icolor in range(len(tmpColors)): catchColors.append(tmpColors[icolor]) else: catchColors = [] catchColors = plotDict["General Parameters"]["Catchment colors"] if(len(catchColors)!=nbCatchment): print("ERROR: the number of catchments is not the same as the number of colors given") sys.exit() if("Catchment traits" in plotDict["General Parameters"]): myTraits = [] myTraits = plotDict["General Parameters"]["Catchment traits"] if(len(myTraits)!=nbCatchment): print("ERROR: the number of catchments is not the same as the number of colors given") sys.exit() for id in RBId: y1 = [] yLabels = [] myTraits = [] timeDelay = [] for idCatch in self.myCatchments: curCatch = self.myCatchments[idCatch]['Object'] myH = np.zeros(len(curCatch.retentionBasinDict[id].filledVolume)) for ii in range(len(curCatch.retentionBasinDict[id].filledVolume)): myH[ii] = curCatch.retentionBasinDict[id].volume_to_h(curCatch.retentionBasinDict[id].filledVolume[ii]) y1.append(myH[:]) yLabels.append(curCatch.name) timeDelay.append(datetime.timedelta(seconds=curCatch.retentionBasinDict[id].timeDelay)) beginDateRB = [] endDateRB = [] for ii in range(nbCatchment): # beginDateRB.append(beginDate[ii] - timeDelay[ii]) # A ameliorer!!! # endDateRB.append(endDate[ii] - timeDelay[ii]) beginDateRB.append(beginDate[ii]) # A ameliorer!!! endDateRB.append(endDate[ii]) if("Station Name" in plotDict[RBNames[id]]): graph_title = plotDict[RBNames[id]]["Station Name"] else: graph_title = curCatch.retentionBasinDict[id].name if(not("Catchment traits" in plotDict["General Parameters"])): myTraits.append('-') nbAddData = 0 beginDateAddData = [] endDateAddData = [] dtAddData = [] catchColorsAddData = [] if("Add Data" in plotDict[RBNames[id]]): nbAddData = len(plotDict[RBNames[id]]["Add Data"]["Data"]) for i in range(nbAddData): y1.append(plotDict[RBNames[id]]["Add Data"]["Data"][i][:]) if("Date Begin" in plotDict[RBNames[id]]["Add Data"]): beginDateAddData.append(plotDict[RBNames[id]]["Add Data"]["Date Begin"][i]+tzDelta) elif("Date Begin" in plotDict["General Parameters"]["Add Data"]): beginDateAddData.append(plotDict["General Parameters"]["Add Data"]["Date Begin"][i]+tzDelta) if("Date End" in plotDict[RBNames[id]]["Add Data"]): endDateAddData.append(plotDict[RBNames[id]]["Add Data"]["Date End"][i]+tzDelta) elif("Date End" in plotDict["General Parameters"]["Add Data"]): endDateAddData.append(plotDict["General Parameters"]["Add Data"]["Date End"][i]+tzDelta) if("Dt" in plotDict[RBNames[id]]["Add Data"]): dtAddData.append(plotDict[RBNames[id]]["Add Data"]["Dt"][i]) elif("Dt" in plotDict["General Parameters"]["Add Data"]): dtAddData.append(plotDict["General Parameters"]["Add Data"]["Dt"][i]) if("Labels" in plotDict[RBNames[id]]["Add Data"]): yLabels.append(plotDict[RBNames[id]]["Add Data"]["Labels"][i]) elif("Labels" in plotDict["General Parameters"]["Add Data"]): yLabels.append(plotDict["General Parameters"]["Add Data"]["Labels"][i]) if("Colors" in plotDict[RBNames[id]]["Add Data"]): catchColorsAddData.append(plotDict[RBNames[id]]["Add Data"]["Colors"][i]) elif("Colors" in plotDict["General Parameters"]["Add Data"]): catchColorsAddData.append(plotDict["General Parameters"]["Add Data"]["Colors"][i]) if("Type of Traits" in plotDict[RBNames[id]]["Add Data"]): myTraits.append(plotDict[RBNames[id]]["Add Data"]["Type of Traits"][i]) elif("Type of Traits" in plotDict["General Parameters"]["Add Data"]): myTraits.append(plotDict["General Parameters"]["Add Data"]["Type of Traits"][i]) if("Y Range" in plotDict[RBNames[id]]): yRange = plotDict[RBNames[id]]["Y Range"] elif("Y Range" in plotDict["General Parameters"]): yRange = plotDict["General Parameters"]["Y Range"] else: yRange = [] if("Measures" in plotDict[RBNames[id]]): Measures = plotDict[RBNames[id]]["Measures"] myMeasure = Measures.myHydro yLabels.append(_("Measures")) catchColorsAddData.append('k') myTraits.append('--') else: myMeasure = [] Measures = None if("X Range" in plotDict["General Parameters"]): xRange = plotDict["General Parameters"]["X Range"] else: xRange = [] if("Writing Directory" in plotDict["General Parameters"]): writeFile = plotDict["General Parameters"]["Writing Directory"] else: writeFile = "" # Plot Rains yTitles = "Hauteurs d'eau [m]" writeFileDef = writeFile + "_H_" + graph_title.replace(".","") if(Measures is not None): ph.plot_hydro(nbCatchment+nbAddData, y1,x_title=x_title, y_titles=yTitles, beginDate=beginDateRB+beginDateAddData,endDate=endDateRB+endDateAddData,dt=dt+dtAddData,graph_title=graph_title, \ y_labels=yLabels,rangeData=xRange,y_data_range=yRange,myColors=catchColors+catchColorsAddData,typeOfTraits=myTraits,writeFile=writeFileDef,\ measures=myMeasure,beginDateMeasure=Measures.dateBegin+tzDelta, endDateMeasure=Measures.dateEnd+tzDelta,dtMeasure=Measures.deltaT,deltaMajorTicks=86400/2.0,deltaMinorTicks=3600) else: ph.plot_hydro(nbCatchment+nbAddData, y1,x_title=x_title, y_titles=yTitles, beginDate=beginDateRB+beginDateAddData,endDate=endDateRB+endDateAddData,dt=dt+dtAddData,graph_title=graph_title, \ y_labels=yLabels,rangeData=xRange,y_data_range=yRange,myColors=catchColors+catchColorsAddData,typeOfTraits=myTraits,writeFile=writeFileDef,deltaMajorTicks=86400/2.0,deltaMinorTicks=3600) if(show): plt.show()
[docs] def outlet_all_RB_hydro_same_topo(self, plotDict={}, show=True, envelop=False, refModuleName =""): # Load General characteristics of the dictionnary if(not("Time Zone Plot" in plotDict["General Parameters"])): tzPlot = 0 tzDelta = datetime.timedelta(hours=0) else: tzPlot = plotDict["General Parameters"]["Time Zone Plot"] tzDelta = datetime.timedelta(hours=tzPlot) #****************************************************** #FIXME : is it correct ?? if(envelop and not("Ref Name" in plotDict["General Parameters"])): refName = "Catchment 1" else: refName = plotDict["General Parameters"]["Ref Name"] #****************************************************** if(not("Add Table" in plotDict["General Parameters"])): addTable = False else: addTable = plotDict["General Parameters"]["Add Table"] if(not("Date Begin" in plotDict["General Parameters"])): beginDate = [] for idCatch in self.myCatchments: if(not(envelop)): beginDate.append(self.myCatchments[idCatch]['Object'].dateBegin+tzDelta) elif(idCatch==refName): beginDate.append(self.myCatchments[idCatch]['Object'].dateBegin+tzDelta) else: beginDate = plotDict["General Parameters"]["Date Begin"]+tzDelta if(not("Date End" in plotDict["General Parameters"])): endDate = [] for idCatch in self.myCatchments: if(not(envelop)): endDate.append(self.myCatchments[idCatch]['Object'].dateEnd+tzDelta) elif(idCatch==refName): endDate.append(self.myCatchments[idCatch]['Object'].dateEnd+tzDelta) else: endDate = plotDict["General Parameters"]["Date End"]+tzDelta if(not("Dt" in plotDict["General Parameters"])): dt=[] for idCatch in self.myCatchments: if(not(envelop)): dt.append(self.myCatchments[idCatch]['Object'].deltaT) elif(idCatch==refName): dt.append(self.myCatchments[idCatch]['Object'].deltaT) else: dt = plotDict["General Parameters"]["Dt"] # All the useful junctions will be listed so that all the elements won't be analysed anymore RBId = [] # contains all the names of the subbasin RBNames = {} for idCatch in self.myCatchments: curCatch = self.myCatchments[idCatch]['Object'] for id in curCatch.retentionBasinDict: curBasin = curCatch.retentionBasinDict[id] if(not(id in RBId) and (curBasin.name in plotDict)): RBId.append(id) RBNames[id] = (curBasin.name) nbCatchment = len(self.myCatchments) x_title = "Dates " + "(GMT+"+ str(tzPlot) + ")" if(not("Catchment colors" in plotDict["General Parameters"])): tmpColors = np.random.rand(nbCatchment) catchColors = [] for icolor in range(len(tmpColors)): if(not(envelop)): catchColors.append(tmpColors[icolor]) elif(idCatch==refName): catchColors.append(tmpColors[icolor]) else: catchColors = [] catchColors = plotDict["General Parameters"]["Catchment colors"] if("Catchment traits" in plotDict["General Parameters"]): myTraits = [] myTraits = plotDict["General Parameters"]["Catchment traits"] if(len(myTraits)!=nbCatchment): print("ERROR: the number of catchments is not the same as the number of colors given") sys.exit() if(len(catchColors)!=nbCatchment and not(envelop)): print("ERROR: the number of catchments is not the same as the number of colors given") sys.exit() if(refModuleName!=""): print("To Do !!!!") sys.exit() for id in RBId: y1 = [] yLabels = [] if(not("Catchment traits" in plotDict["General Parameters"])): myTraits = [] rain = [] z = [] nbAddRain=0 y_labelAddRain = [] upperPlot = False allSurfaces = [] mySurf = 0.0 if(envelop): firstKey = list(self.myCatchments.items())[0][0] nbTElements = len(self.myCatchments[firstKey]['Object'].subBasinDict[id].get_outFlow_noDelay()) allHydros = np.zeros((nbTElements,nbCatchment)) counter = 0 for idCatch in self.myCatchments: curCatch = self.myCatchments[idCatch]['Object'] tmp = curCatch.retentionBasinDict["J18"].directFluxInRB tmpHydro = np.zeros(len(tmp)) index = math.floor(curCatch.retentionBasinDict["J18"].timeDelay/curCatch.retentionBasinDict["J18"].deltaT) if(index==0): tmpHydro = tmp elif(index<len(tmp)): tmpHydro[:-index] = tmp[index:] else: print("ERROR: the simulation time is not long enough for this subbasin to be taken into account") sys.exit() allHydros[:,counter] = tmpHydro[:] if(idCatch==refName): y1.append(tmpHydro[:]) yLabels.append(curCatch.name) if("Station Name" in plotDict[basinNames[id]]): graph_title = plotDict[basinNames[id]]["Station Name"] else: graph_title = curCatch.subBasinDict[id].name myTraits.append('-') ymax = np.amax(allHydros, axis=1) ymin = np.amin(allHydros, axis=1) else: for idCatch in self.myCatchments: curCatch = self.myCatchments[idCatch]['Object'] if(curCatch.myModel==cst.tom_UH): tmp = curCatch.retentionBasinDict[id].directFluxInRB tmpHydro = np.zeros(len(tmp)) index = math.floor(curCatch.retentionBasinDict[id].timeDelay/curCatch.retentionBasinDict[id].deltaT) if(index==0): tmpHydro = tmp elif(index<len(tmp)): tmpHydro[:-index] = tmp[index:] else: print("ERROR: the simulation time is not long enough for this subbasin to be taken into account") sys.exit() y1.append(tmpHydro[:]) elif(curCatch.myModel==cst.tom_2layers_linIF or curCatch.myModel==cst.tom_2layers_UH): tmp = curCatch.retentionBasinDict[id].directFluxInRB tmpHydro = np.zeros(len(tmp)) index = math.floor(curCatch.retentionBasinDict[id].timeDelay/curCatch.retentionBasinDict[id].deltaT) if(index==0): tmpHydro = tmp elif(index<len(tmp)): tmpHydro[:-index] = tmp[index:] else: print("ERROR: the simulation time is not long enough for this subbasin to be taken into account") sys.exit() y1.append(tmpHydro[:]) yLabels.append(curCatch.name) if("Station Name" in plotDict[RBNames[id]]): graph_title = plotDict[RBNames[id]]["Station Name"] else: graph_title = curCatch.retentionBasinDict[id].name if(not("Catchment traits" in plotDict["General Parameters"])): myTraits.append('-') nbAddData = 0 beginDateAddData = [] endDateAddData = [] dtAddData = [] catchColorsAddData = [] if("Add Data" in plotDict[RBNames[id]]): nbAddData = len(plotDict[RBNames[id]]["Add Data"]["Data"]) for i in range(nbAddData): y1.append(plotDict[RBNames[id]]["Add Data"]["Data"][i][:]) if("Date Begin" in plotDict[RBNames[id]]["Add Data"]): beginDateAddData.append(plotDict[RBNames[id]]["Add Data"]["Date Begin"][i]+tzDelta) elif("Date Begin" in plotDict["General Parameters"]["Add Data"]): beginDateAddData.append(plotDict["General Parameters"]["Add Data"]["Date Begin"][i]+tzDelta) if("Date End" in plotDict[RBNames[id]]["Add Data"]): endDateAddData.append(plotDict[RBNames[id]]["Add Data"]["Date End"][i]+tzDelta) elif("Date End" in plotDict["General Parameters"]["Add Data"]): endDateAddData.append(plotDict["General Parameters"]["Add Data"]["Date End"][i]+tzDelta) if("Dt" in plotDict[RBNames[id]]["Add Data"]): dtAddData.append(plotDict[RBNames[id]]["Add Data"]["Dt"][i]) elif("Dt" in plotDict["General Parameters"]["Add Data"]): dtAddData.append(plotDict["General Parameters"]["Add Data"]["Dt"][i]) if("Labels" in plotDict[RBNames[id]]["Add Data"]): yLabels.append(plotDict[RBNames[id]]["Add Data"]["Labels"][i]) elif("Labels" in plotDict["General Parameters"]["Add Data"]): yLabels.append(plotDict["General Parameters"]["Add Data"]["Labels"][i]) if("Colors" in plotDict[RBNames[id]]["Add Data"]): catchColorsAddData.append(plotDict[RBNames[id]]["Add Data"]["Colors"][i]) elif("Colors" in plotDict["General Parameters"]["Add Data"]): catchColorsAddData.append(plotDict["General Parameters"]["Add Data"]["Colors"][i]) if("Type of Traits" in plotDict[RBNames[id]]["Add Data"]): myTraits.append(plotDict[RBNames[id]]["Add Data"]["Type of Traits"][i]) elif("Type of Traits" in plotDict["General Parameters"]["Add Data"]): myTraits.append(plotDict["General Parameters"]["Add Data"]["Type of Traits"][i]) if("Measures" in plotDict[RBNames[id]]): Measures = plotDict[RBNames[id]]["Measures"] myMeasure = Measures.myHydro yLabels.append(_("Measures")) catchColorsAddData.append('k') myTraits.append('-') if(Measures.surfaceDrainedHydro>0.0): surfaceMeasure=Measures.surfaceDrainedHydro elif(mySurf!=0.0): surfaceMeasure = mySurf else: myMeasure = [] Measures = None if("X Range" in plotDict["General Parameters"]): xRange = plotDict["General Parameters"]["X Range"] else: xRange = [] if("Y Range" in plotDict["General Parameters"]): yRange = plotDict["General Parameters"]["Y Range"] else: yRange = [] if("Writing Directory" in plotDict["General Parameters"]): writeFile = plotDict["General Parameters"]["Writing Directory"] else: writeFile = "" if("Add Measure in table" in plotDict["General Parameters"]): addMeasfInTab = plotDict["General Parameters"]["Add Measure in table"] else: addMeasfInTab = False if(mySurf!=0.0): allSurfaces = [mySurf]*(nbCatchment+nbAddData) # Plot Rains yTitles = _("Discharge [m³/s]") writeFileDef = writeFile + "OutFlow_" + graph_title.replace(".","") if(Measures is not None): if(envelop): ph.plot_hydro(1+nbAddData,y1,rain=rain,x_title=x_title, y_titles=yTitles, beginDate=beginDate+beginDateAddData,endDate=endDate+endDateAddData,dt=dt+dtAddData,graph_title=graph_title, \ y_labels=yLabels,rangeData=xRange,y_data_range=yRange,myColors=catchColors+catchColorsAddData,typeOfTraits=myTraits,writeFile=writeFileDef,\ measures=myMeasure,beginDateMeasure=Measures.dateBegin+tzDelta, endDateMeasure=Measures.dateEnd+tzDelta, dtMeasure=Measures.deltaT, addMeasfInTab=False,\ upperPlot=upperPlot,nbAddPlot=nbAddRain,z=z,y_labelAddPlot=y_labelAddRain,deltaMajorTicks=86400/2.0,deltaMinorTicks=3600,\ y_envelop=[ymin,ymax],addTable=addTable, allSurfaces=allSurfaces) else: ph.plot_hydro(nbCatchment+nbAddData,y1,rain=rain,x_title=x_title, y_titles=yTitles, beginDate=beginDate+beginDateAddData,endDate=endDate+endDateAddData,dt=dt+dtAddData,graph_title=graph_title, \ y_labels=yLabels,rangeData=xRange,y_data_range=yRange,myColors=catchColors+catchColorsAddData,typeOfTraits=myTraits,writeFile=writeFileDef,\ measures=myMeasure,beginDateMeasure=Measures.dateBegin+tzDelta, endDateMeasure=Measures.dateEnd+tzDelta, dtMeasure=Measures.deltaT, addMeasfInTab=False,\ upperPlot=upperPlot,nbAddPlot=nbAddRain,z=z,y_labelAddPlot=y_labelAddRain,deltaMajorTicks=86400/2.0,deltaMinorTicks=3600,addTable=addTable,allSurfaces=allSurfaces) else: if(envelop): ph.plot_hydro(1+nbAddData,y1,rain=rain,x_title=x_title, y_titles=yTitles, beginDate=beginDate+beginDateAddData,endDate=endDate+endDateAddData,dt=dt+dtAddData,graph_title=graph_title, \ y_labels=yLabels,rangeData=xRange,y_data_range=yRange,myColors=catchColors+catchColorsAddData,typeOfTraits=myTraits,writeFile=writeFileDef,\ upperPlot=upperPlot,nbAddPlot=nbAddRain,z=z,y_labelAddPlot=y_labelAddRain,deltaMajorTicks=86400/2.0,deltaMinorTicks=3600,\ y_envelop=[ymin,ymax],addTable=addTable,allSurfaces=allSurfaces) else: ph.plot_hydro(nbCatchment+nbAddData,y1,rain=rain,x_title=x_title, y_titles=yTitles, beginDate=beginDate+beginDateAddData,endDate=endDate+endDateAddData,dt=dt+dtAddData,graph_title=graph_title, \ y_labels=yLabels,rangeData=xRange,y_data_range=yRange,myColors=catchColors+catchColorsAddData,typeOfTraits=myTraits,writeFile=writeFileDef,\ upperPlot=upperPlot,nbAddPlot=nbAddRain,z=z,y_labelAddPlot=y_labelAddRain,deltaMajorTicks=86400/2.0,deltaMinorTicks=3600,addTable=addTable,allSurfaces=allSurfaces) if(show): plt.show()
[docs] def plot_ind_subBasins(self, withEvap=False, withCt=False, selection_by_iD=[], writeDir=""): """ Plot subbasins selected by id from all catchment on individual graphs. Not regrouped yet on a sigle graph. """ for idCatch in self.myCatchments: graph_title = self.myCatchments[idCatch]['Object'].name + " :" writeFile = writeDir + self.myCatchments[idCatch]['Object'].name self.myCatchments[idCatch]['Object'].plot_allSub(withEvap=withEvap, withCt=withCt, selection_by_iD=selection_by_iD, graph_title=graph_title, show=False, writeDir=writeFile)
[docs] def plot_all_diff_cumulRain_with_lagtime(self, interval, selection_by_iD=[], writeDir=""): for idCatch in self.myCatchments: curCatch:Catchment = self.myCatchments[idCatch]['Object'] graph_title = curCatch.name + " :" curCatch.plot_all_diff_cumulRain_with_lagtime(interval, lagTime=0.0, selection_by_iD=selection_by_iD, graph_title=graph_title, show=False, writeDir=writeDir)
# FIXME : TODO complete that function that plot a comparison of the cumulated volumes # def plot_all_cumulRain(self, selection_by_iD:list=[], writeDir:str="", show:bool=True): # times = [] # for idCatch in self.myCatchments: # curCatch:Catchment = self.myCatchments[idCatch]['Object'] # graph_title = curCatch.name + " :" # cur_t, curVol = curCatch.get_all_cumulRain(selection_by_iD) # for # plt.figure() # plt.title(graph_title) # for i in range(len(curVol)): # plt.plot(cur_t, curVol[i]) # plt.savefig(os.path.join(writeDir, graph_title)) # if show : plt.show()
[docs] def save_all_ExcelFile_Vesdre_simul2D(self, writeDir=""): if(writeDir==""): writeDir = self.workingDir for idCatch in self.myCatchments: curCatch = self.myCatchments[idCatch]['Object'] fileName = curCatch.name.replace("/","_") + "_Data_simul2D.xlsx" fileName = fileName.replace(" ", "_") curCatch.save_ExcelFile_Vesdre_simul2D(fileName=fileName, directory=writeDir)
[docs] def construct_default_plotDict(self, mycolors=[]): nbCatchments = len(self.myCatchments) allColors = ["b","r","g","m","tab:pink","c","tab:gray","tab:brown"] if(mycolors==[]): if(nbCatchments>len(allColors)): print("ERROR : the number of comparison is greater than the number of saved colors. Please execute this fonction again and specify your own colors in 'mycolors='") sys.exit() else: mycolors = allColors[:nbCatchments] self.plotDict["General Parameters"] = {} self.plotDict["General Parameters"]["Writing Directory"] = self.workingDir self.plotDict["General Parameters"]["Catchment colors"] = mycolors self.plotDict["General Parameters"]["X Range"] = [] self.plotDict["General Parameters"]["Y Range"] = [] self.plotDict["General Parameters"]["Same rain"] = True self.plotDict["General Parameters"]["Display rain"] = True self.plotDict["General Parameters"]["Time Zone Plot"] = 0 # GMT+0 self.plotDict["General Parameters"]["Display rain"] = False self.plotDict["General Parameters"]["Add Measure in table"] = False
[docs] def add_stations_to_plotDict(self, stationKeys:list=[], stationNames:list=[], measures:list=[]): nbStations = len(stationKeys) for i in range(nbStations): element = stationKeys[i] self.plotDict[element] = {} if(stationNames!=[]): if(stationNames[i]!=""): name = stationNames[i] self.plotDict[element]["Station Name"] = name if(measures!=[]): if(measures[i]is not None): self.plotDict[element]["Measures"] = measures[i]
[docs] def add_station_to_plotDict(self, stationKey:str, stationName:str="",measure=None, xRange=None, yRange=None): if(not(stationKey in self.plotDict)): self.plotDict[stationKey] = {} if(measure is not None): self.plotDict[stationKey]["Measures"] = measure if(xRange is not None): self.plotDict[stationKey]["X Range"] = xRange if(yRange is not None): self.plotDict[stationKey]["Y Range"] = yRange
[docs] def add_data_in_station(self, stationKey:str, data:list=[], datesBegin:list=[], datesEnd:list=[], deltaT:list=[], labels:list=[], colors:list=[], typeOfTraits:list=[], addDataObj:list=[], addDataUnits:str="m3/s"): # If the name of the station is not already in the dictionnary we created it. if(not(stationKey in self.plotDict)): self.add_station_to_plotDict(stationKey) if(not("Add Data" in self.plotDict["General Parameters"])): self.plotDict["General Parameters"]["Add Data"] = {} self.plotDict[stationKey]["Add Data"] = {} if(addDataObj==[]): if(data==[]): print("WARNING: no additionnal data given in this station!") del self.plotDict[stationKey]["Add Data"] return self.plotDict[stationKey]["Add Data"]["Data"] = data if(datesBegin!=[]): self.plotDict[stationKey]["Add Data"]["Date Begin"] = datesBegin if(datesEnd!=[]): self.plotDict[stationKey]["Add Data"]["Date End"] = datesEnd if(deltaT!=[]): self.plotDict[stationKey]["Add Data"]["Dt"] = deltaT else: self.add_data_in_station_from_Subbasin(stationKey, addDataObj, addDataUnits=addDataUnits) if(labels!=[]): self.plotDict[stationKey]["Add Data"]["Labels"] = labels if(colors!=[]): self.plotDict[stationKey]["Add Data"]["Colors"] = colors if(typeOfTraits!=[]): self.plotDict[stationKey]["Add Data"]["Type of Traits"] = typeOfTraits
[docs] def add_data_in_station_from_Subbasin(self, stationKey:str, myAddDataObj:list, addDataUnits:str="m3/s"): self.plotDict[stationKey]["Add Data"]["Data"] = [] self.plotDict[stationKey]["Add Data"]["Date Begin"] = [] self.plotDict[stationKey]["Add Data"]["Date End"] = [] self.plotDict[stationKey]["Add Data"]["Dt"] = [] for element in myAddDataObj: if addDataUnits == "mm/h": self.plotDict[stationKey]["Add Data"]["Data"].append(element.myHydro[:]) else: self.plotDict[stationKey]["Add Data"]["Data"].append(element.myHydro[:]*element.surfaceDrained/3.6) self.plotDict[stationKey]["Add Data"]["Date Begin"].append(element.dateBegin) self.plotDict[stationKey]["Add Data"]["Date End"].append(element.dateEnd) self.plotDict[stationKey]["Add Data"]["Dt"].append(element.deltaT)
[docs] def set_general_plot(self, xRange=None, yRange=None, timeZone=None,sameRain=None, displayRain=None, addTable=None, addMeasureTable=None, refName=None, writingDir=""): if(xRange is not None): self.plotDict["General Parameters"]["X Range"] = xRange if(yRange is not None): self.plotDict["General Parameters"]["Y Range"] = yRange if(timeZone is not None): self.plotDict["General Parameters"]["Time Zone Plot"] = timeZone if(sameRain is not None): self.plotDict["General Parameters"]["Same rain"] = sameRain if(displayRain is not None): self.plotDict["General Parameters"]["Display rain"] = displayRain if(addTable is not None): self.plotDict["General Parameters"]["Add Table"] = addTable if(addMeasureTable is not None): self.plotDict["General Parameters"]["Add Measure in table"] = addMeasureTable if(refName is not None): self.plotDict["General Parameters"]["Ref Name"] = refName if(writingDir!=""): self.plotDict["General Parameters"]["Writing Directory"] = writingDir
[docs] def set_station_plot(self, stationKey, xRange=None, yRange=None, measure=None): if(xRange is not None): self.plotDict[stationKey]["X Range"] = xRange if(yRange is not None): self.plotDict[stationKey]["Y Range"] = yRange if(measure is not None): self.plotDict[stationKey]["Measures"] = measure
[docs] def reset_addData(self, stationKey, backupAddData=None): backupAddData = self.plotDict[stationKey]["Add Data"] # self.plotDict[stationKey]["Add Data"] = {} del self.plotDict[stationKey]["Add Data"]
[docs] def plot_Nash_and_peak(self, stationKey:list[str, int], measures:list[SubBasin], intervals:list=[]): assert len(stationKey) == len(measures) all_ns = {stationKey[i]: [ self.myCatchments[el]["Object"].get_sub_Nash(measures[i], stationKey[i], intervals) for el in self.myCatchments ] for i in range(len(stationKey)) } print(all_ns) all_peaks = [ [ self.myCatchments[el]["Object"].get_sub_peak(stationKey[i], intervals) for el in self.myCatchments ] for i in range(len(stationKey)) ] print(all_peaks) meas_peak = [ measures[i].get_peak(intervals) for i in range(len(stationKey)) ] # The following lines is take the peak difference between simulation and measurements -> Display 0.0 if the measurement is 0.0 isZero = np.array(meas_peak)==0 notZero = np.array(meas_peak)!=0 peak_prop = {stationKey[i]: [ list( (np.array(el)-np.array(meas_peak[i]))/(np.array(meas_peak[i])+isZero[i]) *notZero[i] ) for el in all_peaks[i] ] for i in range(len(stationKey)) } print(meas_peak) print(peak_prop) all_data = [all_ns, peak_prop] # Define all colors colors_Nash = {} for key, value in all_ns.items(): colors_Nash[key] = [] for i_model in range(len(value)): colors_Nash[key].append([]) for j in range(len(value[i_model])): curNS = value[i_model][j] if curNS<0.0: colors_Nash[key][i_model].append("r") continue elif curNS<0.4: colors_Nash[key][i_model].append("tab:orange") continue elif curNS<0.6: colors_Nash[key][i_model].append("tab:olive") continue elif curNS<0.8: colors_Nash[key][i_model].append("tab:green") continue else: colors_Nash[key][i_model].append("g") colors_peaks = {} for key, value in peak_prop.items(): colors_peaks[key] = [] for i_model in range(len(value)): colors_peaks[key].append([]) for j in range(len(value[i_model])): curP = value[i_model][j] if curP<0.0: colors_peaks[key][i_model].append("r") else: colors_peaks[key][i_model].append("b") all_colors = [colors_Nash, colors_peaks] ## Sort all station in a particular order sorted_keys = list(all_data[0].keys()) ## Str of dates all_names = ["\n - \n".join([cdate[0].strftime("%d/%m/%Y"), cdate[1].strftime("%d/%m/%Y")]) for cdate in intervals] ## Plot nb_stations = len(stationKey) type_of_model = [self.myCatchments[el]["Title"] for el in self.myCatchments] type_of_data = ["Nash", r"$ \frac{Q^{s}_{max}-Q^{m}_{max}}{Q^{m}_{max}} $ "] type_of_data_names = ["Nash", "Exceedance"] ph.bar_Nash_n_other(all_data, all_colors, nb_x=len(intervals), nb_data=len(type_of_model), nb_lines=nb_stations, y_titles=type_of_data, x_titles=all_names, nameModel=type_of_model, line_names=sorted_keys, toShow=False) # ========= # ========= # Plot tables - 2nd version with the table instead of bars all_ns= { cur_catch["Title"]: np.array([list(cur_catch["Object"].get_sub_Nash(measures[i], stationKey[i], intervals)) for i in range(len(stationKey))]) for cur_catch in self.myCatchments.values() } all_peaks = {cur_catch["Title"]: np.array([cur_catch["Object"].get_sub_peak(stationKey[i], intervals) for i in range(len(stationKey))]) for cur_catch in self.myCatchments.values() } print(all_peaks) meas_peak = np.array([ measures[i].get_peak(intervals) for i in range(len(stationKey)) ]) # The following lines is take the peak difference between simulation and measurements -> Display 0.0 if the measurement is 0.0 isZero = (meas_peak==0) notZero = (meas_peak!=0) peak_prop = { cur_model: (value-meas_peak)/(meas_peak+isZero) *notZero for cur_model, value in all_peaks.items() } # Concatenate all data all_data = [all_ns, peak_prop] for data, name_of_data in zip(all_data, type_of_data_names): for cur_model, cur_data in data.items(): file_name = os.path.join(self.workingDir, name_of_data+"_"+cur_model)+".png" # cur_title = cur_model + ": " + name_of_data ph.table_Nash_n_other(cur_data, name_of_data, row_names=sorted_keys, column_names=all_names, writeFile=file_name, toShow=False) plt.show()