Source code for wolfhece.hydrology.SubBasin

"""
Author: HECE - University of Liege, Pierre Archambeau, Christophe Dessers
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

import sys
import numpy as np
import csv
import os
import logging

from numpy.ma.core import append, shape
from numpy.testing._private.utils import suppress_warnings
# import constant as cst
import math
import time as time_mod
import matplotlib.pyplot as plt
from matplotlib.dates import DayLocator, HourLocator, DateFormatter, drange, MicrosecondLocator
import scipy.stats as stats             #pip install Scipy
import datetime                         # module which contains objects treating dates
from matplotlib.font_manager import FontProperties
from dbfread import DBF
import ctypes as ct
import pandas as pd
from pathlib import Path

from ..PyTranslate import _
from . import plot_hydrology as ph
from . import data_treatment as datt
from . import read as rd
from . import constant as cst

from ..wolf_array import *
from ..PyParams import*

## TO DO:
# - add the arguments _dateBegin, _dateEnd and _deltaT as optional
#       -> In this way, the init of these variable can be done by reading the rain, evap or outflow file if not already init

[docs] class SubBasin:
[docs] name:str
[docs] iD:int
[docs] iDSorted:int
[docs] x:float
[docs] y:float
[docs] haveInlets:bool
[docs] alreadyUsed:bool
[docs] isLeveled:bool
[docs] isActivated:bool
## Time array containing all the timestamps # @var time timestamps array of dimension equal to rain and evap (or 1 element more than myHydro so far (VHM but not UH)).
[docs] time:np.ndarray
[docs] dateBegin:datetime.datetime # Must be in GMT+0 !!!
[docs] dateEnd:datetime.datetime # Must be in GMT+0 !!!
[docs] deltaT:datetime.timedelta
# @var timezone in GMT saved to converted all computed or read data so that all data are expressed in GMT+0
[docs] tz:float
[docs] model:int
[docs] treated:bool
[docs] myLevel:bool
[docs] fileNameRead:str
[docs] fileNameWrite:str # FIXME TO DO !!!!!!!!
## Dictionary containing all the objects Catchment: ## @var myHydro an array whose dimensions depends on the model chosen: # - Unit hydrographs and Linear reservoir models : $1\times n$ elements # - VHM model : $3\times n$ elements: # myHydro[i][0] : overland flow # myHydro[i][1] : interflow # myHydro[i][2] : baseflow # @unit $[\si{m}^3/\si{s}}]$
[docs] myHydro:np.ndarray # [m^3/s] Hydro of the subbasin only -> UPDATE: [mm/h]!!!!!
[docs] intletsObj:dict
[docs] inlets:np.ndarray
[docs] inletsRaw:np.ndarray
[docs] downstreamObj:dict
## @var outFlow # Hydro of the hydrological subbasin. Combined with the potentiel upstream hydros. # Version : # - < 2023.0 : "outFlow" is a dictionnary and public variable. # Consider timeDelay so that time is at 0 at the global outlet. Therefore the transfer to the general outlet is applied. # - >= 2023.1 : "_outflow" is now a private dictionnary linked to a property called "outFlow". # It considers the hydro at the local time. To apply the transfer call the property "glob_hydro" # @unit $[\si{m}^3/\si{s}}]$
[docs] _outFlow:dict # [m^3/s] Hydro of the hydrological subbasin. Combined with the potentiel upstream hydros. Consider timeDelay so that time is at 0
# self.outFlowRaw = [] # [m^3/s] # Hyeto
[docs] myHyetoDict:dict
[docs] myRain:np.ndarray # [mm/h] Caution in the difference of units in rain !!!!!!
[docs] rain:np.ndarray # [m^3/h] Caution in the difference of units in rain !!!!!!
# Evapotranspiration
[docs] myEvap:np.ndarray # [mm/h]
[docs] evap:np.ndarray # [mm/h]
# Temperature
[docs] myTemp:np.ndarray
# Main subbasin characteristics
[docs] mainCharactDict:dict # Dictionnary with the main characteristics of the subbasin
[docs] mainCharactDictWholeHydro:dict # Dictionnary with the main characteristics of the subbasin
[docs] landuseDict:dict # Dictionnary with all landuses of the subbasin
[docs] landuseHydroDict:dict # Dictionnary with all landuses of the hydro subbasin
# Further information
[docs] surfaceDrained:float # [km^2]
[docs] surfaceDrainedHydro:float # [km^2]
[docs] timeDelay:float # [s]
[docs] peakVal:float # [m³/s] peak value for total outFlow
[docs] peakTime:datetime.datetime # datetime of the peak for total outflow -> time delay is already applied
# Hello!
[docs] ptr_q_all:np.ndarray # Pointer to Fortran for the result matrix
[docs] transferParam:Wolf_Param # Parameter file with the type of transfer applied -> e.g. timeDelay
[docs] _version:float # version of the wolfHydro python code. Useful for identifying the file versions to read and how to interpret them
def __init__(self, _dateBegin:datetime.datetime=None, _dateEnd:datetime.datetime=None, _deltaT:int=0, _model=cst.measures,_workingDir:str="", _hyeto:dict={}, _x:float=0.0, _y:float=0.0, surfaceDrained:float=0.0, _iD_interiorPoint:int=1,_idSorted:int=1, name:str=None, readHydro=True, _tz:int=0, version:str=cst.VERSION_WOLFHYDRO): if(name is None): self.name = 'ss '+ str(_iD_interiorPoint) else: self.name = name self.iD = 'ss'+str(_iD_interiorPoint) self.iDSorted = _idSorted self.x = _x self.y = _y self.haveInlets = False self.alreadyUsed = False # // self.isLeveled = False self.isActivated = True ## Time array containing all the timestamps # @var time timestamps array of dimension equal to rain and evap (or 1 element more than myHydro so far (VHM but not UH)). self.time = None self.dateBegin = _dateBegin # Must be in GMT+0 !!! self.dateEnd = _dateEnd # Must be in GMT+0 !!! self.deltaT = _deltaT # @var timezone in GMT saved to converted all computed or read data so that all data are expressed in GMT+0 self.tz = _tz self.model = _model self.treated = False # // self.myLevel = 1 self.fileNameRead = _workingDir # // self.fileNameWrite = self.fileNameRead # TO DO !!!!!!!! # self.intersectIndex = 0 ## Dictionary containing all the objects Catchment: ## @var myHydro an array whose dimensions depends on the model chosen: # - Unit hydrographs and Linear reservoir models : $1\times n$ elements # - VHM model : $3\times n$ elements: # myHydro[i][0] : overland flow # myHydro[i][1] : interflow # myHydro[i][2] : baseflow # @unit $[\si{m}^3/\si{s}}]$ self.myHydro = [] # [m^3/s] Hydro of the subbasin only -> UPDATE: [mm/h]!!!!! self.intletsObj = {} self.inlets = None self.inletsRaw = None self.downstreamObj = {} ## @var outFlow # Hydro of the hydrological subbasin. Combined with the potentiel upstream hydros. Consider timeDelay so that time is at 0 # @unit $[\si{m}^3/\si{s}}]$ self._outFlow = {} # [m^3/s] Hydro of the hydrological subbasin. Combined with the potentiel upstream hydros. Consider timeDelay so that time is at 0 # self.outFlowRaw = [] # [m^3/s] # Hyeto self.myHyetoDict = {} self.myRain = None # [mm/h] Caution in the difference of units in rain !!!!!! self.rain = [] # [m^3/h] Caution in the difference of units in rain !!!!!! # Evapotranspiration self.myEvap = None # [mm/h] self.evap = None # [mm/h] # Temperature self.myTemp = None # Outflow converted in hystograph self.hydrograph = None # // # self.hystograph = [] # Main subbasin characteristics self.mainCharactDict = {} # Dictionnary with the main characteristics of the subbasin self.mainCharactDictWholeHydro = {} # Dictionnary with the main characteristics of the hydro subbasin self.landuseDict = {} # Dictionnary with all landuses of the subbasin self.landuseHydroDict = {} # Dictionnary with all landuses of the hydro subbasin # Further information self.surfaceDrained = surfaceDrained # [km^2] self.surfaceDrainedHydro = surfaceDrained # [km^2] self.timeDelay = 0.0 # [s] self.peakVal = 0.0 # [m³/s] peak value for total outFlow self.peakTime = None # datetime of the peak for total outflow -> time delay is already applied # param files self.transferParam = None # Hello! self.ptr_q_all = None # version of the Python WOLFHydro self._version = version # Verification of the unicity of the time array # Load all the hydrographs of the sub-basins if(self.model==cst.measures or self.model==cst.compare_opti): readHydro=False # Get the main characteristics of the subbasin. If the hydro can be read, so be the main characteristics if(readHydro): self.read_myMainCharacteristics(_workingDir) self.init_timeDelay() if(readHydro): timeTest, self.myHydro = self.get_hydro(self.iDSorted, _workingDir, tzDelta=datetime.timedelta(hours=self.tz)) if(self.time is None): self.time = timeTest else: if not(np.array_equal(timeTest,self.time)): print('ERROR: Time array not the same! Please check your answers.') sys.exit() print('SubBasin Initialised!')
[docs] def change_haveInlets(self): "This procedure only increment the number of inlets of a subbasin" self.haveInlets = True
[docs] def get_hydro(self, iDSorted, workingDir, fileNames=None, tzDelta=datetime.timedelta(hours=0)): if(self.model==cst.tom_UH): print("Reading the Unit Hydrograph outlets...") # initialisation of the fileNames if(fileNames is None): subBasinName = 'Subbasin_' + str(iDSorted) + '/' typeOfFileName = 'simul_of.txt' fileName = os.path.join(workingDir, subBasinName, typeOfFileName) file_exists = os.path.exists(fileName) if(not(file_exists)): typeOfFileName = 'simul_net_trans_rain.txt' fileName = os.path.join(workingDir,subBasinName, typeOfFileName) file_exists = os.path.exists(fileName) if(file_exists): print("ERROR : the file simul_net_trans_rain.txt is not used yet in this version! Please check version of the code before 05/11/2021 !") print("Hydro file = ", fileName) sys.exit() else: print("ERROR : the hydro file is not present here!") print("Hydro file = ", fileName) sys.exit() else: # The file can only be a string or a list with 1 string if(type(fileNames)==str): fileName = workingDir + fileNames[0] elif(type(fileNames)==list and len(fileNames)!=1): fileName = workingDir + fileNames else: print("ERROR: Expecting only 1 file name for UH model!") sys.exit() # Reading the hydro output file with open(fileName, newline = '') as fileID: data_reader = csv.reader(fileID, delimiter='\t') list_data = [] i = 0 for raw in data_reader: if i>1: list_data.append(raw) i += 1 matrixData = np.array(list_data).astype("float") timeArray = np.zeros(len(matrixData)) # +1 as the time array is not one element more than the outlet in UH outFlow = np.zeros(len(matrixData),dtype=ct.c_double, order='F') # Init the time properties if not already done if self.dateBegin is None or self.dateEnd is None or self.deltaT == 0: self.dateBegin = datetime.datetime(year=int(matrixData[0][2]), month=int(matrixData[0][1]), day=int(matrixData[0][0]), hour=int(matrixData[0][3]), minute=int(matrixData[0][4]), second=int(matrixData[0][5]), microsecond=0, tzinfo=datetime.timezone.utc) self.dateEnd = datetime.datetime(year=int(matrixData[-1][2]), month=int(matrixData[-1][1]), day=int(matrixData[-1][0]), hour=int(matrixData[-1][3]), minute=int(matrixData[-1][4]), second=int(matrixData[-1][5]), microsecond=0, tzinfo=datetime.timezone.utc) self.deltaT = (datetime.datetime(year=int(matrixData[1][2]), month=int(matrixData[1][1]), day=int(matrixData[1][0]), hour=int(matrixData[1][3]), minute=int(matrixData[1][4]), second=int(matrixData[1][5]), microsecond=0, tzinfo=datetime.timezone.utc) - self.dateBegin).total_seconds() secondsInDay = 24*60*60 prevDate = datetime.datetime(year=int(matrixData[0][2]), month=int(matrixData[0][1]), day=int(matrixData[0][0]), hour=int(matrixData[0][3]), minute=int(matrixData[0][4]), second=int(matrixData[0][5]), microsecond=0, tzinfo=datetime.timezone.utc) prevDate -= tzDelta if(self.dateBegin!=prevDate): print("ERROR: The first date in hydro data does not coincide with the one expected!") print("Date read = ", prevDate) print("Date expected = ", self.dateBegin) sys.exit() # outFlow[0] = matrixData[0][6]*self.surfaceDrained/3.6 if self._version<2022.0: outFlow[0] = matrixData[0][6]/self.surfaceDrained*3.6 else: outFlow[0] = matrixData[0][6] timeArray[0] = datetime.datetime.timestamp(prevDate) # Caution!! -1 is here because the size of hydros in UH is the same as rain (not the case in VHM) nbData = len(matrixData) for i in range(1,nbData): currDate = datetime.datetime(year=int(matrixData[i][2]), month=int(matrixData[i][1]), day=int(matrixData[i][0]), hour=int(matrixData[i][3]), minute=int(matrixData[i][4]), second=int(matrixData[i][5]), microsecond=0, tzinfo=datetime.timezone.utc) currDate -= tzDelta prevDate = datetime.datetime(year=int(matrixData[i-1][2]), month=int(matrixData[i-1][1]), day=int(matrixData[i-1][0]), hour=int(matrixData[i-1][3]), minute=int(matrixData[i-1][4]), second=int(matrixData[i-1][5]), microsecond=0, tzinfo=datetime.timezone.utc) prevDate -= tzDelta diffDate = currDate - prevDate diffTimeInSeconds = diffDate.days*secondsInDay + diffDate.seconds timeArray[i] = datetime.datetime.timestamp(currDate) # timeArray[i] = timeArray[i-1] + diffTimeInSeconds # outFlow[i] = matrixData[i][6]*self.surfaceDrained/3.6 outFlow[i] = matrixData[i][6] # timeArray[-1] = timeArray[-2] + diffTimeInSeconds # The last date is not taken into account in hydro as the last date rain and evap is needed for implicit simulations # if(self.dateEnd-diffDate!=currDate): # print("ERROR: The last date in hydro data does not coincide with the one expected!") # sys.exit() if(self.dateEnd!=currDate): if(self.dateEnd!=currDate+datetime.timedelta(seconds=diffTimeInSeconds)): print("ERROR: The last date in hydro data does not coincide with the one expected!") print("Date read = ", currDate) print("Date expected = ", self.dateEnd) sys.exit() else: logging.warning(_("ERROR: The last date in hydro data does not coincide with the one expected!")) logging.warning(_("Date read = "+ str(currDate))) logging.warning(_("Date expected = "+ str(self.dateEnd))) logging.warning(_("The time series will be adapted to fit the expected dimensions")) timeArray = np.append(timeArray, 0) outFlow = np.append(outFlow, 0.0) if(self.deltaT!=diffTimeInSeconds): print("ERROR: The last timestep in hydro data does not coincide with the one expected!") print("Delta t read = ", diffTimeInSeconds) print("Delta t expected = ", self.deltaT) sys.exit() # Save time array if it does not exist yet # Otherwise, check the consistency of the array with the time array of the object if(self.time is None): self.time=timeArray elif(self.time!=timeArray): print("ERROR: the dates read are not consitent with the dates already recored in this subbasin!") sys.exit() if self._version<2022.0: outFlow[:] = outFlow[:]/self.surfaceDrained*3.6 elif(self.model==cst.tom_2layers_linIF or self.model==cst.tom_2layers_UH): # For this model, there are 3 different layers to read. print("Reading the 2 outlet files...") matrixData = [] # Reading the overland flow file and time if(fileNames is None): subBasinName = os.path.join(workingDir, 'Subbasin_' + str(iDSorted)) subBasinName = os.path.join(subBasinName, 'simul_') fileName = subBasinName + "of.txt" # fileName = subBasinName + "of.dat" # isOk, fileName = rd.check_path(fileName) # readBin = True # if not isOk: # fileName = subBasinName + "of.txt" # readBin = False else: if(len(fileNames)!=2): print("ERROR: Expecting 2 file names for VHM model!") sys.exit() fileName = workingDir + fileNames[0] # readBin = False # if readBin: # os.path.join(workingDir, 'Subbasin_' + str(iDSorted)) # list_data = rd.read_bin(os.path.join(workingDir, 'Subbasin_' + str(iDSorted)), 'simul_if.dat', hydro=True) # matrixData = np.array(list_data).astype("float") # else: # with open(fileName, newline = '') as fileID: # data_reader = csv.reader(fileID, delimiter='\t') # list_data = [] # i=0 # for raw in data_reader: # if i>1: # list_data.append(raw) # i += 1 # matrixData = np.array(list_data).astype("float") with open(fileName, newline = '') as fileID: data_reader = csv.reader(fileID, delimiter='\t') list_data = [] i=0 for raw in data_reader: if i>1: list_data.append(raw) i += 1 matrixData = np.array(list_data).astype("float") timeArray = np.zeros(len(matrixData)) # Init of the outflow array outFlow = np.zeros((len(matrixData),2),dtype=ct.c_double, order='F') # Init the time properties if not already done if self.dateBegin is None or self.dateEnd is None or self.deltaT == 0: self.dateBegin = datetime.datetime(year=int(matrixData[0][2]), month=int(matrixData[0][1]), day=int(matrixData[0][0]), hour=int(matrixData[0][3]), minute=int(matrixData[0][4]), second=int(matrixData[0][5]), microsecond=0, tzinfo=datetime.timezone.utc) self.dateEnd = datetime.datetime(year=int(matrixData[-1][2]), month=int(matrixData[-1][1]), day=int(matrixData[-1][0]), hour=int(matrixData[-1][3]), minute=int(matrixData[-1][4]), second=int(matrixData[-1][5]), microsecond=0, tzinfo=datetime.timezone.utc) self.deltaT = (datetime.datetime(year=int(matrixData[1][2]), month=int(matrixData[1][1]), day=int(matrixData[1][0]), hour=int(matrixData[1][3]), minute=int(matrixData[1][4]), second=int(matrixData[1][5]), microsecond=0, tzinfo=datetime.timezone.utc) - self.dateBegin).total_seconds() secondsInDay = 24*60*60 prevDate = datetime.datetime(year=int(matrixData[0][2]), month=int(matrixData[0][1]), day=int(matrixData[0][0]), hour=int(matrixData[0][3]), minute=int(matrixData[0][4]), second=int(matrixData[0][5]), microsecond=0, tzinfo=datetime.timezone.utc) prevDate -= tzDelta if(self.dateBegin!=prevDate): print("ERROR: The first date in hydro data does not coincide with the one expected!") print("Date read = ", prevDate) print("Date expected = ", self.dateBegin) sys.exit() timeArray[0] = datetime.datetime.timestamp(prevDate) # Older versions of that of UH file was expressed in m³/s if self._version<2022.0: outFlow[0][0] = matrixData[0][6]/self.surfaceDrained*3.6 else: outFlow[0][0] = matrixData[0][6] for i in range(1,len(matrixData)): currDate = datetime.datetime(year=int(matrixData[i][2]), month=int(matrixData[i][1]), day=int(matrixData[i][0]), hour=int(matrixData[i][3]), minute=int(matrixData[i][4]), second=int(matrixData[i][5]), microsecond=0, tzinfo=datetime.timezone.utc) currDate -= tzDelta prevDate = datetime.datetime(year=int(matrixData[i-1][2]), month=int(matrixData[i-1][1]), day=int(matrixData[i-1][0]), hour=int(matrixData[i-1][3]), minute=int(matrixData[i-1][4]), second=int(matrixData[i-1][5]), microsecond=0, tzinfo=datetime.timezone.utc) prevDate -= tzDelta diffDate = currDate - prevDate diffTimeInSeconds = diffDate.days*secondsInDay + diffDate.seconds timeArray[i] = datetime.datetime.timestamp(currDate) # timeArray[i] = timeArray[i-1] + diffTimeInSeconds if self._version<2022.0: outFlow[i][0] = matrixData[i][6]/self.surfaceDrained*3.6 else: outFlow[i][0] = matrixData[i][6] # timeArray[-1] = timeArray[-2] + diffTimeInSeconds # The last date is not taken into account in hydro as the last date rain and evap is needed for implicit simulations # if(self.dateEnd-diffDate!=currDate): # print("ERROR: The last date in hydro data does not coincide with the one expected!") # sys.exit() # if(self.dateEnd!=currDate): # print("ERROR: The last date in hydro data does not coincide with the one expected!") # print("Date read = ", currDate) # print("Date expected = ", self.dateEnd) # sys.exit() if(self.dateEnd!=currDate): if(self.dateEnd!=currDate+datetime.timedelta(seconds=diffTimeInSeconds)): print(_("ERROR: The last date in hydro data does not coincide with the one expected!")) print(_("Date read = ", currDate)) print(_("Date expected = ", self.dateEnd)) sys.exit() else: logging.warning("ERROR: The last date in hydro data does not coincide with the one expected!") logging.warning(_("Date read = ") + str(currDate)) logging.warning(_("Date expected = ") + str(self.dateEnd)) logging.warning(_("The time series will be adapted to fit the expected dimensions")) timeArray = np.append(timeArray, 0) # outFlow = np.append(outFlow, 0.0) newLine = np.zeros((1, 2), dtype=ct.c_double, order='F') outFlow = np.vstack([outFlow, newLine]) if(self.deltaT!=diffTimeInSeconds): print("ERROR: The last timestep in hydro data does not coincide with the one expected!") print("Delta t = ", diffTimeInSeconds) print("Delta t expected = ", self.deltaT) sys.exit() # Save time array if it does not exist yet # Otherwise, check the consistency of the array with the time array of the object if(self.time is None): self.time=timeArray elif((self.time!=timeArray).all()): print("ERROR: the dates read are not consitent with the dates already recored in this subbasin!") sys.exit() # Reading the interflow file matrixData = [] # if(fileNames is None): # # Changes ongoing -> read bin # fileName = subBasinName + "if.dat" # isOk, fileName = rd.check_path(fileName) # readBin = True # if not isOk: # fileName = subBasinName + "if.txt" # readBin = False # # ====== # else: # fileName = workingDir + fileNames[1] # readBin = False fileName = subBasinName + "if.txt" readBin = False if readBin: os.path.join(workingDir, 'Subbasin_' + str(iDSorted)) list_data = rd.read_bin(os.path.join(workingDir, 'Subbasin_' + str(iDSorted)), 'simul_if.dat', hydro=True) matrixData = np.array(list_data).astype("float") else: with open(fileName, newline = '') as fileID: data_reader = csv.reader(fileID, delimiter='\t') list_data = [] i=0 for raw in data_reader: if i>1: list_data.append(raw) i += 1 matrixData = np.array(list_data).astype("float") if(self.model==cst.tom_2layers_linIF): # outFlow[0][1] = matrixData[0][6]*self.surfaceDrained/3.6 outFlow[0][1] = matrixData[0][6] for i in range(1,len(matrixData)): # outFlow[i][1] = matrixData[i][6]*self.surfaceDrained/3.6 outFlow[i][1] = matrixData[i][6] else: if self._version<2022.0: outFlow[0][1] = matrixData[0][6]/self.surfaceDrained*3.6 for i in range(1,len(matrixData)): outFlow[i][1] = matrixData[i][6]/self.surfaceDrained*3.6 else: outFlow[0][1] = matrixData[0][6] for i in range(1,len(matrixData)): outFlow[i][1] = matrixData[i][6] elif(self.model==cst.tom_VHM): # For this model, there are 3 different layers to read. print("Reading the 3 VHM outlet files...") matrixData = [] # Reading the overland flow file and time if(fileNames is None): subBasinName = os.path.join(workingDir, 'Subbasin_' + str(iDSorted)) subBasinName = os.path.join(subBasinName, 'simul_') fileName = subBasinName + "of.txt" else: if(len(fileNames)!=3): print("ERROR: Expecting 3 file names for VHM model!") sys.exit() fileName = workingDir + fileNames[0] with open(fileName, newline = '') as fileID: data_reader = csv.reader(fileID, delimiter='\t') list_data = [] i=0 for raw in data_reader: if i>1: list_data.append(raw) i += 1 matrixData = np.array(list_data).astype("float") timeArray = np.zeros(len(matrixData)) # Init the time properties if not already done if self.dateBegin is None or self.dateEnd is None or self.deltaT == 0: self.dateBegin = datetime.datetime(year=int(matrixData[0][2]), month=int(matrixData[0][1]), day=int(matrixData[0][0]), hour=int(matrixData[0][3]), minute=int(matrixData[0][4]), second=int(matrixData[0][5]), microsecond=0, tzinfo=datetime.timezone.utc) self.dateEnd = datetime.datetime(year=int(matrixData[-1][2]), month=int(matrixData[-1][1]), day=int(matrixData[-1][0]), hour=int(matrixData[-1][3]), minute=int(matrixData[-1][4]), second=int(matrixData[-1][5]), microsecond=0, tzinfo=datetime.timezone.utc) self.deltaT = (datetime.datetime(year=int(matrixData[1][2]), month=int(matrixData[1][1]), day=int(matrixData[1][0]), hour=int(matrixData[1][3]), minute=int(matrixData[1][4]), second=int(matrixData[1][5]), microsecond=0, tzinfo=datetime.timezone.utc) - self.dateBegin).total_seconds() # Init of the outflow array outFlow = np.zeros((len(matrixData),3),dtype=ct.c_double, order='F') secondsInDay = 24*60*60 prevDate = datetime.datetime(year=int(matrixData[0][2]), month=int(matrixData[0][1]), day=int(matrixData[0][0]), hour=int(matrixData[0][3]), minute=int(matrixData[0][4]), second=int(matrixData[0][5]), microsecond=0, tzinfo=datetime.timezone.utc) prevDate -= tzDelta if(self.dateBegin!=prevDate): print("ERROR: The first date in hydro data does not coincide with the one expected!") print("Date read = ", prevDate) print("Date expected = ", self.dateBegin) sys.exit() timeArray[0] = datetime.datetime.timestamp(prevDate) # outFlow[0][0] = matrixData[0][6]*self.surfaceDrained/3.6 outFlow[0][0] = matrixData[0][6] for i in range(1,len(matrixData)): currDate = datetime.datetime(year=int(matrixData[i][2]), month=int(matrixData[i][1]), day=int(matrixData[i][0]), hour=int(matrixData[i][3]), minute=int(matrixData[i][4]), second=int(matrixData[i][5]), microsecond=0, tzinfo=datetime.timezone.utc) currDate -= tzDelta prevDate = datetime.datetime(year=int(matrixData[i-1][2]), month=int(matrixData[i-1][1]), day=int(matrixData[i-1][0]), hour=int(matrixData[i-1][3]), minute=int(matrixData[i-1][4]), second=int(matrixData[i-1][5]), microsecond=0, tzinfo=datetime.timezone.utc) prevDate -= tzDelta diffDate = currDate - prevDate diffTimeInSeconds = diffDate.days*secondsInDay + diffDate.seconds timeArray[i] = datetime.datetime.timestamp(currDate) # timeArray[i] = timeArray[i-1] + diffTimeInSeconds # outFlow[i][0] = matrixData[i][6]*self.surfaceDrained/3.6 outFlow[i][0] = matrixData[i][6] # timeArray[-1] = timeArray[-2] + diffTimeInSeconds # The last date is not taken into account in hydro as the last date rain and evap is needed for implicit simulations # if(self.dateEnd-diffDate!=currDate): # print("ERROR: The last date in hydro data does not coincide with the one expected!") # sys.exit() if(self.dateEnd!=currDate): print("ERROR: The last date in hydro data does not coincide with the one expected!") print("Date read = ", currDate) print("Date expected = ", self.dateEnd) sys.exit() if(self.deltaT!=diffTimeInSeconds): print("ERROR: The last timestep in hydro data does not coincide with the one expected!") print("Delta t read = ", diffTimeInSeconds) print("Delta t expected = ", self.deltaT) sys.exit() # Save time array if it does not exist yet # Otherwise, check the consistency of the array with the time array of the object if(self.time is None): self.time=timeArray elif((self.time!=timeArray).all()): print("ERROR: the dates read are not consitent with the dates already recored in this subbasin!") sys.exit() # Reading the interflow file matrixData = [] if(fileNames is None): fileName = subBasinName + "if.txt" else: fileName = workingDir + fileNames[1] with open(fileName, newline = '') as fileID: data_reader = csv.reader(fileID, delimiter='\t') list_data = [] i=0 for raw in data_reader: if i>1: list_data.append(raw) i += 1 matrixData = np.array(list_data).astype("float") # outFlow[0][1] = matrixData[0][6]*self.surfaceDrained/3.6 outFlow[0][1] = matrixData[0][6] for i in range(1,len(matrixData)): # outFlow[i][1] = matrixData[i][6]*self.surfaceDrained/3.6 outFlow[i][1] = matrixData[i][6] # Reading the baseflow file matrixData = [] if(fileNames is None): fileName = subBasinName + "bf.txt" else: fileName = workingDir + fileNames[2] with open(fileName, newline = '') as fileID: data_reader = csv.reader(fileID, delimiter='\t') list_data = [] i=0 for raw in data_reader: if i>1: list_data.append(raw) i += 1 matrixData = np.array(list_data).astype("float") # outFlow[0][2] = matrixData[0][6]*self.surfaceDrained/3.6 outFlow[0][2] = matrixData[0][6] for i in range(1,len(matrixData)): # outFlow[i][2] = matrixData[i][6]*self.surfaceDrained/3.6 outFlow[i][2] = matrixData[i][6] elif(self.model==cst.tom_GR4): # For this model, there is only 1 output to consider. print("Reading the 1 outlet file ...") matrixData = [] # Reading the overland flow file and time if(fileNames is None): subBasinName = os.path.join(workingDir, 'Subbasin_' + str(iDSorted), 'simul_') fileName = subBasinName + "GR4_out.txt" else: # The file can only be a string or a list with 1 string if(type(fileNames)==str): fileName = workingDir + fileNames[0] elif(type(fileNames)==list and len(fileNames)==1): fileName = workingDir + fileNames else: print("ERROR: Expecting only 1 file name for UH model!") sys.exit() with open(fileName, newline = '') as fileID: data_reader = csv.reader(fileID, delimiter='\t') list_data = [] i=0 for raw in data_reader: if i>1: list_data.append(raw) i += 1 matrixData = np.array(list_data).astype("float") timeArray = np.zeros(len(matrixData)) # Init of the outflow array outFlow = np.zeros((len(matrixData),1),dtype=ct.c_double, order='F') # Init the time properties if not already done if self.dateBegin is None or self.dateEnd is None or self.deltaT == 0: self.dateBegin = datetime.datetime(year=int(matrixData[0][2]), month=int(matrixData[0][1]), day=int(matrixData[0][0]), hour=int(matrixData[0][3]), minute=int(matrixData[0][4]), second=int(matrixData[0][5]), microsecond=0, tzinfo=datetime.timezone.utc) self.dateEnd = datetime.datetime(year=int(matrixData[-1][2]), month=int(matrixData[-1][1]), day=int(matrixData[-1][0]), hour=int(matrixData[-1][3]), minute=int(matrixData[-1][4]), second=int(matrixData[-1][5]), microsecond=0, tzinfo=datetime.timezone.utc) self.deltaT = (datetime.datetime(year=int(matrixData[1][2]), month=int(matrixData[1][1]), day=int(matrixData[1][0]), hour=int(matrixData[1][3]), minute=int(matrixData[1][4]), second=int(matrixData[1][5]), microsecond=0, tzinfo=datetime.timezone.utc) - self.dateBegin).total_seconds() secondsInDay = 24*60*60 prevDate = datetime.datetime(year=int(matrixData[0][2]), month=int(matrixData[0][1]), day=int(matrixData[0][0]), hour=int(matrixData[0][3]), minute=int(matrixData[0][4]), second=int(matrixData[0][5]), microsecond=0, tzinfo=datetime.timezone.utc) prevDate -= tzDelta if(self.dateBegin!=prevDate): print("ERROR: The first date in hydro data does not coincide with the one expected!") print("Date read = ", prevDate) print("Date expected = ", self.dateBegin) sys.exit() timeArray[0] = datetime.datetime.timestamp(prevDate) # outFlow[0][0] = matrixData[0][6]*self.surfaceDrained/3.6 outFlow[0][0] = matrixData[0][6] for i in range(1,len(matrixData)): currDate = datetime.datetime(year=int(matrixData[i][2]), month=int(matrixData[i][1]), day=int(matrixData[i][0]), hour=int(matrixData[i][3]), minute=int(matrixData[i][4]), second=int(matrixData[i][5]), microsecond=0, tzinfo=datetime.timezone.utc) currDate -= tzDelta prevDate = datetime.datetime(year=int(matrixData[i-1][2]), month=int(matrixData[i-1][1]), day=int(matrixData[i-1][0]), hour=int(matrixData[i-1][3]), minute=int(matrixData[i-1][4]), second=int(matrixData[i-1][5]), microsecond=0, tzinfo=datetime.timezone.utc) prevDate -= tzDelta diffDate = currDate - prevDate diffTimeInSeconds = diffDate.days*secondsInDay + diffDate.seconds timeArray[i] = datetime.datetime.timestamp(currDate) # timeArray[i] = timeArray[i-1] + diffTimeInSeconds # outFlow[i][0] = matrixData[i][6]*self.surfaceDrained/3.6 outFlow[i][0] = matrixData[i][6] # timeArray[-1] = timeArray[-2] + diffTimeInSeconds # The last date is not taken into account in hydro as the last date rain and evap is needed for implicit simulations # if(self.dateEnd-diffDate!=currDate): # print("ERROR: The last date in hydro data does not coincide with the one expected!") # sys.exit() if(self.dateEnd!=currDate): print("ERROR: The last date in hydro data does not coincide with the one expected!") print("Date read = ", currDate) print("Date expected = ", self.dateEnd) sys.exit() if(self.deltaT!=diffTimeInSeconds): print("Delta t read = ", diffTimeInSeconds) print("Delta t expected = ", self.deltaT) print("ERROR: The last timestep in hydro data does not coincide with the one expected!") sys.exit() # Save time array if it does not exist yet # Otherwise, check the consistency of the array with the time array of the object if(self.time is None): self.time=timeArray elif((self.time!=timeArray).all()): print("ERROR: the dates read are not consitent with the dates already recored in this subbasin!") sys.exit() elif(self.model==cst.measures): print("Reading the measurements outlet file...") if(type(fileNames)!=str): print("ERROR: Expecting only 1 file name for measurements!") sys.exit() fileName = os.path.join(workingDir,fileNames) nbCl = 0 with open(fileName, newline = '') as fileID: data_reader = csv.reader(fileID, delimiter=' ',skipinitialspace=True) list_data = [] i=0 for raw in data_reader: if i>3: list_data.append(raw[0:nbCl]) if i==2: nbCl = int(raw[0]) i += 1 matrixData = np.array(list_data).astype("float") # Init the time properties if not already done if self.dateBegin is None or self.dateEnd is None or self.deltaT == 0: if nbCl==5: self.dateBegin = datetime.datetime(year=int(matrixData[0][2]), month=int(matrixData[0][1]), day=int(matrixData[0][0]), hour=int(matrixData[0][3]), tzinfo=datetime.timezone.utc) self.dateEnd = datetime.datetime(year=int(matrixData[-1][2]), month=int(matrixData[-1][1]), day=int(matrixData[-1][0]), hour=int(matrixData[-1][3]), tzinfo=datetime.timezone.utc) self.deltaT = (datetime.datetime(year=int(matrixData[1][2]), month=int(matrixData[1][1]), day=int(matrixData[1][0]), hour=int(matrixData[1][3]), tzinfo=datetime.timezone.utc) - self.dateBegin).total_seconds() if nbCl==7: self.dateBegin = datetime.datetime(year=int(matrixData[0][2]), month=int(matrixData[0][1]), day=int(matrixData[0][0]), hour=int(matrixData[0][3]), minute=int(matrixData[0][4]), second=int(matrixData[0][5]), microsecond=0, tzinfo=datetime.timezone.utc) self.dateEnd = datetime.datetime(year=int(matrixData[-1][2]), month=int(matrixData[-1][1]), day=int(matrixData[-1][0]), hour=int(matrixData[-1][3]), minute=int(matrixData[-1][4]), second=int(matrixData[-1][5]), microsecond=0, tzinfo=datetime.timezone.utc) self.deltaT = (datetime.datetime(year=int(matrixData[1][2]), month=int(matrixData[1][1]), day=int(matrixData[1][0]), hour=int(matrixData[1][3]), minute=int(matrixData[1][4]), second=int(matrixData[1][5]), microsecond=0, tzinfo=datetime.timezone.utc) - self.dateBegin).total_seconds() # Init of the outflow array timeInterval = self.dateEnd-self.dateBegin+datetime.timedelta(seconds=self.deltaT) outFlow = np.zeros(int(timeInterval.total_seconds()/self.deltaT),dtype=ct.c_double, order='F') timeArray = np.zeros(int(timeInterval.total_seconds()/self.deltaT)) # From the measurements file, we will only read the desired data and save it in outflow prevDate = datetime.datetime(year=int(matrixData[0][2]), month=int(matrixData[0][1]), day=int(matrixData[0][0]), hour=int(matrixData[0][3]), tzinfo=datetime.timezone.utc) prevDate -= tzDelta index = 0 add1Hour = datetime.timedelta(hours=1) secondsInDay = 24*60*60 # Verification if(datetime.datetime.timestamp(prevDate)>datetime.datetime.timestamp(self.dateBegin)): logging.error("ERROR: the first hydro data element is posterior to dateBegin!") print("Date read = ", prevDate) print("Date expected = ", self.dateBegin) sys.exit() if(nbCl==5): # Caution : the index of the loop start at 24 because the timestamp function # does not work until the 2/01/1970 at 03:00:00. => Je ne sais pas pourquoi ?! for i in range(25,len(matrixData)): # The hours are written in the file in [1,24] instead of [0,23]. Conversion below: if(int(matrixData[i][3])==24): currDate = datetime.datetime(year=int(matrixData[i][2]), month=int(matrixData[i][1]), day=int(matrixData[i][0]), hour=23, tzinfo=datetime.timezone.utc) + add1Hour else: currDate = datetime.datetime(year=int(matrixData[i][2]), month=int(matrixData[i][1]), day=int(matrixData[i][0]), hour=int(matrixData[i][3]), tzinfo=datetime.timezone.utc) currDate -= tzDelta if(int(matrixData[i-1][3])==24): prevDate = datetime.datetime(year=int(matrixData[i-1][2]), month=int(matrixData[i-1][1]), day=int(matrixData[i-1][0]), hour=23, tzinfo=datetime.timezone.utc) + add1Hour else: prevDate = datetime.datetime(year=int(matrixData[i-1][2]), month=int(matrixData[i-1][1]), day=int(matrixData[i-1][0]), hour=int(matrixData[i-1][3]), tzinfo=datetime.timezone.utc) prevDate -= tzDelta # Start at dateBegin and go to the element before dateEnd. Because the last date is needed for rain and evap in implicit simulations. if(datetime.datetime.timestamp(currDate)>=datetime.datetime.timestamp(self.dateBegin) and \ datetime.datetime.timestamp(currDate)<=datetime.datetime.timestamp(self.dateEnd)): outFlow[index] = matrixData[i][4] diffDate = currDate - prevDate diffTimeInSeconds = diffDate.days*secondsInDay + diffDate.seconds timeArray[index] = datetime.datetime.timestamp(currDate) # timeArray[index] = timeArray[index-1] + diffTimeInSeconds index += 1 elif(nbCl==7): for i in range(len(matrixData)): # The hours are written in the file in [1,24] instead of [0,23]. Conversion below: currDate = datetime.datetime(year=int(matrixData[i][2]), month=int(matrixData[i][1]), day=int(matrixData[i][0]), hour=int(matrixData[i][3]), minute=int(matrixData[i][4]), second=int(matrixData[i][5]),tzinfo=datetime.timezone.utc) currDate -= tzDelta prevDate = datetime.datetime(year=int(matrixData[i-1][2]), month=int(matrixData[i-1][1]), day=int(matrixData[i-1][0]), hour=int(matrixData[i-1][3]), minute=int(matrixData[i-1][4]), second=int(matrixData[i-1][5]),tzinfo=datetime.timezone.utc) prevDate -= tzDelta # Start at dateBegin and go to the element before dateEnd. Because the last date is needed for rain and evap in implicit simulations. if(datetime.datetime.timestamp(currDate)>=datetime.datetime.timestamp(self.dateBegin) and \ datetime.datetime.timestamp(currDate)<=datetime.datetime.timestamp(self.dateEnd)): if(matrixData[i][6]<0): outFlow[index] = 0.0 else: outFlow[index] = matrixData[i][6] outFlow[index] = matrixData[i][6] diffDate = currDate - prevDate diffTimeInSeconds = diffDate.days*secondsInDay + diffDate.seconds timeArray[index] = datetime.datetime.timestamp(currDate) # timeArray[index] = timeArray[index-1] + diffTimeInSeconds index += 1 # The last date is not taken into account in hydro as the last date rain and evap is needed for implicit simulations diffDate = currDate - prevDate # Add the last element in the time matrix as its size is 1 element bigger than outlet # timeArray[-1] = timeArray[-2] + diffTimeInSeconds if(self.deltaT!=diffDate.seconds): print("ERROR: The last timestep in hydro data does not coincide with the one expected!") print("Delta t read = ", diffDate.seconds) print("Delta t expected = ", self.deltaT) sys.exit() # Save time array if it does not exist yet # Otherwise, check the consistency of the array with the time array of the object if(self.time is None): self.time=timeArray elif(self.time!=timeArray): print("ERROR: the dates read are not consitent with the dates already recored in this subbasin!") sys.exit() elif(self.model==cst.compare_opti): print("Reading the measurements outlet file...") if(type(fileNames)!=str): print("ERROR: Expecting only 1 file name for measurements!") sys.exit() fileName = os.path.join(workingDir, fileNames) nbCl = 0 with open(fileName, newline = '') as fileID: data_reader = csv.reader(fileID, delimiter='\t',skipinitialspace=True) list_data = [] i=0 for raw in data_reader: if i==0: nbL = int(raw[0]) nbCl = int(raw[1]) else: list_data.append(raw[0:nbCl]) i += 1 matrixData = np.array(list_data).astype("float") # Init the time properties if not already done if self.dateBegin is None or self.dateEnd is None or self.deltaT == 0: self.dateBegin = datetime.datetime.fromtimestamp(matrixData[0][0], tz=datetime.timezone.utc) self.dateEnd = datetime.datetime.fromtimestamp(matrixData[-1][0], tz=datetime.timezone.utc) self.deltaT = matrixData[1][0]-matrixData[0][0] # Init of the outflow array timeInterval = self.dateEnd-self.dateBegin+datetime.timedelta(seconds=self.deltaT) # outFlow = np.zeros(int(timeInterval.total_seconds()/self.deltaT),dtype=ct.c_double, order='F') outFlow = np.zeros(len(matrixData),dtype=ct.c_double, order='F') timeArray = np.zeros(len(matrixData)) # From the measurements file, we will only read the desired data and save it in outflow prevDate = int(matrixData[0][0]) outFlow[0] = matrixData[0][1] timeArray[0] = prevDate index = 1 add1Hour = datetime.timedelta(hours=1) secondsInDay = 24*60*60 # Verification if(prevDate>datetime.datetime.timestamp(self.dateBegin)): logging.error("ERROR: the first hydro data element is posterior to dateBegin!") print("Date read = ", prevDate) print("Date expected = ", self.dateBegin) sys.exit() for i in range(1,len(matrixData)): currDate = int(matrixData[i][0]) prevDate = int(matrixData[i-1][0]) # Start at dateBegin and go to the element before dateEnd. Because the last date is needed for rain and evap in implicit simulations. if(currDate>=datetime.datetime.timestamp(self.dateBegin) and \ currDate<=datetime.datetime.timestamp(self.dateEnd)): outFlow[index] = matrixData[i][1] diffDate = currDate - prevDate timeArray[index] = currDate index += 1 # The last date is not taken into account in hydro as the last date rain and evap is needed for implicit simulations diffDate = currDate - prevDate # Add the last element in the time matrix as its size is 1 element bigger than outlet # timeArray[-1] = timeArray[-2] + diffTimeInSeconds if(self.deltaT!=diffDate): print("WARNING: The last timestep in hydro data does not coincide with the one expected!") print("Delta t read = ", diffDate) print("Delta t expected = ", self.deltaT) print("Replacing the deltaT by the one read on the file!") self.deltaT = diffDate # sys.exit() # Save time array if it does not exist yet # Otherwise, check the consistency of the array with the time array of the object if(self.time is None): self.time=timeArray elif(self.time!=timeArray): print("ERROR: the dates read are not consitent with the dates already recored in this subbasin!") sys.exit() return timeArray[:index], outFlow[:index] return timeArray, outFlow
[docs] def get_outFlow_noDelay(self, unit='m3/s'): """ This function returns the total outlet of the basin and considers t0=0 at the outlet of the subbasin without considering timeDelay (the time of the real outlet of the whole potential catchment) """ logging.warning("This function is operational but obsolate since version" + str(cst.VERSION_WOLFHYDRO)) # nameOut = list(self.outFlow.items())[0][0] # myOutFlow = self.outFlow[nameOut]["Net"] # tmpHydro = np.zeros(len(myOutFlow)) # index = math.floor(self.timeDelay/self.deltaT) # if(index==0): # tmpHydro = myOutFlow.copy() # elif(index<len(myOutFlow)): # tmpHydro[:-index] = myOutFlow[index:] # else: # print("Warning: the simulation time is not long enough for this subbasin to be taken into account") # print("Error informations : ") # print("Function name : get_outFlow_noDelay()") # print("index = ", index) # print("len(myOutFlow) = ", len(myOutFlow)) # print("self.timeDelay = ", self.timeDelay) # return tmpHydro # if unit=='mm/h': # tmpHydro *= 3.6/self.surfaceDrainedHydro return self.get_outFlow(unit=unit)
[docs] def get_outFlowRaw_noDelay(self, unit='m3/s'): """ This function returns the total raw outlet of the basin and considers t0=0 at the outlet of the subbasin without considering timeDelay (the time of the real outlet of the whole potential catchment) """ logging.warning("This function is operational but obsolate since version" + str(cst.VERSION_WOLFHYDRO)) # nameOut = list(self.outFlow.items())[0][0] # myOutFlow = self.outFlow[nameOut]["Raw"] # tmpHydro = np.zeros(len(myOutFlow)) # index = math.floor(self.timeDelay/self.deltaT) # if(index==0): # tmpHydro = myOutFlow.copy() # elif(index<len(myOutFlow)): # tmpHydro[:-index] = myOutFlow[index:] # else: # logging.error("ERROR: the simulation time is not long enough for this subbasin to be taken into account") # logging.error("Error informations : ") # logging.error("Function name : get_outFlowRaw_noDelay()") # logging.error("index = " + str(index)) # logging.error("len(myOutFlow) = " + str(len(myOutFlow))) # logging.error("self.timeDelay = " + str(self.timeDelay)) # return # if unit=='mm/h': # tmpHydro *= 3.6/self.surfaceDrainedHydro return self.get_outFlow(typeOutFlow="Raw", unit=unit)
[docs] def get_inlets_noDelay(self, unit='m3/s'): """ This function returns the total inlets of the basin and considers t0=0 at the outlet of the subbasin without considering timeDelay (the time of the real outlet of the whole potential catchment) """ logging.warning("This function is operational but obsolate since version" + str(cst.VERSION_WOLFHYDRO)) # nameOut = list(self.outFlow.items())[0][0] # myInlets = self.inlets # tmpHydro = np.zeros(len(myInlets)) # index = math.floor(self.timeDelay/self.deltaT) # if(index==0): # tmpHydro = myInlets.copy() # elif(index<len(myInlets)): # tmpHydro[:-index] = myInlets[index:] # else: # print("Warning: the simulation time is not long enough for this subbasin to be taken into account") # print("Error informations : ") # print("Function name : get_inlets_noDelay()") # print("index = ", index) # print("len(myInlets) = ", len(myInlets)) # print("self.timeDelay = ", self.timeDelay) # return # if unit=='mm/h': # tmpHydro *= 3.6/self.surfaceDrainedHydro return self.inlets
[docs] def add_name(self, myName): "this function add a name to the subasin" self.name = myName
[docs] def increment_level(self): "This procedure increment the level in the Topo dictionary" self.myLevel += 1
[docs] def set_level(self, level): self.myLevel = level
[docs] def add_inlet(self, toPoint, name="DEFAULT"): "This procedure link the inlets to the object" self.intletsObj[name] = toPoint
[docs] def add_downstreamObj(self, toPoint, name="DEFAULT"): "This procedure link the downstream element to the object" if toPoint is not None: self.downstreamObj[name] = toPoint self._outFlow[name] = {} self._outFlow[name]["Net"] = [] self._outFlow[name]["Raw"] = []
[docs] def compute_hydro(self): """ This procedure computes the total hydrograph and raw hydrograph of subbasin The total hydrograph $q_{tot} is obtained with the formula: \f[ q_{tot} = \sum q_{\text{inlets}} + q_{\text{me}}$ \f] , with $q_{\text{me}}$ the hydrograph of the subbasin alone. Internal variable changed: outFlowRaw, outFlow, inletsRaw CAUTION: - Discussion about the ceil or the floor for the timeDelay indice!!! - UPDATE 2023.1 now the outFlow are not delayed anymore !!!! -> IMPORTANT UPDATE """ if(len(self._outFlow)==0): self._outFlow["DEFAULT"] = {} nameOutFlow = list(self._outFlow.items())[0][0] # Sum all the inlets hydrographs self.sum_inlets() if(self.model==cst.tom_UH or self.model==cst.measures or self.model==cst.tom_GR4 or self.model==cst.compare_opti): tmpHydro = np.zeros(len(self.myHydro),dtype=ct.c_double, order='F') if(self.model==cst.tom_GR4): tmpHydro = self.myHydro[:,0]*self.surfaceDrained/3.6 else: # tmpHydro = self.myHydro*self.surfaceDrained/3.6 tmpHydro = self.get_myHydro(unit="m3/s") # Raw hydrograp self._outFlow[nameOutFlow]["Raw"] = self.inletsRaw + tmpHydro # Real hydrograph self._outFlow[nameOutFlow]["Net"] = self.inlets + tmpHydro elif(self.model==cst.tom_VHM or self.model==cst.tom_2layers_linIF or self.model==cst.tom_2layers_UH): tmpOutFlow = np.sum(self.myHydro,1)*self.surfaceDrained/3.6 # Raw hydrograph self._outFlow[nameOutFlow]["Raw"] = np.zeros(len(tmpOutFlow)) self._outFlow[nameOutFlow]["Raw"] = self.inletsRaw + tmpOutFlow # for i in range(len(self.myHydro)): # self.outFlowRaw[i] = self.inletsRaw[i] + np.sum(self.myHydro[i]) # Real hydrograph self._outFlow[nameOutFlow]["Net"] = np.zeros(len(self.myHydro)) self._outFlow[nameOutFlow]["Net"] = self.inlets + tmpOutFlow
# for i in range(len(self.myHydro)): # self.outFlow[i] = self.inlets[i] + np.sum(self.myHydro[i])
[docs] def sum_inlets(self): """ Sum all the inlet hydrographs of a subbasin. Return an array of zeros otherwise. Internal variable changed: self.inlets, self.inletsRaw """ if(self.haveInlets): nameInlet = list(self.intletsObj.items())[0][0] curObj = self.intletsObj[nameInlet] timeInlet = curObj.timeDelay deltaTr = timeInlet - self.timeDelay self.inlets = curObj.get_outFlow(typeOutFlow="Net", whichOutFlow=nameInlet, lag=deltaTr) self.inletsRaw = curObj.get_outFlow(typeOutFlow="Raw", whichOutFlow=nameInlet, lag=deltaTr) for i in range(1,len(self.intletsObj)): nameInlet = list(self.intletsObj.items())[i][0] curObj = self.intletsObj[nameInlet] timeInlet = curObj.timeDelay deltaTr = timeInlet - self.timeDelay self.inlets += curObj.get_outFlow(typeOutFlow="Net", whichOutFlow=nameInlet, lag=deltaTr) self.inletsRaw += curObj.get_outFlow(typeOutFlow="Raw", whichOutFlow=nameInlet, lag=deltaTr) else: self.inlets = np.zeros(len(self.myHydro), dtype=ct.c_double, order='F') self.inletsRaw = np.zeros(len(self.myHydro), dtype=ct.c_double, order='F')
[docs] def add_rain(self, workingDir, fileName=None, tzDelta=datetime.timedelta(hours=0)): """ This procedure - reads: the time, rain in the rain file - saves: the rain of the subbasin, sum of the rain's inlets - returns: the time array read. - Variables modified: self.rain, self.myRain """ # Reading and saving the rain's basin if(fileName is None): fileName = 'Subbasin_'+str(self.iDSorted)+'/simul_lumped_rain.txt' with open(os.path.join(workingDir,fileName), newline = '') as fileID2: data_reader = csv.reader(fileID2, delimiter='\t') list_data = [] i=0 for raw in data_reader: if i>1: list_data.append(raw) i += 1 matrixData = np.array(list_data).astype("float") rainAll = np.zeros(len(matrixData)) timeAll = np.zeros(len(matrixData)) secondsInDay = 24*60*60 prevDate = datetime.datetime(year=int(matrixData[0][2]), month=int(matrixData[0][1]), day=int(matrixData[0][0]), hour=int(matrixData[0][3]), minute=int(matrixData[0][4]), second=int(matrixData[0][5]), microsecond=0, tzinfo=datetime.timezone.utc) prevDate -= tzDelta timeAll[0] = datetime.datetime.timestamp(prevDate) timeI = datetime.datetime.timestamp(self.dateBegin) timeF = datetime.datetime.timestamp(self.dateEnd) if(timeI<timeAll[0]): print("ERROR: The rain dates in simul_lumped_rain are not compatible with the beginning of the simulation !") print("Date read = ", prevDate) print("Starting date of the simulation = ", self.dateBegin) sys.exit() for i in range(1,len(matrixData)): currDate = datetime.datetime(year=int(matrixData[i][2]), month=int(matrixData[i][1]), day=int(matrixData[i][0]), hour=int(matrixData[i][3]), minute=int(matrixData[i][4]), second=int(matrixData[i][5]), microsecond=0, tzinfo=datetime.timezone.utc) currDate -= tzDelta prevDate = datetime.datetime(year=int(matrixData[i-1][2]), month=int(matrixData[i-1][1]), day=int(matrixData[i-1][0]), hour=int(matrixData[i-1][3]), minute=int(matrixData[i-1][4]), second=int(matrixData[i-1][5]), microsecond=0, tzinfo=datetime.timezone.utc) prevDate -= tzDelta diffDate = currDate - prevDate diffTimeInSeconds = diffDate.days*secondsInDay + diffDate.seconds timeAll[i] = datetime.datetime.timestamp(currDate) # time[i] = time[i-1] + diffTimeInSeconds rainAll[i] = matrixData[i][6] # Check the compatibility of the time steps with the hydro simulation if(self.deltaT!=diffTimeInSeconds): print("ERROR: The last timestep in rain data does not coincide with the one expected!") print("Delta t read = ", diffTimeInSeconds) print("Delta t expected = ", self.deltaT) sys.exit() # Extract just the part of the rain contained in the simulation range condTime = (timeAll>=timeI) & (timeAll<=timeF) time = np.extract(condTime, timeAll) rain = np.extract(condTime, rainAll) # Test of the code good procedure -> to remove if(timeF!=time[-1]): print("ERROR: The last date in rain data does not coincide with the one expected!") print("Date timestamp read = ", time[-1]) print("Date timestamp expected = ", self.dateEnd) sys.exit() # Save the time if it does not exist yet if(self.time is None): self.time=time print("Time didn't exist before, therefore it is save now according to rain data time serie!") elif not(np.array_equal(time,self.time)): print('Time arrays are not the same! Please check your answers.') self.myRain = rain # Unit conversion to [m^3/s] if(self.surfaceDrained==0): print("WARNING : surfaceDrained=0! It should not be the case to apply correctly this step. Please check your precedure and define its value!") rain = rain*10**(-3)*self.surfaceDrained*10**(6)/3600.0 # Sum of the rain of all the inlets to get the total rain for iInlet in self.intletsObj: rain += self.intletsObj[iInlet].rain self.rain = rain return time
[docs] def add_evap(self, workingDir, fileName=None, tzDelta=datetime.timedelta(hours=0)): """ This procedure - reads: the time, evapotranspiration in the evap file - saves: the evapotranspiration of the subbasin, sum of the evapotranspiration's inlets -> to correct with surface of the basin - returns: the time array read. - Variables modified: self.evap, self.myEvap """ # Reading and saving the evap's basin if(fileName is None): fileName = 'Subbasin_'+str(self.iDSorted)+'/simul_lumped_evap.txt' with open(workingDir+fileName, newline = '') as fileID2: data_reader = csv.reader(fileID2, delimiter='\t') list_data = [] i=0 for raw in data_reader: if i>1: list_data.append(raw) i += 1 matrixData = np.array(list_data).astype("float") evapAll = np.zeros(len(matrixData)) timeAll = np.zeros(len(matrixData)) secondsInDay = 24*60*60 prevDate = datetime.datetime(year=int(matrixData[0][2]), month=int(matrixData[0][1]), day=int(matrixData[0][0]), hour=int(matrixData[0][3]), minute=int(matrixData[0][4]), second=int(matrixData[0][5]), microsecond=0, tzinfo=datetime.timezone.utc) prevDate -= tzDelta timeAll[0] = datetime.datetime.timestamp(prevDate) timeI = datetime.datetime.timestamp(self.dateBegin) timeF = datetime.datetime.timestamp(self.dateEnd) # Check the coherence between the first date in rain and the simualtion range. It can have more rain data than simulation but not the contrary! if(timeI<time[0]): print("ERROR: The rain dates in simul_lumped_rain are not compatible with the beginning of the simulation !") print("Date read = ", prevDate) print("Starting date of the simulation = ", self.dateBegin) sys.exit() for i in range(1,len(matrixData)): currDate = datetime.datetime(year=int(matrixData[i][2]), month=int(matrixData[i][1]), day=int(matrixData[i][0]), hour=int(matrixData[i][3]), minute=int(matrixData[i][4]), second=int(matrixData[i][5]), microsecond=0, tzinfo=datetime.timezone.utc) currDate -= tzDelta prevDate = datetime.datetime(year=int(matrixData[i-1][2]), month=int(matrixData[i-1][1]), day=int(matrixData[i-1][0]), hour=int(matrixData[i-1][3]), minute=int(matrixData[i-1][4]), second=int(matrixData[i-1][5]), microsecond=0, tzinfo=datetime.timezone.utc) prevDate -= tzDelta diffDate = currDate - prevDate diffTimeInSeconds = diffDate.days*secondsInDay + diffDate.seconds timeAll[i] = datetime.datetime.timestamp(currDate) # time[i] = time[i-1] + diffTimeInSeconds evapAll[i] = matrixData[i][6] # Check the compatibility of the time steps with the hydro simulation if(self.deltaT!=diffTimeInSeconds): print("ERROR: The last timestep in rain data does not coincide with the one expected!") print("Delta t read = ", diffTimeInSeconds) print("Delta t expected = ", self.deltaT) sys.exit() # Extract just the part of the rain contained in the simulation range condTime = (timeAll>=timeI) & (timeAll<=timeF) time = np.extract(condTime, timeAll) evap = np.extract(condTime, evapAll) # Test of the code good procedure -> to remove if(timeF!=time[-1]): print("ERROR: The last date in rain data does not coincide with the one expected!") print("Date read = ", time[-1]) print("Date expected = ", self.dateEnd) sys.exit() # Save the time if it does not exist yet if(self.time is None): self.time=time print("Time didn't exist before, therefore it is saved now according to rain data time serie!") elif not(np.array_equal(time,self.time)): print('Time arrays are not the same! Please check your answers.') self.myEvap = evap # Unit conversion to [m^3/s] if(self.surfaceDrained==0): print("WARNING : surfaceDrained=0! It should not be the case to apply correctly this step. Please check your precedure and define its value!") evap = evap*10**(-3)*self.surfaceDrained*10**(6)/3600.0 # Sum of the evap of all the inlets to get the total evap for i in range(len(self.intletsObj)): evap += self.intletsObj[i].evap self.evap = evap return time
[docs] def add_temp(self, workingDir, fileName=None, tzDelta=datetime.timedelta(hours=0)): """ This procedure - reads: the time, mean temperature in a day in the Temp file - saves: the temperatures of the subbasin - returns: the time array read. - Variables modified: self.myTemp """ # Reading and saving the temperature's basin if(fileName is None): fileName = 'Subbasin_'+str(self.iDSorted)+'/simul_lumped_Temp.txt' with open(workingDir+fileName, newline = '') as fileID2: data_reader = csv.reader(fileID2, delimiter='\t') list_data = [] i=0 for raw in data_reader: if i>1: list_data.append(raw) i += 1 matrixData = np.array(list_data).astype("float") temp = np.zeros(len(matrixData)) time = np.zeros(len(matrixData)) secondsInDay = 24*60*60 prevDate = datetime.datetime(year=int(matrixData[0][2]), month=int(matrixData[0][1]), day=int(matrixData[0][0]), hour=int(matrixData[0][3]), minute=int(matrixData[0][4]), second=int(matrixData[0][5]), microsecond=0, tzinfo=datetime.timezone.utc) prevDate -= tzDelta if(self.dateBegin!=prevDate): print("ERROR: The first date in temperature data does not coincide with the one expected!") print("Date read = ", prevDate) print("Date expected = ", self.dateBegin) sys.exit() time[0] = datetime.datetime.timestamp(prevDate) for i in range(1,len(matrixData)): currDate = datetime.datetime(year=int(matrixData[i][2]), month=int(matrixData[i][1]), day=int(matrixData[i][0]), hour=int(matrixData[i][3]), minute=int(matrixData[i][4]), second=int(matrixData[i][5]), microsecond=0, tzinfo=datetime.timezone.utc) currDate -= tzDelta prevDate = datetime.datetime(year=int(matrixData[i-1][2]), month=int(matrixData[i-1][1]), day=int(matrixData[i-1][0]), hour=int(matrixData[i-1][3]), minute=int(matrixData[i-1][4]), second=int(matrixData[i-1][5]), microsecond=0, tzinfo=datetime.timezone.utc) prevDate -= tzDelta diffDate = currDate - prevDate diffTimeInSeconds = diffDate.days*secondsInDay + diffDate.seconds time[i] = datetime.datetime.timestamp(currDate) # time[i] = time[i-1] + diffTimeInSeconds temp[i] = matrixData[i][6] if(self.dateEnd!=currDate): print("ERROR: The last date in temperature data does not coincide with the one expected!") print("Date read = ", currDate) print("Date expected = ", self.dateEnd) sys.exit() if(self.deltaT!=diffTimeInSeconds): print("ERROR: The last timestep in temperature data does not coincide with the one expected!") print("Delta t read = ", diffTimeInSeconds) print("Delta t expected = ", self.deltaT) sys.exit() if(self.time is None): self.time = time elif not(np.array_equal(time,self.time)): print('Time arrays are not the same! Please check your answers.') sys.exit() self.myTemp = temp return time
[docs] def read_dbfFile(self, fileName): dbfDict = DBF(fileName, load=True) return dbfDict
[docs] def add_hyeto(self, workingDir, hyetoDict): """Add hyetographs to the subbasin TO DO: Adapt the code to find automatically the .dbf files. E.G. when data are read in NetCDF files. """ fileName = Path(workingDir) / ('Subbasin_' +str(self.iDSorted)) / 'simul_rain_geom.vec.dbf' fileName2 = Path(workingDir) / ('Subbasin_' +str(self.iDSorted)) /'/simul_geom.vec.dbf' if(fileName.exists()): dbDict = self.read_dbfFile(str(fileName)) for i in range(len(dbDict.records)): idHyeto = int(dbDict.records[i]['data']) # idMyHyeto = hyetoDict['Ordered To Nb'][idHyeto] self.myHyetoDict[idHyeto] = hyetoDict['Hyetos'][idHyeto] elif(fileName2.exists()): dbDict = self.read_dbfFile(str(fileName2)) for i in range(len(dbDict.records)): idHyeto = int(dbDict.records[i]['data']) # idMyHyeto = hyetoDict['Ordered To Nb'][idHyeto] self.myHyetoDict[idHyeto] = hyetoDict['Hyetos'][idHyeto] else: print("WARNING: No dbf file")
[docs] def plot(self, workingDir, plotRaw=False, axis="Hours", yAdd=[], yAddName=[], rangeData=[], deltaMajorTicks=12.0*3600.0, deltaMinorTicks=3600.0, tzPlot=0): """ This procedure plots: - the inlets: in color chosen randomly by matplotlib - the outlet: in black solid line - the raw outlet: in black dashed line """ if(self.model==cst.tom_UH or self.model==cst.tom_2layers_linIF or self.model==cst.tom_2layers_UH or self.model==cst.tom_GR4): # x = self.time/3600.0 if(axis=="Hours"): x = (self.time[:]-self.time[0])/3600.0 else: tzDelta = datetime.timedelta(seconds=tzPlot*3600.0) timeDelayDelta = datetime.timedelta(seconds=self.timeDelay) beginDate = datetime.datetime.fromtimestamp(self.time[0], tz=datetime.timezone.utc)+tzDelta-timeDelayDelta endDate = datetime.datetime.fromtimestamp(self.time[-1], tz=datetime.timezone.utc)+tzDelta-timeDelayDelta dt = self.time[1]-self.time[0] time_delta = datetime.timedelta(seconds=dt) if(rangeData==[]): rangeData = [beginDate,endDate] x_date = drange(beginDate, endDate+time_delta, time_delta) font11 = FontProperties() font11.set_family('serif') font11.set_name('Euclid') font11.set_size(11) font14 = FontProperties() font14.set_family('serif') font14.set_name('Euclid') font14.set_size(14) # fig = plt.figure(figsize=(11.7,8.3)) fig = plt.figure(figsize=(10.4,6.25)) ax1 = plt.subplot(1,1,1) ax1.grid() if(axis=="Hours"): ax1.set_xlabel('Temps [h]', fontproperties=font11) else: ax1.set_xlabel('Dates (GMT+'+str(tzPlot)+')', fontproperties=font11) ax1.set_ylabel('Débits [m³/s]', fontproperties=font11) # fig.legend(loc="best") fig.legend() for iInlet in self.intletsObj: y = self.intletsObj[iInlet].get_outFlow_global(typeOutFlow="Net", whichOutFlow=iInlet) name = self.intletsObj[iInlet].name if(axis=="Hours"): ax1.plot(x, y, label = name) else: if(len(x_date)-1==len(y)): # print("ERROR: dimension of dates 1 elements greater than data! This could be a problem induced by drange() ... To investigate...") # toContinue = input("Do you still want to continue nonetheless? Y-[Yes] N-[No]: ") logging.error("ERROR: dimension of dates 1 elements greater than data! This could be a problem induced by drange() ... To investigate...") x_date = x_date[:-1] ax1.plot_date(x_date, y, '-', label = name) if yAdd!=[]: # ######## nbyAdd = np.shape(yAdd)[0] for i in range(nbyAdd): y = yAdd[i][:] name = yAddName[i] if(axis=="Hours"): ax1.plot(x, y, label = name) else: ax1.plot_date(x_date, y, '-', label = name) if(self.model==cst.tom_UH or self.model==cst.tom_GR4): # y = self.myHydro tmpHydro = [] tmpHydro.append(np.zeros(len(self.myHydro))) index = math.floor(self.timeDelay/self.deltaT) if(index==0): if(self.model==cst.tom_GR4): tmpHydro = self.myHydro[:,0]*self.surfaceDrained/3.6 else: tmpHydro[0] = self.myHydro*self.surfaceDrained/3.6 elif(index<len(self.myHydro)): if(self.model==cst.tom_GR4): tmpHydro = self.myHydro[:,0]*self.surfaceDrained/3.6 else: tmpHydro[0][index:] = self.myHydro[:-index]*self.surfaceDrained/3.6 # tmpHydro[index:] = self.myHydro[:-index] elif(self.model==cst.tom_2layers_linIF or self.model==cst.tom_2layers_UH): tmpHydro = [] for i in range(2): tmpHydro.append(np.zeros(len(self.myHydro))) index = math.floor(self.timeDelay/self.deltaT) for i in range(2): if(index==0): tmpHydro[i] = self.myHydro[:,i]*self.surfaceDrained/3.6 elif(index<len(self.myHydro)): tmpHydro[i][index:] = self.myHydro[:-index,i]*self.surfaceDrained/3.6 myLabels = [] if(self.model==cst.tom_UH): myLabels.append(self.name+' raw') elif(self.model==cst.tom_2layers_linIF or self.model==cst.tom_2layers_UH): myLabels.append(self.name+' overland flow') myLabels.append(self.name+' interflows') for i in range(len(tmpHydro)): y = tmpHydro[i] if(axis=="Hours"): ax1.plot(x, y, label = self.name) else: ax1.plot_date(x_date, y, '-', label = self.name) if(axis=="Hours"): if(self.model==cst.tom_UH): y = self.myHydro*self.surfaceDrained/3.6 if(axis=="Hours"): ax1.plot(x, y,'--',label = self.name+' local') else: ax1.plot_date(x_date, y,'--',label = self.name+' local') elif(self.model==cst.tom_2layers_linIF or self.model==cst.tom_2layers_UH): for i in range(2): y = self.myHydro[:,i]*self.surfaceDrained/3.6 if(axis=="Hours"): ax1.plot(x, y,'--',label = self.name+ myLabels[i]+' raw') else: ax1.plot_date(x_date, y,'--',label = self.name+ myLabels[i]+' raw') y = self.get_outFlow_global(typeOutFlow="Net") # plt.plot(x, y, label = 'Outlet '+self.name, color='k') if(axis=="Hours"): ax1.plot(x, y, label = 'Outlet', color='k') else: ax1.plot_date(x_date, y, '-', label = 'Outlet', color='k') if(plotRaw): y = self.get_outFlow_global(typeOutFlow="Raw") if(axis=="Hours"): ax1.plot(x, y, '--', label = 'Outlet '+self.name+' Raw', color='k') else: ax1.plot_date(x_date, y, '--', label = 'Outlet '+self.name+' Raw', color='k') ax1.set_title(self.name + " Hydrogrammes écrêtés", fontproperties=font14) else: ax1.set_title(self.name + " Hydrogrammes", fontproperties=font14) if(axis=="Hours"): ax1.set_xlim(x[0], x[-1]) else: ax1.set_xlim(rangeData[0], rangeData[1]-time_delta) for label in ax1.get_xticklabels(): label.set_rotation(30) label.set_horizontalalignment('right') ax1.tick_params(axis='y',labelcolor='k') if(deltaMajorTicks>0): if(axis=="Datetime"): majorTicks = HourLocator(interval=math.floor(deltaMajorTicks/3600),tz=datetime.timezone.utc) ax1.xaxis.set_major_locator(majorTicks) ax1.grid(which='major', alpha=1.0) if(deltaMinorTicks>0): if(axis=="Datetime"): ax1.minorticks_on() minorTicks = MicrosecondLocator(interval=deltaMinorTicks*1E6,tz=datetime.timezone.utc) ax1.xaxis.set_minor_locator(minorTicks) ax1.grid(which='minor', alpha=0.2) else: ax1.grid() # fig.legend(prop=font11,loc="best") fig.legend(prop=font11) if(plotRaw): plt.savefig(os.path.join(workingDir,'PostProcess/QT_HydroEcrete_'+self.name+'.pdf')) else: plt.savefig(workingDir+'PostProcess/QT_Hydro_'+self.name+'.pdf') # if(plotRaw): # plt.savefig(workingDir+'QT_HydroEcrete_'+self.name+'.pdf') # else: # plt.savefig(workingDir+'QT_Hydro_'+self.name+'.pdf') elif(self.model==cst.tom_VHM): print("ERROR: the plot for VHM is not implemented yet!") sys.exit() else: print("ERROR: the plot for this option is not implemented yet!") sys.exit()
[docs] def plot_myBasin(self, Measures=None, rangeData=[], yrangeRain=[], yrangeData=[], factor=1.5, graph_title='', withEvap=False, writeFile='', withCt=False, figure=None): "This procedure plots its own hydrographs and hyetographs" # Determine the number of elements according to the model chosen if(self.model==cst.tom_UH): nbElements = 1 lastElement = 0 elif(self.model==cst.tom_VHM): nbElements = 4 lastElement = 0 elif(self.model==cst.tom_GR4): nbElements = 1 lastElement = 0 elif(self.model==cst.tom_2layers_linIF or self.model==cst.tom_2layers_UH): nbElements = 3 lastElement = 0 # Construction of the list of element to plot on the main hydrograph myOutFlow = self.outFlow_global tmpSum = np.zeros(len(myOutFlow)-lastElement) y = np.zeros((len(myOutFlow)-lastElement,nbElements)) if(self.model==cst.tom_UH): y[:,0] = self.myHydro[:]*self.surfaceDrained/3.6 elif(self.model==cst.tom_VHM or self.model==cst.tom_2layers_linIF or self.model==cst.tom_2layers_UH): for i in range(nbElements-1+lastElement): y[:,i] = self.myHydro[:,i]*self.surfaceDrained/3.6 tmpSum += self.myHydro[:,i]*self.surfaceDrained/3.6 y[:,-1] = tmpSum elif(self.model==cst.tom_GR4): y[:,0] = self.myHydro[:,0]*self.surfaceDrained/3.6 else: print("ERROR: this model was not implemented yet!") sys.exit() # Add the measures if available if(Measures is not None): myMeasure = Measures.myHydro # label on x-axis x_title = "dates" # label on y-axis y_titles = [] if(self.model==cst.tom_VHM): y_titles.append("Overland flow") y_titles.append("Interflow") y_titles.append("Baseflow") y_titles.append("Total") elif(self.model==cst.tom_UH): y_titles.append('') elif(self.model==cst.tom_GR4): y_titles.append("GR4 flow") elif(self.model==cst.tom_2layers_linIF or self.model==cst.tom_2layers_UH): y_titles.append("Overland flow") y_titles.append("Interflow") y_titles.append("Total") if(Measures is not None): y_titles.append("Measures") # Colors of the plot myColors = [] for i in range(nbElements): myColors.append('') if(Measures is not None): myColors.append('k') # Type of trait in the plot myTraits = [] for i in range(nbElements): myTraits.append('-') if(Measures is not None): myTraits.append('--') # The additional plots to add # Evapotranspiration z = [] y_labelAddPlot = [] haveUpperPlot = False nbAddPlot = 0 if(withEvap): z.append(self.myEvap) y_labelAddPlot.append('Evapotranpiration [mm/h]') haveUpperPlot = True nbAddPlot += 1 if(withCt): myCt = self.compute_runnoff_Ct_coeffs() z.append(myCt) y_labelAddPlot.append('$C_t$ [-]') haveUpperPlot = True nbAddPlot += 1 # Graph title: if(graph_title==''): if(self.name is not None): graph_title = "Hydrogramme de " + self.name # Range to consider if(rangeData==[]): rangeData = [self.dateBegin, self.dateEnd] if(factor!=1.5 and yrangeRain!=[]): print("WARNING: factor and range cannot be specified at the same time. Only factor will be taken into account.") yrangeRain=[] if(factor!=1.5 and yrangeData!=[]): print("WARNING: factor and range cannot be specified at the same time. Only factor will be taken into account.") yrangeData=[] if(writeFile!=""): writeFile = writeFile + "Subbasin_hydro" # Check if the intervals is too wild to impose the deltaMajorTicks timeInterval = self.dateEnd-self.dateBegin if timeInterval.days < 100: deltaMajorTicks = 86400 deltaMinorTicks = 3600 else: deltaMajorTicks = -1 deltaMinorTicks = -1 # Launch the procedure if(Measures is not None): ph.plot_hydro(nbElements,y,self.myRain,x_title=x_title,y_titles='', beginDate=self.dateBegin,endDate=self.dateEnd, dt=self.deltaT,graph_title=graph_title,y_labels=y_titles,rangeData=rangeData,y_rain_range=yrangeRain,y_data_range=yrangeData,factor_RH=factor,myColors=myColors,typeOfTraits=myTraits, measures=myMeasure,beginDateMeasure=Measures.dateBegin, endDateMeasure=Measures.dateEnd, dtMeasure=Measures.deltaT, upperPlot=haveUpperPlot,nbAddPlot=nbAddPlot,z=z,y_labelAddPlot=y_labelAddPlot,writeFile=writeFile,deltaMajorTicks=deltaMajorTicks,deltaMinorTicks=deltaMinorTicks, figure=figure) else: ph.plot_hydro(nbElements,y,self.myRain,x_title=x_title,y_titles='', beginDate=self.dateBegin,endDate=self.dateEnd, dt=self.deltaT,graph_title=graph_title,y_labels=y_titles,rangeData=rangeData,y_rain_range=yrangeRain,y_data_range=yrangeData,myColors=myColors,typeOfTraits=myTraits, upperPlot=haveUpperPlot,nbAddPlot=nbAddPlot,z=z,y_labelAddPlot=y_labelAddPlot,writeFile=writeFile,deltaMajorTicks=deltaMajorTicks,deltaMinorTicks=deltaMinorTicks, figure=figure)
# x = self.time/3600.0 # [h] # y1 = self.myHydro # y2 = self.rain # # Figure Rain on a first y axis # fig,ax1=plt.subplots() # ax1.set_xlabel('Temps [h]') # ax1.set_ylabel('Débits [m³/s]',color='k') #Color express in %RGB: (1,1,1) # ax1.set_ylim(0, self.myHydro.max()*2) # # ax1.hist(data1,color=(0,0,1),edgecolor='black',linewidth=1.2) # ax1.plot(x,y1, color='k') # ax1.tick_params(axis='y',labelcolor='k') # # Figure Hydro on a second y axis # ax2=ax1.twinx() # ax2.set_ylabel('Précipitations [mm/h]',color='b') # ax2.set_ylim(self.rain.max()*3, 0) # ax2.plot(x,y2,color='b') # ax2.fill_between(x, y2, 0, color='b') # ax2.tick_params(axis='y',labelcolor='b') # fig.tight_layout()
[docs] def plot_outlet(self, Measures=None, rangeData=[], yrangeRain=[], yrangeData=[], ylabel=[],addData=[], dt_addData=[], beginDates_addData=[], endDates_addData=[],\ label_addData=[], color_addData=[],factor=1.5, graph_title='', withEvap=False, writeFile='', withDelay=True, deltaMajorTicks=-1,deltaMinorTicks=-1, tzPlot=0, Measure_unit="m3/s", addTable=False, figure=None): "This procedure plots its own hydrographs and hyetographs" # Determine the number of elements according to the model chosen if(self.model==cst.tom_UH): nbElements = 1 ### # nbElements = 2 ### lastElement = 1 elif(self.model==cst.tom_VHM): # nbElements = 4 nbElements = 1 lastElement = 0 elif(self.model==cst.tom_GR4): nbElements = 1 lastElement = 0 elif(self.model==cst.tom_2layers_linIF or self.model==cst.tom_2layers_UH): # nbElements = 3 nbElements = 1 lastElement = 0 # Take into account any additionnal data given and add it to plot nbCol_addData = 0 if(addData!=[]): shape_addData = np.shape(addData) if(len(shape_addData)==1): if(dt_addData==[]): nbCol_addData = 1 nbElements += nbCol_addData elif(type(addData[0])==list or type(addData[0])==np.ndarray): nbCol_addData = len(addData) nbElements = nbElements + nbCol_addData else: nbCol_addData = 1 nbElements += nbCol_addData elif(len(shape_addData)==2): if(type(addData)==list): nbCol_addData = len(addData) nbElements = nbElements + nbCol_addData else: nbCol_addData = np.shape(addData)[1] nbElements = nbElements + nbCol_addData else: print("ERROR : the array additional data (addData) can only be a vector or a matrix!") print("Type of additional data = ", type(addData)) print("Shape = ", shape_addData) print(addData) sys.exit() # nbElements = nbElements + nbCol_addData if(dt_addData!=[]): dt = [] beginDate = [] endDate = [] dt.append(self.deltaT) beginDate.append(self.dateBegin+datetime.timedelta(hours=tzPlot)) endDate.append(self.dateEnd+datetime.timedelta(hours=tzPlot)) for i in range(nbCol_addData): dt.append(dt_addData[i]) beginDate.append(beginDates_addData[i]+datetime.timedelta(hours=tzPlot)) endDate.append(endDates_addData[i]+datetime.timedelta(hours=tzPlot)) else: dt = [self.deltaT] beginDate = [self.dateBegin+datetime.timedelta(hours=tzPlot)] endDate = [self.dateEnd+datetime.timedelta(hours=tzPlot)] else: dt = [self.deltaT] beginDate = [self.dateBegin+datetime.timedelta(hours=tzPlot)] endDate = [self.dateEnd+datetime.timedelta(hours=tzPlot)] # Conversion rain from [m³/s] to [mm/h] rain = self.rain/self.surfaceDrainedHydro*3.6 # Construction of the list of element to plot on the main hydrograph myOutFlow = self.outFlow_global tmpSum = np.zeros(len(myOutFlow)-lastElement) # y = np.zeros((len(self.outFlow)-lastElement,nbElements)) y = [] if(self.model==cst.tom_UH or self.model==cst.tom_2layers_linIF or self.model==cst.tom_2layers_UH or cst.tom_GR4): if(withDelay): # y[:,0] = self.outFlow[:-1] y.append(myOutFlow[:]) else: tmpSum = self.get_outFlow_noDelay() # y[:,0] = tmpSum[:-1] y.append(tmpSum[:]) # cumul_rain = datt.cumul_data(self.rain,self.deltaT, self.deltaT) ### # y[:,1] = cumul_rain[:-1]/cumul_rain[-1]*np.max(self.outFlow) ### if nbCol_addData==1: # y[:,2] = addData ### # y[:,1] = addData ### # y.append(addData) ### if(type(addData)==list): y.append(addData[0]) else: y.append(addData[:,0]) else: if(type(addData)==list): for col in range(nbCol_addData): # y[:,1+col] = addData[col] ### # y[:,2+col] = addData[:,col] ### y.append(addData[col]) ### elif(type(addData)==np.ndarray): for col in range(nbCol_addData): # y[:,1+col] = addData[:,col] ### # y[:,2+col] = addData[:,col] ### y.append(addData[:,col]) ### elif(self.model==cst.tom_VHM): print("ERROR : VHM not implemented yet! Please check the code") sys.exit() for i in range(nbElements-1+lastElement): y[:,i] = self.myHydro[:,i] tmpSum += self.myHydro[:,i] y[:,-1] = tmpSum elif(self.model==cst.tom_GR4): print("ERROR : GR4 not implemented yet! Please check the code") sys.exit() y[:,0] = self.outFlow[:,0] else: print("ERROR: this model was not implemented yet!") sys.exit() # Add the measures if available if(Measures!=None): if Measure_unit=="mm/h": myMeasure = Measures.myHydro*self.surfaceDrainedHydro/3.6 else: myMeasure = Measures.myHydro # label on x-axis x_title = 'Dates (GMT+'+str(tzPlot)+')' # label on y-axis y_titles = [] if(self.model==cst.tom_VHM): # y_titles.append("Overland flow") # y_titles.append("Interflow") # y_titles.append("Baseflow") # y_titles.append("Total") y_titles.append('Débits simulés') elif(self.model==cst.tom_UH or self.model==cst.tom_2layers_linIF or self.model==cst.tom_2layers_UH): if(ylabel==[]): y_titles.append('Débits simulés') # y_titles.append('Avec reconstruction Qout B. Vesdre') # y_titles.append('Avec Qout décrit par Le Soir au B. Vesdre') # y_titles.append('Débits nuls aux barrages') # avec Qout décrit par Le Soir B. Vesdre # y_titles.append('Débits décrits dans Le Soir') # y_titles.append('Cumulated rain') ### else: y_titles.append(ylabel) # if(label_addData !=[]): # for ii in label_addData : # y_titles.append(ii) # y_titles.append(label_addData) elif(self.model==cst.tom_GR4): if(ylabel==[]): y_titles.append("GR4 flow") else: y_titles.append(ylabel) if(label_addData !=[]): for ii in label_addData : y_titles.append(ii) if(Measures is not None): # y_titles.append("Measures") y_titles.append("Mesures") # y_titles.append("Débits entrant reconstruits") # Colors of the plot myColors = [] for i in range(nbElements): # myColors.append('') if(color_addData!=[]): if(i>=nbElements-nbCol_addData): myColors.append(color_addData[i+nbCol_addData-nbElements]) # myColors.append(color_addData) else: myColors.append('') else: myColors.append('') if(Measures is not None): myColors.append('k') # Type of trait in the plot myTraits = [] for i in range(nbElements): myTraits.append('-') if(Measures is not None): myTraits.append('--') # The additional plots to add # Evapotranspiration z = [] y_labelAddPlot = [] haveUpperPlot = False if(withEvap): z.append(self.myEvap) y_labelAddPlot.append('Evapotranpiration [mm/h]') haveUpperPlot = True # Graph title: if(graph_title==''): if(self.name is not None): graph_title = "Hydrogramme de " + self.name # Range to consider if(rangeData==[]): rangeData = [self.dateBegin, self.dateEnd] if(factor!=1.5 and yrangeRain!=[]): print("WARNING: factor and range cannot be specified at the same time. Only factor will be taken into account.") yrangeRain=[] if(factor!=1.5 and yrangeData!=[]): print("WARNING: factor and range cannot be specified at the same time. Only factor will be taken into account.") yrangeData=[] if addTable: allSurfaces = [self.surfaceDrainedHydro] if Measures != None: surfaceMeasure = Measures.surfaceDrained if surfaceMeasure <=0: surfaceMeasure = self.surfaceDrainedHydro addMeasfInTab=True else: allSurfaces = [] surfaceMeasure=-1.0 addMeasfInTab = False # Launch the procedure if(Measures is not None): ph.plot_hydro(nbElements,y,rain,x_title=x_title,y_titles='',beginDate=beginDate,endDate=endDate, dt=dt,graph_title=graph_title,y_labels=y_titles,rangeData=rangeData,y_rain_range=yrangeRain,y_data_range=yrangeData,factor_RH=factor,myColors=myColors,typeOfTraits=myTraits, measures=myMeasure,beginDateMeasure=Measures.dateBegin+datetime.timedelta(hours=tzPlot), endDateMeasure=Measures.dateEnd+datetime.timedelta(hours=tzPlot), dtMeasure=Measures.deltaT, upperPlot=haveUpperPlot,nbAddPlot=1,z=z,y_labelAddPlot=y_labelAddPlot,writeFile=writeFile,deltaMajorTicks=deltaMajorTicks,deltaMinorTicks=deltaMinorTicks, addTable=addTable,allSurfaces=allSurfaces,surfaceMeasure=surfaceMeasure,addMeasfInTab=addMeasfInTab,figure=figure) else: ph.plot_hydro(nbElements,y,rain,x_title=x_title,beginDate=beginDate,endDate=endDate, dt=dt,graph_title=graph_title,y_labels=y_titles,rangeData=rangeData,y_rain_range=yrangeRain,y_data_range=yrangeData,myColors=myColors,typeOfTraits=myTraits, upperPlot=haveUpperPlot,nbAddPlot=1,z=z,y_labelAddPlot=y_labelAddPlot,writeFile=writeFile,deltaMajorTicks=deltaMajorTicks,deltaMinorTicks=deltaMinorTicks, addTable=addTable,allSurfaces=allSurfaces, figure=figure)
[docs] def create_histo(self, time, hyeto): "Transform the hyeto data and its assiciated time in a histogram" size = len(hyeto) hyeto2 = np.zeros(size*2) time2 = np.zeros(size*2) for i in range(size): time2[i*2+1] = time[i] hyeto2[i*2+1] = hyeto[i] time2[0] = 0 hyeto2[0] = hyeto2[1] for i in range(size-1): time2[i*2+2] = time2[i*2+1] hyeto2[i*2+2] = hyeto2[i*2+3] plt.figure() plt.grid() plt.xlabel('temps [h]') plt.ylabel('intensité $[mm^3/s]$') plt.legend(loc="best") plt.title("Hyétogrammes") plt.plot(time2, hyeto2) plt.plot(time2, hyeto2) plt.plot(time2, hyeto2) plt.plot(time2, hyeto2) plt.xlim(0,time2[len(time2)-1])
[docs] def read_myMainCharacteristics(self, workingDir:str, fileNameList:list=[]): """ This procedure read the main characteristics of the subbasin and hydro subbasin TO COMPLETE ... """ if fileNameList==[]: fileName = "Subbasin_" + str(self.iDSorted) + "/" + "simul_subbasin.avrg_caractSubBasin" fileNameHydro = "Subbasin_" + str(self.iDSorted) + "/" + "simul_subbasin.avrg_caractWholeHydroSubBasin" else: assert len(fileNameList) == 2, ("If the fileNameList provided, it must be dimension 2") fileName = fileNameList[0] fileNameHydro = fileNameList[1] with open(os.path.join(workingDir,fileName), newline = '') as fileID2: data_reader = csv.reader(fileID2, delimiter='\t') list_data = [] i=0 for raw in data_reader: if i>0: list_data.append(raw) i += 1 tmp = '' for i in range(len(list_data)): if(list_data[i][0][:4]=="Area"): tmp = list_data[i][0].split() self.surfaceDrained = float(tmp[1].split("[")[0]) self.mainCharactDict["Area"] = {} self.mainCharactDict["Area"]["value"] = self.surfaceDrained self.mainCharactDict["Area"]["unit"] = "["+tmp[1].split("[")[1] elif(list_data[i][0][:9]=="Perimeter"): tmp = list_data[i][0].split() self.mainCharactDict["Perimeter"] = {} self.mainCharactDict["Perimeter"]["value"] = float(tmp[1].split("[")[0]) self.mainCharactDict["Perimeter"]["unit"] = "["+tmp[1].split("[")[1] elif(list_data[i][0][:13]=="Average slope"): tmp = list_data[i][0].split() self.mainCharactDict["Average slope"] = {} self.mainCharactDict["Average slope"]["value"] = float(tmp[2].split("[")[0]) self.mainCharactDict["Average slope"]["unit"] = "["+tmp[2].split("[")[1] elif(list_data[i][0][:35]=="Compactness coefficient (Gravelius)"): tmp = list_data[i][0].split() self.mainCharactDict["Compactness coefficient (Gravelius)"] = {} self.mainCharactDict["Compactness coefficient (Gravelius)"]["value"] = float(tmp[3].split("[")[0]) self.mainCharactDict["Compactness coefficient (Gravelius)"]["unit"] = "[-]" elif(list_data[i][0][:12]=="Max lag time"): tmp = list_data[i][0].split() self.mainCharactDict["Max lag time"] = {} self.mainCharactDict["Max lag time"]["value"] = float(tmp[3].split("[")[0]) self.mainCharactDict["Max lag time"]["unit"] = "["+tmp[3].split("[")[1] elif(list_data[i][0][:12] == "Min lag time"): tmp = list_data[i][0].split() self.timeDelay = float(tmp[3].split("[")[0]) self.mainCharactDict["Min lag time"] = {} self.mainCharactDict["Min lag time"]["value"] = float(tmp[3].split("[")[0]) self.mainCharactDict["Min lag time"]["unit"] = "["+tmp[3].split("[")[1] elif(list_data[i][0][:12]=="Max altitude"): tmp = list_data[i][0].split() self.mainCharactDict["Max altitude"] = {} self.mainCharactDict["Max altitude"]["value"] = float(tmp[2].split("[")[0]) self.mainCharactDict["Max altitude"]["unit"] = "["+tmp[2].split("[")[1] elif(list_data[i][0][:12]=="Min altitude"): tmp = list_data[i][0].split() self.mainCharactDict["Min altitude"] = {} self.mainCharactDict["Min altitude"]["value"] = float(tmp[2].split("[")[0]) self.mainCharactDict["Min altitude"]["unit"] = "["+tmp[2].split("[")[1] elif(list_data[i][0][:21]=="Fraction of landuse n"): tmp = list_data[i][0].split() self.mainCharactDict["Fraction of landuse n "+tmp[4]] = {} self.mainCharactDict["Fraction of landuse n "+tmp[4]]["value"] = float(tmp[5].split("[")[0]) self.mainCharactDict["Fraction of landuse n "+tmp[4]]["unit"] = "["+tmp[5].split("[")[1] data_reader = None list_data = [] # fileName = "/Subbasin_" + str(self.iDSorted) + "/" + "simul_subbasin.avrg_caractWholeHydroSubBasin" with open(os.path.join(workingDir,fileNameHydro), newline = '') as fileID2: data_reader = csv.reader(fileID2, delimiter='\t') list_data = [] i=0 for raw in data_reader: if i>0: list_data.append(raw) i += 1 tmp = '' for i in range(len(list_data)): if(list_data[i][0][:4]=="Area"): tmp = list_data[i][0].split() self.surfaceDrainedHydro = float(tmp[1].split("[")[0]) self.mainCharactDictWholeHydro["Area"] = {} self.mainCharactDictWholeHydro["Area"]["value"] = self.surfaceDrainedHydro self.mainCharactDictWholeHydro["Area"]["unit"] = "["+tmp[1].split("[")[1] elif(list_data[i][0][:9]=="Perimeter"): tmp = list_data[i][0].split() self.mainCharactDictWholeHydro["Perimeter"] = {} self.mainCharactDictWholeHydro["Perimeter"]["value"] = float(tmp[1].split("[")[0]) self.mainCharactDictWholeHydro["Perimeter"]["unit"] = "["+tmp[1].split("[")[1] elif(list_data[i][0][:13]=="Average slope"): tmp = list_data[i][0].split() self.mainCharactDictWholeHydro["Average slope"] = {} self.mainCharactDictWholeHydro["Average slope"]["value"] = float(tmp[2].split("[")[0]) self.mainCharactDictWholeHydro["Average slope"]["unit"] = "["+tmp[2].split("[")[1] elif(list_data[i][0][:35]=="Compactness coefficient (Gravelius)"): tmp = list_data[i][0].split() self.mainCharactDictWholeHydro["Compactness coefficient (Gravelius)"] = {} self.mainCharactDictWholeHydro["Compactness coefficient (Gravelius)"]["value"] = float(tmp[3].split("[")[0]) self.mainCharactDictWholeHydro["Compactness coefficient (Gravelius)"]["unit"] = "[-]" elif(list_data[i][0][:12]=="Max lag time"): tmp = list_data[i][0].split() self.mainCharactDictWholeHydro["Max lag time"] = {} self.mainCharactDictWholeHydro["Max lag time"]["value"] = float(tmp[3].split("[")[0]) self.mainCharactDictWholeHydro["Max lag time"]["unit"] = "["+tmp[3].split("[")[1] elif(list_data[i][0][:12] == "Min lag time"): tmp = list_data[i][0].split() self.timeDelay = float(tmp[3].split("[")[0]) self.mainCharactDictWholeHydro["Min lag time"] = {} self.mainCharactDictWholeHydro["Min lag time"]["value"] = float(tmp[3].split("[")[0]) self.mainCharactDictWholeHydro["Min lag time"]["unit"] = "["+tmp[3].split("[")[1] elif(list_data[i][0][:12]=="Max altitude"): tmp = list_data[i][0].split() self.mainCharactDictWholeHydro["Max altitude"] = {} self.mainCharactDictWholeHydro["Max altitude"]["value"] = float(tmp[2].split("[")[0]) self.mainCharactDictWholeHydro["Max altitude"]["unit"] = "["+tmp[2].split("[")[1] elif(list_data[i][0][:12]=="Min altitude"): tmp = list_data[i][0].split() self.mainCharactDictWholeHydro["Min altitude"] = {} self.mainCharactDictWholeHydro["Min altitude"]["value"] = float(tmp[2].split("[")[0]) self.mainCharactDictWholeHydro["Min altitude"]["unit"] = "["+tmp[2].split("[")[1] elif(list_data[i][0][:21]=="Fraction of landuse n"): tmp = list_data[i][0].split() self.mainCharactDictWholeHydro["Fraction of landuse n "+tmp[4]] = {} self.mainCharactDictWholeHydro["Fraction of landuse n "+tmp[4]]["value"] = float(tmp[5].split("[")[0]) self.mainCharactDictWholeHydro["Fraction of landuse n "+tmp[4]]["unit"] = "["+tmp[5].split("[")[1]
[docs] def get_flood(self, path='', check_coherence=False): if(path==''): path = self.fileNameRead paramFile = Wolf_Param(to_read=False,toShow=False) paramFile.ReadFile(path +'simul_flood.out.param') nbFloods = int(paramFile.myparams['Floods characteristics']['nb'][key_Param.VALUE]) dt = float(paramFile.myparams['Floods characteristics']['dt'][key_Param.VALUE]) filePre = "simul_flood" floodData = [] dateMask = [] myOutFlow = self.outFlow_global mask = np.full((len(myOutFlow)), False, dtype=bool) j_saved = 0 for i in range(nbFloods): floodData.append([]) fileName = filePre + str(i+1) + ".dat" floodData[i] = rd.read_bin(path,fileName) dateMask.append([]) # dateMask[i].append(datetime.datetime(year=int(floodData[i][0][2]), month=int(floodData[i][0][1]), day=int(floodData[i][0][0]), hour=int(floodData[i][0][3]), minute=int(floodData[i][0][4]), second=int(floodData[i][0][5]), microsecond=0, tzinfo=datetime.timezone.utc)) # dateMask[i].append(datetime.datetime(year=int(floodData[i][-1][2]), month=int(floodData[i][-1][1]), day=int(floodData[i][-1][0]), hour=int(floodData[i][-1][3]), minute=int(floodData[i][-1][4]), second=int(floodData[i][-1][5]), microsecond=0, tzinfo=datetime.timezone.utc)) tStart = datetime.datetime.timestamp(datetime.datetime(year=int(floodData[i][0][2]), month=int(floodData[i][0][1]), day=int(floodData[i][0][0]), hour=int(floodData[i][0][3]), minute=int(floodData[i][0][4]), second=int(floodData[i][0][5]), microsecond=0, tzinfo=datetime.timezone.utc)) nbElements = len(floodData[i]) for j in range(j_saved,len(myOutFlow)): if(self.time[j]>=tStart): mask[j:j+nbElements] = np.full(nbElements, True, dtype=bool) j_saved = j break effFlood = np.ma.array(myOutFlow,mask=mask) return effFlood
[docs] def get_Nash_Flood(self, measures, tMeasures, dateBegin=None, dateEnd=None, path=''): if(dateBegin is None): dateBegin = self.dateBegin if(dateEnd is None): dateEnd = self.dateEnd if(path==''): path = self.fileNameRead effFlood = self.get_flood(path=path) Nash = datt.evaluate_Nash(effFlood.data, self.time, measures, tMeasures, dateBegin, dateEnd, mask=effFlood.mask) return Nash
[docs] def get_outFlow(self, typeOutFlow:str="Net", unit:str='m3/s', whichOutFlow="", lag:float=0.0): if lag < 0.0: logging.error("TimeDelay difference cannot be negative for a SubBasin!!") logging.warning("Therefore, lag will be imposed to zero! This might create some mistakes!") lag = 0.0 myOutFlow = np.zeros(len(self.myHydro),dtype=ct.c_double, order='F') index = math.floor(lag/(self.time[1]-self.time[0])) nameOut = list(self._outFlow.items())[0][0] if whichOutFlow!="" and whichOutFlow!=nameOut: logging.error("ERROR : the key argument of the ouFlow is not the same of the one in this SubBasin!") logging.error("Its original time will be applied ") curOutFlow = self._outFlow[nameOut][typeOutFlow] if(index==0): myOutFlow = curOutFlow.copy() elif(index<len(myOutFlow)): myOutFlow[index:] = curOutFlow[:-index].copy() else: logging.error("Warning: the simulation time is not long enough for this subbasin to be taken into account") logging.error("Error informations : ") logging.error("Function name : get_outFlow_noDelay()") logging.error("index = " + str(index)) logging.error("len(myOutFlow) = " + str(len(myOutFlow))) logging.error("Lag = " + str(lag)) return myOutFlow if unit=='mm/h': myOutFlow *= 3.6/self.surfaceDrainedHydro return myOutFlow
[docs] def get_inlets(self, unit:str='m3/s', lag:float=0.0): if lag < 0.0: logging.error("TimeDelay difference cannot be negative for a SubBasin!!") logging.warning("Therefore, lag will be imposed to zero! This might create some mistakes!") lag = 0.0 myInlet = np.zeros(len(self.myHydro),dtype=ct.c_double, order='F') index = math.floor(lag/(self.time[1]-self.time[0])) if(index==0): myInlet = self.inlets.copy() elif(index<len(myInlet)): myInlet[index:] = self.inlets[:-index].copy() else: logging.error("Warning: the simulation time is not long enough for this subbasin to be taken into account") logging.error("Error informations : ") logging.error("Function name : get_outFlow_noDelay()") logging.error("index = " + str(index)) logging.error("len(myOutFlow) = " + str(len(myInlet))) logging.error("Lag = " + str(lag)) return myInlet if unit=='mm/h': myInlet *= 3.6/self.surfaceDrainedHydro return myInlet
[docs] def convert_data_global_to_local(self, dataGlob): dataLoc = np.zeros(len(dataGlob)) myIndex = math.floor(self.timeDelay/(self.time[1]-self.time[0])) if(myIndex==0): dataLoc = dataGlob[:] elif(myIndex<len(dataLoc)): dataLoc[:-myIndex] = dataGlob[myIndex:] else: print("ERROR: the simulation time is not long enough for this subbasin to be taken into account") print("Error informations : ") print("Function name : convert_data_global_to_local()") print("index = ", myIndex) print("len(dataLoc) = ", len(dataLoc)) print("self.timeDelay = ", self.timeDelay) sys.exit() return dataLoc
[docs] def compute_runnoff_Ct_coeffs(self, method=-1): subBasinName = 'Subbasin_' + str(self.iDSorted) + '/' typeOfFileName = 'simul_soil.param' fileName = os.path.join(self.fileNameRead, subBasinName, typeOfFileName) wolf_If = Wolf_Param(to_read=False,toShow=False) wolf_If.ReadFile(fileName) runnofCt = np.zeros(len(self.myRain[:])) if method == -1 : # detect the method on the params files -> TO DO myMethod = int(wolf_If.get_param("Distributed production model parameters","Type of infiltration")) if(myMethod==cst.tom_infil_Horton): myParams = [] myParams.append(float(wolf_If.get_param("Horton parameters","F0"))) myParams.append(float(wolf_If.get_param("Horton parameters","Fc"))) myParams.append(float(wolf_If.get_param("Horton parameters","k"))) uMax = float(wolf_If.get_param("Distributed production model parameters","Umax")) p = self.myRain[:] nb_timesteps = len(p) timeLag = float(wolf_If.get_param("Distributed production model parameters","Time lag")) try: timeSpan = float(wolf_If.get_param("Distributed production model parameters","Time span soil")) nbIntervals = math.ceil(timeSpan/self.deltaT) except: nbIntervals = len(self.myRain) nbLagSteps = math.ceil(timeLag/self.deltaT*3600.0) nbLagSteps = min(max(0,nbLagSteps),nb_timesteps) u = np.zeros(len(self.myRain)) if(nbIntervals<len(self.myRain)): for iel in range(len(self.myRain)): u[iel] = np.sum(self.myRain[max((iel-nbIntervals)+1,0):iel+1])*self.deltaT/3600.0 else: u = np.cumsum(self.myRain)*self.deltaT/3600.0 infil = datt.Horton_function(u, uMax, myParams) if nbLagSteps != 0: runnofCt[:nbLagSteps] = 1.0-myParams[0] runnofCt[nbLagSteps:] = (1.-infil[:-nbLagSteps]) else: runnofCt[:] = 1.-infil[:] else: print("ERROR: this infil. model is not recognised or not implemented yet! ") sys.exit() return runnofCt
[docs] def plot_diff_cumulRain_with_lagtime(self, interval, lagTime, graph_title="", factor=1.5, writeDir="", lawNetRain=cst.tom_netRain_no, netRainParams={}): """ @var interval interval to consider in the gliding sum [sec] @var lagTime time to skip before applyihng the current rain [sec] """ nbIntervals = math.ceil(interval/self.deltaT) nbLagSteps = math.ceil(lagTime/self.deltaT) if(lawNetRain==cst.tom_netRain_no): rain = self.myRain elif(lawNetRain==cst.tom_netRain_storage): Hs = 0.0 Ts = 0.0 S0 = 0.0 if("Hs" in netRainParams): Hs = netRainParams["Hs"] if("Ts" in netRainParams): Ts = netRainParams["Ts"] if("S0" in netRainParams): S0 = netRainParams["S0"] rain = self.apply_stock_reservoir(Hs=Hs, Ts=Ts, curS0=S0) else: print("ERROR: Not the correct type of rain preprocess model. Please check argument 'lawNetRain' ") tmpCumul = np.zeros(len(rain)) tmpCumul2 = np.zeros(len(rain)) # if(nbIntervals<len(self.myRain)): # for iel in range(len(self.myRain)): # tmpCumul[iel] = np.sum(self.myRain[max((iel-nbIntervals)+1,0):iel+1])*self.deltaT/3600.0 # else: # tmpCumul = np.cumsum(self.myRain)*self.deltaT/3600.0 # Cumulated sum over the given interval -> fast running procedure kernel = np.ones(nbIntervals) tmpCumul = np.convolve(rain,kernel)[:-nbIntervals+1]*self.deltaT/3600.0 if(nbLagSteps==0): tmpCumul2[:] = tmpCumul[:].copy() tmpCumul = np.convolve(self.myRain,kernel)[:-nbIntervals+1]*self.deltaT/3600.0 else: tmpCumul2[nbLagSteps:] = tmpCumul[:-nbLagSteps] # Determine the number of elements to plot nbElements = 3 lastElement = 0 # Construction of the list of element to plot on the main hydrograph myOutFlow = self.outFlow_global tmpSum = np.zeros(len(myOutFlow)-lastElement) y = np.zeros((len(myOutFlow)-lastElement,nbElements)) y[:,0] = tmpCumul[:] y[:,1] = tmpCumul2[:] y[:,2] = tmpCumul[:] - tmpCumul2[:] # label on x-axis x_title = "dates : GMT+" + str(self.tz) # label on y-axis y_title = "Volume [mm]" # label on y-axis y_titles = [] if(lawNetRain==cst.tom_netRain_no): y_titles.append("Cumulated Rain") y_titles.append("Cumulated Rain with lag time") elif(lawNetRain==cst.tom_netRain_storage): y_titles.append("Cumulated Raw Rain") y_titles.append("Cumulated Net Rain") y_titles.append("Delta Cumulated Rain") # Colors of the plot myColors = [] for i in range(nbElements): myColors.append('') # Type of trait in the plot myTraits = [] myTraits.append('--') myTraits.append('--') myTraits.append('-') # The additional plots to add # Evapotranspiration z = [] y_labelAddPlot = [] haveUpperPlot = False nbAddPlot = 0 # Graph title: if(graph_title==''): if(self.name is not None): graph_title = "Cumulated volume in " + self.name + " : step = " + str(interval/3600) + " [h]" # Range to consider rangeData = [self.dateBegin, self.dateEnd] yrangeRain = [] yrangeData = [min(y[:,2]),max(max(y[:,0]),max(y[:,2]))*1.5] if writeDir == "": writeFile = "" else: writeFile = writeDir + "CumulRain_Delta_" + self.name + "_" + str(int(interval/3600)) + "_" + str(int(lagTime/3600))+".png" # if(factor!=1.5 and yrangeRain!=[]): # print("WARNING: factor and range cannot be specified at the same time. Only factor will be taken into account.") # yrangeRain=[] # if(factor!=1.5 and yrangeData!=[]): # print("WARNING: factor and range cannot be specified at the same time. Only factor will be taken into account.") # yrangeData=[] # Launch the procedure ph.plot_hydro(nbElements,y,rain,x_title=x_title,y_titles=y_title, beginDate=self.dateBegin,endDate=self.dateEnd, dt=self.deltaT,graph_title=graph_title,y_labels=y_titles,rangeData=rangeData,y_rain_range=yrangeRain,y_data_range=yrangeData,myColors=myColors,typeOfTraits=myTraits, upperPlot=haveUpperPlot,nbAddPlot=nbAddPlot,z=z,y_labelAddPlot=y_labelAddPlot,deltaMajorTicks=86400,deltaMinorTicks=3600, writeFile=writeFile)
[docs] def find_outFlow_peak(self): myOutFlow = self.outFlow_global maxIndex = np.argmax(myOutFlow) maxValue = myOutFlow[maxIndex] maxTime = self.time[maxIndex] maxDatetime = datetime.datetime.fromtimestamp(maxTime, tz=datetime.timezone.utc) self.peakVal = maxValue self.peakTime = maxDatetime
[docs] def get_outFlow_peak(self, noDelay=False): if(self.peakVal==0.0 or self.peakTime is None): self.find_outFlow_peak() maxValue = self.peakVal maxDatetime = self.peakTime if(noDelay): detlaTimeDelay = datetime.timedelta(seconds=self.timeDelay) maxDatetime -= self.timeDelay return maxValue, maxDatetime
## This function returns the net rain vector after applying a storage reservoir. # @var H_s max height of the reservoir [mm] # @var T_s time to empty completely the reservoir [h]
[docs] def apply_stock_reservoir(self, Hs, Ts, curS0=0.0): """ This function returns the net rain vector after applying a storage reservoir. @var H_s max height of the reservoir [mm] @var T_s time to empty completely the reservoir [h] """ Pnet = np.zeros_like(self.myRain) curS = curS0 Qs = Hs/Ts dt = self.deltaT/3600.0 for curT in range(len(self.myRain)): curS += (self.myRain[curT]-Qs)*dt Pnet[curT] = max(0.0, curS-Hs)/dt curS = max(0.0,min(curS,Hs)) return Pnet
[docs] def unuse(self, mask=[]): self.alreadyUsed = False for element in self.intletsObj: self.intletsObj[element].unuse(mask=mask)
[docs] def activate(self, effSubs:list=[], effSubsSort:list=[], mask:list=[], onlyItself:bool=False): self.isActivated = True if self.alreadyUsed == False: self.alreadyUsed = True effSubs.append(int(self.iD.replace("ss",""))) effSubsSort.append(self.iDSorted) if onlyItself == False: for element in self.intletsObj: effSubs, effSubsSort = self.intletsObj[element].activate(mask=mask, effSubs=effSubs, effSubsSort=effSubsSort, onlyItself=onlyItself) # effSubs.extend(tmpSub) # effSubsSort.extend(tmpSubSort) return effSubs, effSubsSort
[docs] def reset_timeDelay(self, keepDelta=False, keepDeltaAll=False, upStreamTime=-1.0): # This step reset the timeDelay but still keep the time delta between modules if keepDelta: curTime = self.timeDelay self.timeDelay = self.timeDelay-upStreamTime else: curTime = self.timeDelay if(upStreamTime>=0): self.timeDelay = 0.0 for element in self.intletsObj: self.intletsObj[element].reset_timeDelay(keepDelta=keepDeltaAll, keepDeltaAll=keepDeltaAll, upStreamTime=curTime)
[docs] def add_timeDelay(self, deltaT=0.0, reset=False, resetAll=False): if reset: self.timeDelay = deltaT else: self.timeDelay += deltaT for element in self.intletsObj: self.intletsObj[element].add_timeDelay(self.timeDelay, reset=resetAll, resetAll=resetAll)
[docs] def get_inletsName(self): allInlets = [] for element in self.intletsObj: allInlets.append(str(self.intletsObj[element].name)) return allInlets
[docs] def get_timeDelays(self, timeDelays={}): timeDelays[self.name] = self.timeDelay for element in self.intletsObj: timeDelays = self.intletsObj[element].get_timeDelays(timeDelays) return timeDelays
[docs] def get_timeDelays_inlets(self): return {el.name: el.timeDelay-self.timeDelay for el in self.intletsObj.values()}
[docs] def get_surface_proportions(self, show=True): print("To DO!!!")
## This procedure initialises the time delays (time to transfer hydrographs) with respect to the outlet
[docs] def init_timeDelay(self): workinDir = os.path.join(self.fileNameRead, "Subbasin_"+str(self.iDSorted)) fileName = "transfer.param" fileTitle = os.path.join(workinDir,fileName) if os.path.exists(fileTitle): self.transferParam = Wolf_Param(filename=fileTitle,toShow=False) transferModel = int(self.transferParam.get_param("General properties","Type of model")) if transferModel == cst.tom_transf_no: print("The time estimation will be taken into account for timeDelay!") elif transferModel == cst.tom_transf_cst: # Update timeDelay from transfer file self.timeDelay = self.transferParam.get_param("Constant", "Time Delay") else: print("ERROR in 'init_timeDelay' -> SubBasin : this model is not implemented yet!") print("Model required = ", transferModel) else: print("WARNING : transfer file not present! The time estimation will be taken into account for timeDelay!")
## This function returns the value read at the outlet on a Wolf map
[docs] def get_value_outlet(self, wolfarray:WolfArray): # If no map, not possible to go further with this method if type(wolfarray) != WolfArray: print("ERROR : wolfarray not a WolfArray type!") print("Type given : ", type(wolfarray)) return None value = wolfarray.get_value(self.x, self.y) if value == -99999: print("ERROR : timeDelay not found for this element : ") print("Name = ", self.name) print("File = ", wolfarray.filename) print("===================") return None return value
[docs] def get_iDSorted(self): return self.iDSorted
# ## Set all desired timeDelays in the network # def set_timeDelay(self, method:str="wolf_array", wolfarray:WolfArray=None, tRef_old:float=0.0, tRef_new:float=0.0, updateDownstream:bool=True): # if method.lower() == "wolf_array": # timeDelay = self.get_value_outlet(wolfarray) # if timeDelay is None: # return None # self.timeDelay = timeDelay # else: # print("ERROR: This method to set timeDelay is not recognised!") # return # if updateDownstream: # for element in self.intletsObj: # curObj = self.intletsObj[element] # curObj. # # curObj = self.add_timeDelay ## This procedure save in a "transfer.param" file the timeDelay of the SubBasin
[docs] def save_timeDelay(self, changeTofMod=True): print("Saving timeDelay from SubBasin", self.name) workinDir = os.path.join(self.fileNameRead, "Subbasin_"+str(self.iDSorted)) fileName = "transfer.param" fileTitle = os.path.join(workinDir,fileName) if self.transferParam is None: self.transferParam = Wolf_Param(filename=fileTitle,toShow=False) self.transferParam.change_param("Constant","Time Delay", self.timeDelay) if changeTofMod: self.transferParam.change_param("General properties","Type of model", cst.tom_transf_cst) self.transferParam.SavetoFile(None) self.transferParam.Reload(None)
## This procedure save in a "transfer.param" file the timeDelay of the SubBasin and all its upstream elements
[docs] def save_timeDelays(self): self.save_timeDelay() for element in self.intletsObj: self.intletsObj[element].save_timeDelays()
[docs] def get_myHydro(self, unit:str="mm/h") -> np.ndarray: if unit=="m3/s" or unit=="m^3/s": if self.model == cst.measures: # FIXME we consider so far that myHydro of a measures are in m^3/h myHydro = self.myHydro elif self.surfaceDrained<=0.0: logging.error("The surface drained is negative or equal to zero! myHydro will be given in mm/h!") if len(np.shape(self.myHydro)) == 1: myHydro = self.myHydro.copy() else: myHydro = np.sum(self.myHydro,1) else: if len(np.shape(self.myHydro)) == 1: myHydro = self.myHydro*self.surfaceDrained/3.6 else: myHydro = np.sum(self.myHydro,1)*self.surfaceDrained/3.6 else: if len(np.shape(self.myHydro)) == 1: myHydro = self.myHydro.copy() else: myHydro = np.sum(self.myHydro,1) return myHydro
## This function is getting the name/meaning behind landuse indices # It will then fill the given or not dict of landuse with a name property
[docs] def get_landuse_index_transform_default(self, landuseDict:dict={}) -> dict: if landuseDict==None: landuseDict = {} for key in cst.DEFAULT_LANDUSE: if not key in landuseDict: landuseDict[key] = {} landuseDict[key]["name"] = cst.DEFAULT_LANDUSE[key] return landuseDict
## This function is getting the name/meaning behind landuse indices # It will then fill the given or not dict of landuse with a name property
[docs] def get_landuse_index_transform(self, directory:str, landuseDict:dict={}) -> dict: if directory=="": landuseDict = self.get_landuse_index_transform_default(landuseDict=landuseDict) return else: fileName = "Landuse_index_transform.txt" fileName = os.path.join(directory,fileName) if not(os.path.exists(fileName)): logging.error("This path does not exist : ") logging.error(fileName) logging.error("get_landuse_index_transform() aborded !") return None toStart = False with open(fileName, newline = '') as fileID: data_reader = csv.reader(fileID, delimiter='=') list_data = [] for curData in data_reader: if curData == []: pass elif toStart and len(curData)==2: list_data.append(curData) elif len(curData[0])>13: if curData[0][:13] == "Signification": toStart = True for line in list_data: if len(line)>1: key = int(line[0]) if not key in landuseDict: landuseDict[key] = {} landuseDict[key]["name"] = line[1].strip() return landuseDict
## This function is reading the string of the landuse in the avrg_charact file
[docs] def read_landuses(self, fileName:str="", onlySub:bool=True, landuseName:str="", landuse_index_transform:str="", toSave:bool=True) -> dict: # Choose if it's only the SubBasin of the hydro SubBasin if onlySub: avrgDict = self.mainCharactDict if avrgDict == {}: self.read_myMainCharacteristics(self.fileNameRead) avrgDict else: avrgDict = self.mainCharactDictWholeHydro if avrgDict == {}: self.read_myMainCharacteristics(self.fileNameRead) # Get the names of all landuses landuseDict = {} landuseDict = self.get_landuse_index_transform(landuse_index_transform, landuseDict=landuseDict) if landuseDict == None: landuseDict = self.get_landuse_index_transform_default(landuseDict=landuseDict) # Get the surfaces and units of all landuses for key in landuseDict: landuseDict[key]["surface"] = avrgDict["Fraction of landuse n "+str(key)]["value"] landuseDict[key]["unit"] = avrgDict["Fraction of landuse n "+str(key)]["unit"] if toSave: if onlySub: self.landuseDict = landuseDict else: self.landuseHydroDict = landuseDict return landuseDict
[docs] def get_landuses(self, onlySub:bool=True) -> dict: if onlySub: return self.landuseDict else: return self.landuseHydroDict
[docs] def plot_landuses(self, onlySub:bool=True, figure=None, toShow=False, writeFile=""): landuseDict = self.get_landuses(onlySub=onlySub) if landuseDict == {}: logging.error("No landuse dict found !") logging.error("Please use first the function read_landuse()! ") logging.error("Plot aborted !") return # Creation of data and legend data = [] names = [] for key in landuseDict: data.append(landuseDict[key]["surface"]) names.append(landuseDict[key]["name"]) # Creation of colors colorDict = {} colorDict["forêt"] = 'tab:green' colorDict["prairie"] = 'tab:olive' colorDict["culture"] = 'tab:brown' colorDict["pavés/urbain"] = 'tab:gray' colorDict["rivière"] = 'tab:cyan' colorDict["plan d'eau"] = 'tab:blue' colors = [] for i in range(len(names)): key = names[i] if key in colorDict: colors.append(colorDict[key]) # Creation of title title = "Landuses in " if onlySub: typeOfSub = "subbasin " else: typeOfSub = "hydrological subbasin " title += typeOfSub + "in " + self.name if writeFile == "": writeFile = os.path.join(self.fileNameWrite, "PostProcess", title.replace(" ", "_")) else: writeFile = writeFile ph.plot_piechart(data,legend=names, title=title, colors=colors, autopct="", figure=figure, writeFile=writeFile, toShow=toShow) if toShow: plt.show()
[docs] def get_outFlow_names(self)->list: return list(self._outFlow.keys())
[docs] def change_version(self, newVersion=None): if newVersion == None: self._version = float(cst.VERSION_WOLFHYDRO) elif type(newVersion) == str: self._version = float(newVersion) else: self._version = newVersion return
[docs] def get_version(self): return self._version
## This procedure is updating all the hydrographs of all upstream elements imposing limits # @var level_min integer that specify the potential level at which the update should be stopped.
[docs] def update_upstream_hydro(self, level_min:int=1, update_upstream:bool=True): for key in self.intletsObj: curObj = self.intletsObj[key] if curObj.myLevel>=level_min: curObj.update_hydro(update_upstream=True, level_min=level_min)
## This procedure is updating all the hydrographs and possibly all upstream elements imposing limits # @var update_upstream boolean that specify whether the upstream elements should also be updated # @var level_min integer that specify the potential level at which the update should be stopped.
[docs] def update_hydro(self, update_upstream:bool=True, level_min:int=1): if update_upstream: self.update_upstream_hydro(level_min=level_min) self.compute_hydro()
[docs] def get_outFlow_global(self, whichOutFlow="", typeOutFlow="Net"): if typeOutFlow == "Net": return self.outFlow_global elif typeOutFlow =="Raw": return self.outFlowRaw_global else: logging.error("Not a recognised typeOutFlow ! ") return None
@property
[docs] def outFlow(self)->np.ndarray: """The outFlow property.""" return self.get_outFlow(typeOutFlow="Net", unit="m³/s")
@property
[docs] def outFlowRaw(self): """The outFlow property.""" return self.get_outFlow(typeOutFlow="Raw", unit="m³/s")
@property
[docs] def outFlow_global(self): """The outFlow global property. Returns the outFlow in the global time, i.e. the hydrograph to which the timeDelay is applied. """ gOutFlow = np.zeros(len(self.myHydro),dtype=ct.c_double, order='F') index = math.floor(self.timeDelay/self.deltaT) if(index==0): gOutFlow = self.outFlow elif(index<len(self.myHydro)): gOutFlow[index:] = self.outFlow[:-index] else: logging.error("ERROR: the simulation time is not long enough for this subbasin to be taken into account") logging.error("Error informations : ") logging.error("Function name : outFlow_global()") logging.error("Name = "+ self.name) logging.error("ID = "+ str(self.iD)) logging.error("index = " + str(index)) logging.error("len(myOutFlow) = "+ str(len(self.myHydro))) logging.error("self.timeDelay = "+ str(self.timeDelay)) logging.error("All inlets timeDelay :") for element in self.intletsObj: curObj = self.intletsObj[element] logging.error(str(curObj.timeDelay)) logging.error("=================") return gOutFlow
@property
[docs] def outFlowRaw_global(self): """The outFlow global property. Returns the outFlow in the global time, i.e. the hydrograph to which the timeDelay is applied. """ gOutFlow = np.zeros(len(self.myHydro),dtype=ct.c_double, order='F') index = math.floor(self.timeDelay/self.deltaT) if(index==0): gOutFlow = self.outFlow elif(index<len(self.myHydro)): gOutFlow[index:] = self.outFlowRaw[:-index] else: logging.error("ERROR: the simulation time is not long enough for this subbasin to be taken into account") logging.error("Error informations : ") logging.error("Function name : outFlow_global()") logging.error("Name = "+ self.name) logging.error("ID = "+ str(self.iD)) logging.error("index = " + str(index)) logging.error("len(myOutFlow) = "+ str(len(self.myHydro))) logging.error("self.timeDelay = "+ str(self.timeDelay)) logging.error("All inlets timeDelay :") for element in self.intletsObj: curObj = self.intletsObj[element] logging.error(str(curObj.timeDelay)) logging.error("=================") return gOutFlow
@property
[docs] def cumul_rain(self) -> np.array: return np.cumsum(self.myRain)
[docs] def evaluate_Nash(self, measure, intervals:list[tuple[datetime.datetime]]=[]) -> list[float]: ns = [] if intervals == []: ns.append( datt.evaluate_Nash(self.outFlow, self.time, measures=measure.get_myHydro(), tMeasures=measure.time, dateBegin=self.dateBegin, dateEnd=self.dateEnd) ) return tuple(ns) # for el in intervals: # ns.append( datt.evaluate_Nash(self.outFlow, self.time, # measures=measure.get_myHydro(), tMeasures=measure.time, # dateBegin=el[0], dateEnd=el[1]) ) ns = [ datt.evaluate_Nash(self.outFlow, self.time, measures=measure.get_myHydro(), tMeasures=measure.time, dateBegin=el[0], dateEnd=el[1]) for el in intervals ] return tuple(ns)
[docs] def get_peak(self, intervals:list[tuple[datetime.datetime]]=[]) -> list[float]: peak_s = [] for element in intervals: # We conisder the indice to form complete intervals simul_i = math.ceil((element[0]-self.dateBegin).total_seconds()/self.deltaT) simul_f = math.floor((element[1]-self.dateBegin).total_seconds()/self.deltaT) # meas_i = math.floor((element[0]-measure.dateBegin).total_seconds/measure.deltaT) # meas_f = math.floor((element[1]-measure.dateBegin).total_seconds/measure.deltaT) if simul_i<0 or simul_f>len(self.time): continue peak_s.append(self.outFlow[simul_i:simul_f+1].max()) return peak_s
[docs] def collect_x_from_production(self) -> dict[str,np.array]: """ This procedure is collecting all the time series fractions of each outflow of the hydrological production models written in Fortran Returns: dict[str, np.array]: A dictionary containing the fractions of each outflow. """ all_x = {} if self.model == cst.tom_VHM: all_x = self.collect_x_VHM() elif self.model == cst.tom_GR4: all_x = self.collect_x_GR4() elif self.model == cst.tom_2layers_linIF: all_x = self.collect_x_2layers() return all_x
[docs] def collect_fractions(self) -> dict[str,np.array]: """ This procedure is collecting all the fractions of each outflow of the hydrological production models. Returns: dict[str, np.array]: A dictionary containing the fractions of each outflow. """ all_x = self.collect_x_from_production() if self.model == cst.tom_VHM: all_f = self._collect_fractions_VHM(all_x) elif self.model == cst.tom_GR4: all_f = self._collect_fractions_GR4(all_x) elif self.model == cst.tom_2layers_linIF: all_f = self._collect_fractions_2layers(all_x) return all_f
[docs] def collect_all_internal_variables(self) -> dict[str,np.array]: """ This procedure is collecting all internal variables of the hydrological production models. Returns: dict[str, np.array]: A dictionary containing the fractions of each outflow. """ all_iv = {} if self.model == cst.tom_VHM: all_iv = self.collect_iv_VHM() elif self.model == cst.tom_GR4: all_iv = self.collect_iv_GR4() elif self.model == cst.tom_2layers_linIF: all_iv = self.collect_iv_2layers() return all_iv
[docs] def activate_all_internal_variables(self): """ This procedure is activating all internal variables of all the hydrological modules. """ if self.model == cst.tom_VHM: self.activate_all_iv_VHM() elif self.model == cst.tom_GR4: self.activate_all_iv_GR4() elif self.model == cst.tom_2layers_linIF: self.activate_all_iv_2layers()
[docs] def collect_x_VHM(self) -> dict[str,np.array]: """ This procedure is collecting all the fractions of each outflow of the VHM model. Returns: - all_x: A dictionary containing the fractions of each outflow of the VHM model. """ list_keys = ["x", "U"] files_per_keys = [["xbf", "xif", "xof", "xu"],[]] group = "Internal variables to save" param = "simul_soil" all_x = self.collect_internal_variables(list_keys, files_per_keys, group_name=group, param_name=param) return all_x
[docs] def _collect_fractions_VHM(self, all_x:dict[str,np.array]) -> dict[str,np.array]: """ This procedure is collecting all the fractions of each outflow of the VHM model. Returns: - all_f: A dictionary containing the fractions of each outflow of the VHM model. """ all_f = {} if all_x=={}: return all_f condition = self.myRain > 0.0 all_f["% qof"] = np.where(condition, all_x["xof"] * 100.0, np.nan) all_f["% qif"] = np.where(condition, all_x["xif"] * 100.0, np.nan) all_f["% qbf"] = np.where(condition, all_x["xbf"] * 100.0, np.nan) all_f["% loss"] = np.where(condition, all_x["xu"] * 100.0, np.nan) return all_f
[docs] def collect_iv_VHM(self) -> dict[str,np.array]: """ This procedure is collecting all internal variables of the VHM model in each module. Returns: - all_iv: A dictionary containing all internal variables of the VHM model. """ list_keys = ["x", "U"] files_per_keys = [[],["U"]] group = "Internal variables to save" param = "simul_soil" all_iv = self.collect_internal_variables(list_keys, files_per_keys, group_name=group, param_name=param) return all_iv
[docs] def activate_all_iv_VHM(self): """ This procedure is activating all internal variables of the VHM model in each module. """ list_keys = ["x", "U"] group = "Internal variables to save" param = "simul_soil" self.activate_internal_variables(list_keys, group_name=group, param_name=param)
[docs] def collect_x_GR4(self) -> dict[str,np.array]: """ This procedure is collecting the fractions of each outflow of the GR4 model. Returns: dict[str, np.array]: A dictionary containing all fractions of each outflow of the GR4 model. """ all_x = {} return all_x
[docs] def _collect_fractions_GR4(self, all_x:dict[str,np.array]) -> dict[str,np.array]: """ This procedure is collecting all the fractions of each outflow of the GR4 model. Returns: - all_f: A dictionary containing the fractions of each outflow of the GR4 model. """ all_f = {} return all_f
[docs] def collect_iv_GR4(self) -> dict[str,np.array]: """ This procedure is collecting all internal variables of the GR4 model in each module. Returns: - all_iv: A dictionary containing all internal variables of the GR4 model. """ all_iv = {} return all_iv
[docs] def activate_all_iv_GR4(self): """ This procedure is activating all internal variables of the GR4 model in each module. """ return
[docs] def collect_x_2layers(self) -> dict[str,np.array]: """ This procedure is collecting the fractions of each outflow of the 2 layers model. Returns: A dictionary containing the collected fractions of each outflow variables. """ list_keys = ["x", "U", "Reservoir"] files_per_keys = [["xif"], [], ["xp"]] group = "Internal variables to save" param = "simul_soil" all_x = self.collect_internal_variables(list_keys, files_per_keys, group_name=group, param_name=param) return all_x
[docs] def _collect_fractions_2layers(self, all_x:dict[str,np.array]) -> dict[str,np.array]: """ This procedure is collecting all the fractions of each outflow of the 2 layers model. Returns: - all_f: A dictionary containing the fractions of each outflow of the 2 layers model. """ all_f = {} if all_x=={}: return all_f condition = self.myRain > 0.0 f_if = np.where(condition, all_x["xp"] * all_x["xif"], np.nan) all_f["% qof"] = (all_x["xp"] - f_if) * 100.0 all_f["% qif"] = f_if * 100.0 all_f["% loss"] = np.where(condition, (1.0 - all_x["xp"]) * 100.0, np.nan) return all_f
[docs] def collect_iv_2layers(self) -> dict[str,np.array]: """ This procedure is collecting all internal variables of the 2 layers model in each module. Returns: - all_iv: A dictionary containing the fractions all internal variables of the 2 layers model. """ list_keys = ["x", "U", "Reservoir"] files_per_keys = [[], ["U"], ["S"]] group = "Internal variables to save" param = "simul_soil" all_iv = self.collect_internal_variables(list_keys, files_per_keys, group_name=group, param_name=param) return all_iv
[docs] def activate_all_iv_2layers(self): """ This procedure is activating all internal variables of the 2 layers model in each module. """ list_keys = ["x", "U", "Reservoir"] group = "Internal variables to save" param = "simul_soil" self.activate_internal_variables(list_keys, group_name=group, param_name=param)
[docs] def collect_internal_variables(self, list_keys:list[str], files_per_keys:list[list[str]], group_name:str="Internal variables to save", param_name:str="simul_soil" )-> dict[str,np.array]: """ Collects all the internal variables of the 2 layers model. Parameters: - list_keys (list[str]): List of keys representing the internal variables to collect. - files_per_keys (list[list[str]]): List of lists containing the file names associated with each key. - group_name (str, optional): Name of the group containing the internal variables to save. Default is "Internal variables to save". - production_name (str, optional): Name of the production file. Default is "simul_soil". Returns: - dict[str,np.array]: A dictionary containing the collected internal variables, where the keys are the variable names and the values are numpy arrays. """ all_iv = {} production_file = ".".join([param_name,"param"]) cur_dir = os.path.join(self.fileNameRead, "Subbasin_"+str(self.iDSorted)) param_fileName = os.path.join(cur_dir, production_file) wolf_soil = Wolf_Param(to_read=True, filename=param_fileName,toShow=False, init_GUI=False) for index, curKey in enumerate(list_keys): ok_IV = wolf_soil.get_param(group_name, curKey, default_value=0) if ok_IV == 1: for curVar in files_per_keys[index]: ts_file = "".join([param_name, "_", curVar, ".dat"]) isOk, tmp = rd.check_path(os.path.join(cur_dir, ts_file)) if isOk<0: logging.warning("The file : " + ts_file + " does not exist!") continue time, cur_iv = rd.read_hydro_file(cur_dir, ts_file) all_iv[curVar] = cur_iv else: logging.warning("Please activate the interval variable : " + curKey + "to have access the following fraction of outlets : ") return all_iv
[docs] def activate_internal_variables(self, list_keys:list[str], group_name:str="Internal variables to save", param_name:str="simul_soil"): """ Activates all the internal variables of the 2 layers model. Parameters: - list_keys (list[str]): List of keys representing the internal variables to collect. """ production_file = ".".join([param_name,"param"]) cur_dir = os.path.join(self.fileNameRead, "Subbasin_"+str(self.iDSorted)) param_fileName = os.path.join(cur_dir, production_file) wolf_soil = Wolf_Param(to_read=True, filename=param_fileName,toShow=False, init_GUI=False) for curKey in list_keys: wolf_soil.change_param(group_name, curKey, 1) wolf_soil.SavetoFile(None) wolf_soil.Reload(None) return
[docs] def get_summary_fractions(self, summary:str="mean", all_f:dict={}, interval:list[tuple[datetime.datetime, datetime.datetime]]=None) -> dict[str, np.array]: """ This procedure is returning a summary of the fractions of the current module. Parameters: - summary (str): The type of summary to return. - interval (list[datetime.datetime], optional): The interval of time to consider. Default is None. Returns: - dict: A dictionary containing the summary of the fractions of the current module. """ if all_f == {}: all_f = self.collect_fractions() if interval is not None: interv = np.zeros(len(self.time), dtype=bool) for el in interval: date_i = datetime.datetime.timestamp(el[0]) date_f = datetime.datetime.timestamp(el[1]) interv += (self.time>=date_i) & (self.time<=date_f) else: interv = np.ones(len(self.time), dtype=bool) if summary == "mean": return {key: np.nanmean(all_f[key], where=interv) for key in all_f} elif summary == "median": return {key: np.nanmedian(all_f[key][interv]) for key in all_f} elif summary == "std": return {key: np.nanstd(all_f[key][interv]) for key in all_f} elif summary == "min": return {key: np.nanmin(all_f[key], where=interv) for key in all_f} elif summary == "max": return {key: np.nanmax(all_f[key], where=interv, initial=0.0) for key in all_f} else: logging.error("The summary type is not recognised!") return []
[docs] def get_volume_fractions(self, all_f:dict={}, interval:list[tuple[datetime.datetime, datetime.datetime]]=None) -> dict[str, np.array]: """ This procedure is returning a summary of the fractions of the current module. Parameters: - summary (str): The type of summary to return. - interval (list[datetime.datetime], optional): The interval of time to consider. Default is None. Returns: - dict: A dictionary containing the summary of the fractions of the current module. """ if all_f == {}: all_f = self.collect_fractions() all_v = {key: val/100.0*self.myRain for key, val in all_f.items()} if interval is not None: interv = np.zeros(len(self.time), dtype=bool) for el in interval: date_i = datetime.datetime.timestamp(el[0]) date_f = datetime.datetime.timestamp(el[1]) interv += (self.time>=date_i) & (self.time<=date_f) else: interv = np.ones(len(self.time), dtype=bool) tot_rain = np.nansum(self.myRain[interv])*self.deltaT/3600 return {key+" volume": (np.nansum(all_v[key][interv])*self.deltaT/3600)/tot_rain*100 for key in all_v}
[docs] def check_presence_of_iv(self): """ This procedure is checking the presence of internal variables in the current module. """ # TODO return
[docs] def get_all_Qtest(self, nb_atttempts=-1, typeOutFlow:str="Net", unit:str='m3/s', whichOutFlow="", lag:float=0.0) -> np.array: """ This function returns the Qtest hydrograph of the current module. Parameters: - which (str, optional): The type of hydrograph to return. Default is "Net". Returns: - np.array: The Qtest hydrograph of the current module. """ # FIXME Take into account all the possible types of hydrographs and units file_debug_info = "simul_GR4_out_debuginfo.txt" prefix = "simul_GR4_out" working_dir = os.path.join(self.fileNameRead, 'Subbasin_' + str(self.iDSorted) + '/') q_test = [] file_debug_info = "_".join([prefix,"_debuginfo.txt"]) if nb_atttempts < 0: with open(os.path.join(working_dir,file_debug_info), 'r') as file: lines = file.readline() items = lines.split('\t') nb_init = int(items[0]) nb_max = int(items[0]) nb_atttempts = nb_max all_files = [os.path.join(working_dir,"".join([prefix,str(i+1)+".dat"])) for i in range(nb_atttempts)] areOk = [(rd.check_path(file)[0])>=0 for file in all_files] max_index = next((i for i, x in enumerate(areOk) if x == False), len(areOk)) q_test = [rd.read_hydro_file(working_dir, file_name)[1]*self.surfaceDrained/3.6 for file_name in all_files[:max_index]] # for i in range(nb_atttempts): # file_name = "".join([prefix,str(i+1)+".dat"]) # isOk, full_name = rd.check_path(os.path.join(working_dir, file_name)) # if isOk<0: # break # t, cur_q = rd.read_hydro_file(working_dir, file_name) # cur_q = cur_q*self.surfaceDrained/3.6 # q_test.append(cur_q) return q_test
[docs] def plot_all_fractions(self, all_fractions:dict[str:np.array]={},figure=None, to_show:bool=False, writeDir:str="", range_data:list[datetime.datetime]=[]) -> None: if writeDir == "": writeFile = os.path.join(self.fileNameWrite, "PostProcess", "_".join(["Q_fractions", self.name])) else: writeFile = os.path.join(writeDir, "_".join(["Q_fractions", self.name])) if all_fractions == {}: all_fractions = self.collect_fractions() if all_fractions == {}: logging.warning("No fractions found!") return elif self.name in all_fractions: all_fractions = all_fractions[self.name] else: all_fractions = {} logging.warning("The name of the current module is not in the dictionary of fractions!") nb_elements = len(all_fractions) if nb_elements == 0: logging.warning("No fractions found!") return y = [el for el in all_fractions.values()] x = [el for el in all_fractions.keys()] y_label = "Fractions of the outflows [%]" graph_title = "Fractions of the outflows of " + self.name ph.plot_hydro(nb_elements, y,time=self.time, y_titles=y_label, y_labels=x, writeFile=writeFile, figure=figure, graph_title=graph_title, rangeData=range_data) if to_show: plt.show()
[docs] def evaluate_objective_function(self, unit="mm/h")->np.ndarray: """ This procedure is evaluating the objective function of the current module. Returns: - np.ndarray: The objective function of the current module. """ # FIXME unit='mm/h' return self.get_outFlow(unit=unit)
[docs] def import_from_pandas_Series(self, data:pd.Series, which="outFlow"): time = data.index.values.astype(np.int64) // 10 ** 9 if which == "outFlow": self._outFlow["Net"] = data.values.astype(dtype=ct.c_double, order='F') elif which == "outFlowRaw": self._outFlow["Raw"] = data.values.astype(dtype=ct.c_double, order='F') elif which == "myHydro": self.myHydro = data.values.astype(dtype=ct.c_double, order='F') elif which == "myRain": data = data.values.astype(dtype=np.double, order='F') elif which == "cumul_rain": data = data.values.astype(dtype=np.double, order='F') else: logging.error("Not a recognised 'which' argument!") logging.error("Try the following : 'ouflow', 'outFlowRaw', 'myHydro', 'myRain', 'cumul_rain'") return
[docs] def export_to_pandas_Series(self, which="outFlow"): idx = pd.to_datetime(self.time, unit='s', utc=True) if which == "outFlow": data = self.outFlow elif which == "outFlowRaw": data = self.outFlowRaw elif which == "myHydro": data = self.myHydro elif which == "myRain": data = self.myRain elif which == "cumul_rain": data = self.cumul_rain else: logging.error("Not a recognised 'which' argument!") logging.error("Try the following : 'ouflow', 'outFlowRaw', 'myHydro', 'myRain', 'cumul_rain'") return None tserie = pd.Series(data, index=idx, copy=True, name=" ".join([self.name,which])) return tserie
# def plot_Nash_vs_Qexcess(self, figure:plt.axis=None, toShow:bool=False, writeFile:str=""): # FIXME Remove the lines below when it is confirmed # | # | # V # ## Function computing the cumulative volume of a given flow # # @var flow the flow to treat. Units: [m^3/s] # # @var dtData time step of the argument 'flow'. Units: [s] # # @var dtOut time step of the desired cumulative volume. It should be a multiple of 'dtData'. Units: [s] # # \undeline{Caution}: Take care to the units of dtData and dtOut according to the flow units. # # E.g. Hyeto and Evap in [mm/h] => dtData in [h] # # \underline{But}: outflow and myHydro in [m^3/s] => dtData in [sec] # # Returns the cumulative volume. Units: [m^3] # # TO do: ajouter interval de temps # def construct_cumulVolume(self, flow, dtData, dtOut): # # Check validity of the arguments # if(dtOut%dtData!=0): # print("ERROR: the time step of the desired output is not compatible with the data timestep!") # sys.exit() # else: # factor = int(dtOut/dtData) # conversion factor from data timestep and cumul time step # cumul = np.zeros(int(len(flow)/factor)) # cumul[0] = flow[0] # for i in range(1,int(len(flow)/factor)): # cumul[i] = cumul[i-1] + np.sum(flow[i*factor: (i+1)*factor])*dtData # return cumul