Source code for wolfhece.hydrology.data_treatment

"""
Author: HECE - University of Liege, Pierre Archambeau, Christophe Dessers
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

import sys                              # module to stop the program when an error is encountered
import numpy as np
import math
import datetime
import os
import csv
import logging

import ctypes as ct

from ..PyTranslate import _

if not '_' in __builtins__:
    import gettext
    _ = gettext.gettext
## Function computing the cumulative volume of a given flow
# @var flow the flow to treat. Units: [m^3/s]
# @var dtData time step of the argument 'flow'. Units: [s]
# @var dtOut time step of the desired cumulative volume. It should be a multiple of 'dtData'. Units: [s]
# @var tData time array containing the timestamps of the data
# @var tRange optional array of datetime objects containing the range [tmin, tmax] in which the cumulative sum will be carried out
# \underline{Caution}: mind the units of dtData and dtOut according to the flow units.
# E.g. Hyeto and Evap in [mm/h] => dtData in [h]
# \underline{But}: outflow and myHydro in [m^3/s] => dtData in [sec]
# Returns the cumulative volume. Units: [m^3]
# TODO: - add a time interval argument
#       - add a 'units' argument
def cumul_data(data, dtData, dtOut, tData=None, beginDate=None, tRange=None, noDataValue=None):
    # Check validity of the arguments
    if(dtOut%dtData!=0):
        print("ERROR: the time step of the desired output is not compatible with the data timestep!")
        sys.exit()
    else:
        factor = dtOut/dtData   # conversion factor from data timestep to cumul time step

    add_1_cell = 0
    if(tRange is None):
        # To detect the difference between a rain and a hydrograph data
        if(len(data)%int(factor)!=0):
            add_1_cell = 1
        cumul = np.zeros(int(len(data)/factor)+add_1_cell)
        iInit = 0
        iEnd = int(len(data)/factor)
    elif(tData is not None):
        # No distinction between rain and hydro needed here as the range is directly specified -> TO DISCUSS!!!
        tMin = datetime.datetime.timestamp(tRange[0])
        tMax = datetime.datetime.timestamp(tRange[1])
        if((tMax-tMin)%factor!=0):
            print("ERROR: the interval given does not coincide with the timestep of the desired output!")
            sys.exit()
        # We check whether the range is within the data
        datesOk = False
        for i in range(len(data)):
            if(tMin==tData[i]):
                if((i+(tMax-tMin)/dtData)>len(data)-1):
                    print("ERROR: the upper bound of the desired range is out of the data bounds!")
                    sys.exit()
                else:
                    datesOk = True
        if(datesOk==False):
            print("ERROR: the lower bound of the range is out of the data bounds!")
            sys.exit()
    elif(beginDate is not None):
        tMin = tRange[0]
        tMax = tRange[1]
        iInit = index_from_date(beginDate, tMin, dtData, dtUnit="sec", roundType="ceil")
        iInit = math.ceil(iInit/factor)
        iEnd = index_from_date(beginDate, tMax, dtData, dtUnit="sec", roundType="floor")
        iEnd = math.floor(iEnd/factor)
        cumul = np.zeros(int((iEnd-iInit)))

    # Check whether there are holes in the data.
    dataCorr = data.copy()
    if(noDataValue is not None):
        if(np.all(dataCorr>noDataValue)):
            holesInData = False
        else:
            holesInData = True
            isaved = -1
            for i in range(int((iInit)*factor), int(iEnd*factor)):
                if(dataCorr[i]<=noDataValue and isaved==-1):
                    isaved = i-1
                elif(isaved==-1):
                    continue
                elif(dataCorr[i]>noDataValue):
                    slope = (dataCorr[i]-dataCorr[isaved])/(i-isaved)
                    for j in range(isaved,i):
                        dataCorr[j] = dataCorr[isaved] + slope*(j-isaved)
                    isaved = -1
            if(isaved!=-1):
                lastIndex = math.floor(isaved/factor)-iInit
            else:
                lastIndex = len(cumul)
    else:
        holesInData = False
        lastIndex = len(cumul)

    cumul[0] = np.sum(dataCorr[iInit*int(factor):iInit*int(factor)+int(factor)])/factor*dtOut
    for i in range(iInit+1,iEnd):
        cumul[i-iInit] = cumul[i-iInit-1] + np.sum(dataCorr[i*int(factor): (i+1)*int(factor)])/factor*dtOut
    if(tRange is None):
        i = int(len(dataCorr)/factor)
        if(len(dataCorr)%int(factor)!=0):
            cumul[i] = cumul[int(len(dataCorr)/factor)-1] + dataCorr[i*int(factor)]/factor*dtOut

    if(holesInData):
        return cumul, lastIndex
    else:
        return cumul

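# Illustrative usage sketch (not part of the original module): one way cumul_data
# might be called for an hourly hydrograph in [m^3/s] aggregated to daily cumulative
# volumes in [m^3]. The flow values below are hypothetical.
def _example_cumul_data():
    flow = np.full(48, 2.0)                            # 48 hourly values of 2 m^3/s
    daily = cumul_data(flow, dtData=3600, dtOut=86400)
    # daily[0] = 2 * 86400 = 172800 m^3 ; daily[1] = 345600 m^3
    return daily
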
## Function computing the cumulative volume of a given flow and for each season
# This function does not compute the volumes for incomplete seasons (i.e. at the beginning or at the end of the interval)
#
def cumul_data_per_season(data, dtData, dtOut, tData):
    # Check the validity of the arguments
    if(dtOut%dtData!=0):
        print("ERROR: the time step of the desired output is not compatible with the data timestep!")
        sys.exit()
    else:
        factor = dtOut/dtData   # conversion factor from data timestep to cumul time step

    ## Definition of the ongoing year
    tmpDate = datetime.datetime.fromtimestamp(tData[0], tz=datetime.timezone.utc)
    myYear = tmpDate.year

    ## Definition of :
    # - tOut array : time array of the output
    # - cumul list : list of cumulative volumes computed for each season
    tOut = [np.zeros(0), np.zeros(0), np.zeros(0), np.zeros(0)]
    cumul = [np.zeros(0), np.zeros(0), np.zeros(0), np.zeros(0)]

    ## Definition of the constants
    # @var mySeason Integer which is
    # =0, spring
    # =1, summer
    # =2, fall
    # =3, winter
    mySeason = 0
    # The dates for the beginning of each season
    dSeasons = []
    # Spring -> [0]
    dSeasons.append(datetime.datetime(year=myYear, month=3, day=21, tzinfo=datetime.timezone.utc))
    # Summer -> [1]
    dSeasons.append(datetime.datetime(year=myYear, month=6, day=21, tzinfo=datetime.timezone.utc))
    # Fall -> [2]
    dSeasons.append(datetime.datetime(year=myYear, month=9, day=21, tzinfo=datetime.timezone.utc))
    # Winter -> [3]
    dSeasons.append(datetime.datetime(year=myYear, month=12, day=21, tzinfo=datetime.timezone.utc))
    # Timestamps for the beginning of each season
    tSeasons = np.zeros(len(dSeasons))
    for i in range(len(dSeasons)):
        tSeasons[i] = datetime.datetime.timestamp(dSeasons[i])

    ## Initialisation
    # Evaluation of the current season
    for i in range(len(tSeasons)):
        if(tData[0]<=tSeasons[i]):
            if(i<3):
                mySeason = i
                break
            else:
                mySeason = 0
                myYear += 1
                # Update of the season dates and times
                for j in range(len(dSeasons)):
                    dSeasons[j] = dSeasons[j].replace(dSeasons[j].year+1)
                for j in range(len(tSeasons)):
                    tSeasons[j] = datetime.datetime.timestamp(dSeasons[j])
                break

    # Identification of the first element to treat
    firstIndex = int((tSeasons[mySeason]-tData[0])/dtData)
    if((tSeasons[mySeason]-tData[0])%dtData!=0):
        print("ERROR: this case was not considered in the code yet!")
        sys.exit()

    # Identification of the last element to treat
    lastSeason = 0
    lastTime = tData[-1]
    tmpDate = datetime.datetime.fromtimestamp(lastTime, tz=datetime.timezone.utc)
    lastYear = tmpDate.year
    tmpSeasons = []
    tmpSeasons.append(datetime.datetime.timestamp(datetime.datetime(year=lastYear, month=3, day=21, tzinfo=datetime.timezone.utc)))
    tmpSeasons.append(datetime.datetime.timestamp(datetime.datetime(year=lastYear, month=6, day=21, tzinfo=datetime.timezone.utc)))
    tmpSeasons.append(datetime.datetime.timestamp(datetime.datetime(year=lastYear, month=9, day=21, tzinfo=datetime.timezone.utc)))
    tmpSeasons.append(datetime.datetime.timestamp(datetime.datetime(year=lastYear, month=12, day=21, tzinfo=datetime.timezone.utc)))
    # The incomplete seasons are removed from the calculation
    for i in range(len(tmpSeasons)-1, 0-1, -1):
        if(tData[-1]>=tmpSeasons[i]):
            if(i<0):
                lastSeason = tmpSeasons[i-1]
                break
            else:
                lastYear -= 1
                lastSeason = datetime.datetime.timestamp(datetime.datetime(year=lastYear-1, month=12, day=21, tzinfo=datetime.timezone.utc))
                break
        elif(i==0):
            lastYear -= 1
            lastSeason = datetime.datetime.timestamp(datetime.datetime(year=lastYear-1, month=12, day=21, tzinfo=datetime.timezone.utc))
            break

    ## Main loop
    # TODO: add the initialisation of tStart or tEnd (or both)
    while(tSeasons[mySeason]<=lastSeason):
        # Identification of the interval of the season
        tStart = tSeasons[mySeason]
        if(mySeason<3):
            nextSeason = mySeason+1
            tEnd = tSeasons[nextSeason]
        else:
            # Update of the season dates and times
            for j in range(len(dSeasons)):
                dSeasons[j] = dSeasons[j].replace(dSeasons[j].year+1)
            for j in range(len(tSeasons)):
                tSeasons[j] = datetime.datetime.timestamp(dSeasons[j])
            nextSeason = 0
            tEnd = tSeasons[nextSeason]
        # Identification of the number of time steps in this interval
        nbElementsIn = int((tEnd-tStart)/dtData)
        nbElementsOut = int((tEnd-tStart)/dtOut)
        tmpCumul = np.zeros(nbElementsOut)
        tmptOut = np.zeros(nbElementsOut)
        # Computation of the cumulative volume for the current season
        if(len(cumul[mySeason])==0):
            tmpCumul[0] = np.sum(data[firstIndex:firstIndex+int(factor)])/factor*dtOut
        else:
            tmpCumul[0] = cumul[mySeason][-1] + (np.sum(data[firstIndex:firstIndex+int(factor)]))/factor*dtOut
        tmptOut[0] = tData[firstIndex]
        for i in range(1,nbElementsOut):
            tmpCumul[i] = tmpCumul[i-1] + np.sum(data[firstIndex+i*int(factor) : firstIndex+(i+1)*int(factor)])/factor*dtOut
            tmptOut[i] = tData[firstIndex+i*int(factor)]
        # Small test to check whether all the elements were used in the data
        if((i+1)*int(factor) != nbElementsIn):
            print("ERROR: in the number of elements In considered!!")
            sys.exit()
        firstIndex += (i+1)*int(factor)
        # Addition of the new cumul array to the previous one
        cumul[mySeason] = np.append(cumul[mySeason], tmpCumul)
        tOut[mySeason] = np.append(tOut[mySeason], tmptOut)
        mySeason = nextSeason

    return tOut, cumul

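# Illustrative sketch (not part of the original module): the season indexing
# convention used above (0=spring, 1=summer, 2=fall, 3=winter), with seasons
# starting on 21 March, 21 June, 21 September and 21 December (UTC). Expects a
# timezone-aware datetime; the helper name is hypothetical.
def _example_season_index(date: datetime.datetime) -> int:
    bounds = [datetime.datetime(date.year, m, 21, tzinfo=datetime.timezone.utc) for m in (3, 6, 9, 12)]
    if date < bounds[0] or date >= bounds[3]:
        return 3                                       # winter wraps around the year boundary
    for i in range(3):
        if bounds[i] <= date < bounds[i+1]:
            return i
    return 3
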
## Function decomposing data per season and returning an array per season.
# This function does not decompose data for incomplete seasons (i.e. at the beginning or at the end of the interval)
#
def decompose_data_per_season(data, dtData, dtOut, tData):
    ## Definition of the ongoing year
    tmpDate = datetime.datetime.fromtimestamp(tData[0], tz=datetime.timezone.utc)
    myYear = tmpDate.year

    ## Definition of :
    # - tOut array : time array of the output
    # - newData list : list of data decomposed into each season
    tOut = [np.zeros(0), np.zeros(0), np.zeros(0), np.zeros(0)]
    newData = [np.zeros(0), np.zeros(0), np.zeros(0), np.zeros(0)]

    ## Definition of the constants
    # @var mySeason Integer which is
    # =0, spring
    # =1, summer
    # =2, fall
    # =3, winter
    mySeason = 0
    # The dates for the beginning of each season
    dSeasons = []
    # Spring -> [0]
    dSeasons.append(datetime.datetime(year=myYear, month=3, day=21, tzinfo=datetime.timezone.utc))
    # Summer -> [1]
    dSeasons.append(datetime.datetime(year=myYear, month=6, day=21, tzinfo=datetime.timezone.utc))
    # Fall -> [2]
    dSeasons.append(datetime.datetime(year=myYear, month=9, day=21, tzinfo=datetime.timezone.utc))
    # Winter -> [3]
    dSeasons.append(datetime.datetime(year=myYear, month=12, day=21, tzinfo=datetime.timezone.utc))
    # Timestamps for the beginning of each season
    tSeasons = np.zeros(len(dSeasons))
    for i in range(len(dSeasons)):
        tSeasons[i] = datetime.datetime.timestamp(dSeasons[i])

    ## Initialisation
    # Evaluation of the current season
    for i in range(len(tSeasons)):
        if(tData[0]<=tSeasons[i]):
            if(i<3):
                mySeason = i
                break
            else:
                mySeason = 0
                myYear += 1
                # Update of the season dates and times
                for j in range(len(dSeasons)):
                    dSeasons[j] = dSeasons[j].replace(dSeasons[j].year+1)
                for j in range(len(tSeasons)):
                    tSeasons[j] = datetime.datetime.timestamp(dSeasons[j])
                break

    # Identification of the first element to treat
    firstIndex = int((tSeasons[mySeason]-tData[0])/dtData)
    if((tSeasons[mySeason]-tData[0])%dtData!=0):
        print("ERROR: this case was not considered in the code yet!")
        sys.exit()

    # Identification of the last element to treat
    lastSeason = 0
    lastTime = tData[-1]
    tmpDate = datetime.datetime.fromtimestamp(lastTime, tz=datetime.timezone.utc)
    lastYear = tmpDate.year
    tmpSeasons = []
    tmpSeasons.append(datetime.datetime.timestamp(datetime.datetime(year=lastYear, month=3, day=21, tzinfo=datetime.timezone.utc)))
    tmpSeasons.append(datetime.datetime.timestamp(datetime.datetime(year=lastYear, month=6, day=21, tzinfo=datetime.timezone.utc)))
    tmpSeasons.append(datetime.datetime.timestamp(datetime.datetime(year=lastYear, month=9, day=21, tzinfo=datetime.timezone.utc)))
    tmpSeasons.append(datetime.datetime.timestamp(datetime.datetime(year=lastYear, month=12, day=21, tzinfo=datetime.timezone.utc)))
    # The incomplete seasons are removed from the calculation
    for i in range(len(tmpSeasons)-1, 0-1, -1):
        if(tData[-1]>=tmpSeasons[i]):
            if(i<0):
                lastSeason = tmpSeasons[i-1]
                break
            else:
                lastYear -= 1
                lastSeason = datetime.datetime.timestamp(datetime.datetime(year=lastYear-1, month=12, day=21, tzinfo=datetime.timezone.utc))
                break
        elif(i==0):
            lastYear -= 1
            lastSeason = datetime.datetime.timestamp(datetime.datetime(year=lastYear-1, month=12, day=21, tzinfo=datetime.timezone.utc))
            break

    ## Main loop
    # TODO: add the initialisation of tStart or tEnd (or both)
    while(tSeasons[mySeason]<=lastSeason):
        # Identification of the interval of the season
        tStart = tSeasons[mySeason]
        if(mySeason<3):
            nextSeason = mySeason+1
            tEnd = tSeasons[nextSeason]
        else:
            # Update of the season dates and times
            for j in range(len(dSeasons)):
                dSeasons[j] = dSeasons[j].replace(dSeasons[j].year+1)
            for j in range(len(tSeasons)):
                tSeasons[j] = datetime.datetime.timestamp(dSeasons[j])
            nextSeason = 0
            tEnd = tSeasons[nextSeason]
        # Identification of the number of time steps in this interval
        nbElements = int((tEnd-tStart)/dtData)
        tmpdata = np.zeros(nbElements)
        tmptOut = np.zeros(nbElements)
        # Computation of the decomposed data for the current season
        if(len(newData[mySeason])==0):
            tmpdata[0] = data[firstIndex]
        else:
            tmpdata[0] = data[firstIndex]
        tmptOut[0] = tData[firstIndex]
        for i in range(1,nbElements):
            tmpdata[i] = tmpdata[i-1] + data[firstIndex+i]
            tmptOut[i] = tData[firstIndex+i]
        # Small test to check whether all the elements were used in the data
        if((i+1) != nbElements):
            print("ERROR: in the number of elements In considered!!")
            sys.exit()
        firstIndex += (i+1)
        # Addition of the new array to the previous one
        newData[mySeason] = np.append(newData[mySeason], tmpdata)
        tOut[mySeason] = np.append(tOut[mySeason], tmptOut)
        mySeason = nextSeason

    return tOut, newData

## cst = [m^3/s]
# dt = [sec]
# Caution, depending on the data: - a rainfall  -> ok
#                                 - a discharge -> remove one element from the size
def cumul_fromCst(cst, dateBegin, dateEnd, dt, isFlow=False):
    timeLength = dateEnd-dateBegin
    if(dt==3600):
        if(timeLength.total_seconds()%3600==0):
            mySize = int(timeLength.total_seconds()/3600)
        else:
            print("ERROR: the number of hours is not an integer!")
            sys.exit()
    elif(timeLength.total_seconds()%3600==0):
        mySize = int(timeLength.total_seconds()/dt)
        if(isFlow==False):
            mySize += 1
        # else:
        #     mySize += 1

    cumul = np.zeros(mySize)
    cumul[0] = cst
    for i in range(1,mySize):
        cumul[i] = cumul[i-1] + cst*dt

    return cumul

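# Illustrative usage sketch (not part of the original module): building the
# cumulative curve of a constant flow of 2 m^3/s over one day at an hourly
# time step. Dates and value are hypothetical.
def _example_cumul_fromCst():
    dateBegin = datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc)
    dateEnd   = datetime.datetime(2020, 1, 2, tzinfo=datetime.timezone.utc)
    return cumul_fromCst(2.0, dateBegin, dateEnd, dt=3600, isFlow=True)   # 24 values
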
def evaluate_Nash(simul, tSimul, measures, tMeasures, dateBegin, dateEnd, mask=[]):
    """
    Function evaluating the Nash-Sutcliffe coefficient.
    Caution: So far, if a measure is 0, it is not considered in the N-S evaluation.
    """
    # Nash-Sutcliffe model efficiency coefficient
    print("TO CORRECT -> Check 'Hello !' !!!!!")
    # Definition of the time step
    # Test of the constant time step - for simulation
    dtS = tSimul[1]-tSimul[0]
    for i in range(2,len(measures)):
        tmp_dt = tSimul[i]-tSimul[i-1]
        # Hello ! To correct!!!!!!!!!
        # if(tmp_dt!=dtS):
        #     print("WARNING: the time step is not regular and constant in the simulation data!")
        #     print("Two timestamps can be equal when there is a time changeover.")
        #     sys.exit()
    # Test of the constant time step - for measures
    dtM = tMeasures[1]-tMeasures[0]
    for i in range(2,len(measures)):
        tmp_dt = tSimul[i]-tSimul[i-1]
        # if(tmp_dt!=dtM):
        #     print("WARNING: the time step is not regular and constant in the measures!")
        #     print("Two timestamps can be equal when there is a time changeover.")
        #     sys.exit()

    # Definition of the factor between both time steps
    if(dtM%dtS!=0):
        print("ERROR: the time step of the desired output is not compatible with the data timestep!")
        sys.exit()
    else:
        factor = dtM/dtS   # conversion factor between both time steps

    # Definition of the first elements
    ti = datetime.datetime.timestamp(dateBegin)
    tend = datetime.datetime.timestamp(dateEnd)
    # - simulation:
    first_el_simul = -9999
    last_el_simul = -9999
    for i in range(len(tSimul)):
        if(tSimul[i]==ti):
            first_el_simul = i
        if(tSimul[i]==tend):
            last_el_simul = i
            break
    if(first_el_simul==-9999 or last_el_simul==-9999):
        print("ERROR: the simulation data are out of range!")
        sys.exit()
    nb_el_simul = last_el_simul - first_el_simul
    # - measures:
    first_el_measures = -9999
    last_el_measures = -9999
    for i in range(len(tSimul)):
        if(tMeasures[i]==ti):
            first_el_measures = i
        if(tMeasures[i]==tend):
            last_el_measures = i
            break
    if(first_el_measures==-9999 or last_el_measures==-9999):
        print("ERROR: the measurement data are out of range!")
        sys.exit()
    nb_el_measures = last_el_measures - first_el_measures

    # Verification of the proportionality between simulations and measures
    if(nb_el_simul%nb_el_measures!=0):
        print("ERROR: proportionality between simulations and measures!")
        sys.exit()

    sumNum = 0.0
    sumDen = 0.0
    meanMeasures = 0.0
    if mask==[]:
        # meanMeasures = np.mean(measures[first_el_measures:last_el_measures+1])
        counter = 0
        mask = [False for i in range(nb_el_measures)]
        for i in range(nb_el_measures):
            if(measures[first_el_measures+i]!=0.0):
                meanMeasures += measures[first_el_measures+i]
                mask[i] = True
                counter += 1
        if counter == 0:
            return 0.0
        meanMeasures = meanMeasures/counter
        for i in range(nb_el_measures):
            if(mask[i]==True):
                sumNum += (simul[first_el_simul+i*int(factor)]-measures[first_el_measures+i])**2.0
                sumDen += (measures[first_el_measures+i]-meanMeasures)**2.0
    else:
        counter = 0
        for i in range(nb_el_measures):
            if(mask[first_el_simul+i*int(factor)] and measures[first_el_measures+i]!=0):
                meanMeasures += measures[first_el_measures+i]
                counter += 1
        meanMeasures = meanMeasures/counter
        for i in range(nb_el_measures):
            if(mask[first_el_simul+i*int(factor)]):
                sumNum += (simul[first_el_simul+i*int(factor)]-measures[first_el_measures+i])**2.0
                sumDen += (measures[first_el_measures+i]-meanMeasures)**2.0

    NSE = 1.0 - (sumNum/sumDen)

    # sumNum = 0.0
    # sumDen = 0.0
    # meanMeasures = np.mean(measures)
    # for i in range(len(measures)):
    #     sumNum += (outflow[(first_el)*4+4*i]-measures[i])**2.0
    #     sumDen += (measures[i]-meanMeasures)**2.0
    # NSE = 1.0 - (sumNum/sumDen)

    return NSE

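# For reference, a compact sketch (not part of the original module) of the
# classical Nash-Sutcliffe efficiency computed above, without the time
# alignment and the zero-measure masking handled by evaluate_Nash; the two
# arrays are assumed to be already aligned on the same time step:
#     NSE = 1 - sum((Qsim - Qobs)^2) / sum((Qobs - mean(Qobs))^2)
def _nse_sketch(simulated, observed):
    simulated = np.asarray(simulated, dtype=float)
    observed = np.asarray(observed, dtype=float)
    return 1.0 - np.sum((simulated - observed)**2) / np.sum((observed - observed.mean())**2)
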
def evaluate_logNash(simul, tSimul, measures, tMeasures, dateBegin, dateEnd):
    # Nash-Sutcliffe model efficiency coefficient computed on log-transformed flows
    print("TO CORRECT -> Check 'Hello !' !!!!!")
    # Definition of the time step
    # Test of the constant time step - for simulation
    dtS = tSimul[1]-tSimul[0]
    for i in range(2,len(measures)):
        tmp_dt = tSimul[i]-tSimul[i-1]
        # Hello ! To correct!!!!!!!!!
        # if(tmp_dt!=dtS):
        #     print("WARNING: the time step is not regular and constant in the simulation data!")
        #     print("Two timestamps can be equal when there is a time changeover.")
        #     sys.exit()
    # Test of the constant time step - for measures
    dtM = tMeasures[1]-tMeasures[0]
    for i in range(2,len(measures)):
        tmp_dt = tSimul[i]-tSimul[i-1]
        # if(tmp_dt!=dtM):
        #     print("WARNING: the time step is not regular and constant in the measures!")
        #     print("Two timestamps can be equal when there is a time changeover.")
        #     sys.exit()

    # Definition of the factor between both time steps
    if(dtM%dtS!=0):
        print("ERROR: the time step of the desired output is not compatible with the data timestep!")
        sys.exit()
    else:
        factor = dtM/dtS   # conversion factor between both time steps

    # Definition of the first elements
    ti = datetime.datetime.timestamp(dateBegin)
    tend = datetime.datetime.timestamp(dateEnd)
    # - simulation:
    first_el_simul = -9999
    last_el_simul = -9999
    for i in range(len(tSimul)):
        if(tSimul[i]==ti):
            first_el_simul = i
        if(tSimul[i]==tend):
            last_el_simul = i
            break
    if(first_el_simul==-9999 or last_el_simul==-9999):
        print("ERROR: the simulation data are out of range!")
        sys.exit()
    nb_el_simul = last_el_simul - first_el_simul
    # - measures:
    first_el_measures = -9999
    last_el_measures = -9999
    for i in range(len(tSimul)):
        if(tMeasures[i]==ti):
            first_el_measures = i
        if(tMeasures[i]==tend):
            last_el_measures = i
            break
    if(first_el_measures==-9999 or last_el_measures==-9999):
        print("ERROR: the measurement data are out of range!")
        sys.exit()
    nb_el_measures = last_el_measures - first_el_measures

    # Verification of the proportionality between simulations and measures
    if(nb_el_simul%nb_el_measures!=0):
        print("ERROR: proportionality between simulations and measures!")
        sys.exit()

    sumNum = 0.0
    sumDen = 0.0
    meanMeasures = np.mean(measures[first_el_measures:last_el_measures+1])
    for i in range(nb_el_measures):
        sumNum += (np.log(simul[first_el_simul+i*int(factor)])-np.log(measures[first_el_measures+i]))**2.0
        sumDen += (np.log(measures[first_el_measures+i])-np.log(meanMeasures))**2.0

    logNSE = 1.0 - (sumNum/sumDen)

    return logNSE

## Function to convert rain data from a rain .dat file into another data matrix with a new timestep
# @var data Matrix containing all data in a rain file. Units: [day]-[month]-[year]-[hour]-[min]-[sec]-[mm/h]
def convert_inother_timestep(data, dtIn, dtOut):
    """
    data
    Check script Test_convert_dt_data for more information
    Function to correct!!!
    Only valid for rain data in [mm/h] with dtIn=1h -> dtOut=1day
    """
    if(dtIn%dtOut==0):
        # dtIn is a multiple of dtOut
        print("This case was not considered yet. Please complete it!")
        sys.exit()
        return dataOut
    elif(dtOut%dtIn==0):
        # dtOut is a multiple of dtIn
        factor = int(dtOut/dtIn)
        dataOut = np.zeros((len(data)*factor,7))
        for i in range(len(data)):
            for j in range(factor):
                dataOut[i*factor+j][:] = data[i][:]
                dataOut[i*factor+j][3] = j
        return dataOut
    else:
        print("This case was not considered yet. Please complete it!")
        sys.exit()

## Function to convert data with a smaller timestep into data with a greater timestep
# @var data vector of data to convert
# @var dtIn time step of the data
# @var dtOut time step of the data to return
# @var method string containing the method to use to aggregate the data
def aggregate(data, dtIn, dtOut, method="mean"):
    if(dtOut%dtIn!=0):
        print("ERROR: the data time step is not a multiple of the desired timestep")
        sys.exit()
    factor = int(dtOut/dtIn)
    nbIn = len(data)
    nbOut = int(nbIn/factor)
    dataOut = np.zeros(nbOut)

    if(method=="mean"):
        for i in range(nbOut):
            for j in range(factor):
                dataOut[i] += data[i*factor+j]/factor
        return dataOut
    elif(method=="sum"):
        print("ERROR: Not implemented yet!")
        return dataOut

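# Illustrative usage sketch (not part of the original module): aggregating an
# hourly series to daily means. The input values are hypothetical.
def _example_aggregate():
    hourly = np.arange(48, dtype=float)                # two days of hourly data
    daily_mean = aggregate(hourly, dtIn=3600, dtOut=86400, method="mean")
    # daily_mean -> [11.5, 35.5]
    return daily_mean
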
def index_from_date(beginDate, currentDate, dt, dtUnit="sec", roundType="nearest"):
    deltaDate = currentDate - beginDate
    deltaSec = deltaDate.seconds
    deltaDay = deltaDate.days
    if(dtUnit=="sec" or dtUnit=="SEC"):
        delta = deltaSec + deltaDay*3600.0*24.0
    elif(dtUnit=="hour" or dtUnit=="HOUR"):
        delta = deltaSec/3600.0 + deltaDay*24       # seconds -> hours
    elif(dtUnit=="days" or dtUnit=="DAYS"):
        delta = deltaSec/(3600.0*24.0) + deltaDay   # seconds -> days

    if(roundType=="nearest"):
        myIndex = int(round(delta/dt))
    elif(roundType=="floor"):
        myIndex = math.floor(delta/dt)
    elif(roundType=="ceil"):
        myIndex = math.ceil(delta/dt)

    return myIndex

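# Illustrative usage sketch (not part of the original module): locating the
# index of a date in an hourly series starting at beginDate. Dates are hypothetical.
def _example_index_from_date():
    beginDate = datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc)
    currentDate = datetime.datetime(2020, 1, 2, 6, tzinfo=datetime.timezone.utc)
    return index_from_date(beginDate, currentDate, dt=3600, dtUnit="sec")   # -> 30
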
def Horton_function(u, uMax, infilParams):
    f0 = infilParams[0]
    fc = infilParams[1]
    k  = infilParams[2]

    infil = fc + (f0-fc)*np.exp(-k*u/uMax)

    return infil

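# Illustrative sketch (not part of the original module): the Horton-type relation
# above returns f0 at u=0 and tends towards fc as u grows relative to uMax.
# The parameter values below are hypothetical.
def _example_Horton():
    infilParams = [60.0, 10.0, 4.0]                    # [f0, fc, k], e.g. in [mm/h]
    return Horton_function(0.0, 1.0, infilParams), Horton_function(1.0, 1.0, infilParams)
    # -> (60.0, 10.0 + 50.0*exp(-4) ~ 10.92)
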
def read_C_hyd(fileName, path, dateBegin=None, dateEnd=None, deltaT=None, tzDelta=datetime.timedelta(hours=0)):
    if(type(fileName)!=str):
        print("ERROR: Expecting only 1 file name for measurements!")
        sys.exit()
    fileName = os.path.join(path,fileName)
    nbCl = 0
    with open(fileName, newline='') as fileID:
        data_reader = csv.reader(fileID, delimiter=' ', skipinitialspace=True)
        list_data = []
        i = 0
        for raw in data_reader:
            if i>3:
                list_data.append(raw[0:nbCl])
            if i==2:
                nbCl = int(raw[0])
            elif i==3:
                nbLines = int(raw[0])
            i += 1
    matrixData = np.array(list_data).astype("float")

    # Init of the outflow array
    if dateBegin!=None and dateEnd!=None and deltaT!=None:
        timeInterval = dateEnd-dateBegin+datetime.timedelta(seconds=deltaT)
        outFlow = np.zeros(int(timeInterval.total_seconds()/deltaT), dtype=ct.c_double, order='F')
        timeArray = np.zeros(int(timeInterval.total_seconds()/deltaT))
        ti = datetime.datetime.timestamp(dateBegin)
        tf = datetime.datetime.timestamp(dateEnd)
    else:
        outFlow = np.zeros(nbLines, dtype=ct.c_double, order='F')
        timeArray = np.zeros(nbLines)
        ti = 0.0
        tf = sys.float_info.max

    # From the measurements file, we will only read the desired data and save it in outFlow
    prevDate = datetime.datetime(year=int(matrixData[0][2]), month=int(matrixData[0][1]), day=int(matrixData[0][0]),
                                 hour=int(matrixData[0][3]), tzinfo=datetime.timezone.utc)
    prevDate -= tzDelta
    index = 0
    add1Hour = datetime.timedelta(hours=1)
    secondsInDay = 24*60*60
    # Verification
    if(datetime.datetime.timestamp(prevDate)>ti):
        print("ERROR: the first hydro data element is posterior to dateBegin!")
        sys.exit()

    if(nbCl==5):
        # Caution : the index of the loop starts at 25 because the timestamp function
        # does not work until 2/01/1970 at 03:00:00. => I don't know why?!
        for i in range(25,len(matrixData)):
            # The hours are written in the file in [1,24] instead of [0,23]. Conversion below:
            if(int(matrixData[i][3])==24):
                currDate = datetime.datetime(year=int(matrixData[i][2]), month=int(matrixData[i][1]), day=int(matrixData[i][0]),
                                             hour=23, tzinfo=datetime.timezone.utc) + add1Hour
            else:
                currDate = datetime.datetime(year=int(matrixData[i][2]), month=int(matrixData[i][1]), day=int(matrixData[i][0]),
                                             hour=int(matrixData[i][3]), tzinfo=datetime.timezone.utc)
            currDate -= tzDelta
            if(int(matrixData[i-1][3])==24):
                prevDate = datetime.datetime(year=int(matrixData[i-1][2]), month=int(matrixData[i-1][1]), day=int(matrixData[i-1][0]),
                                             hour=23, tzinfo=datetime.timezone.utc) + add1Hour
            else:
                prevDate = datetime.datetime(year=int(matrixData[i-1][2]), month=int(matrixData[i-1][1]), day=int(matrixData[i-1][0]),
                                             hour=int(matrixData[i-1][3]), tzinfo=datetime.timezone.utc)
            prevDate -= tzDelta
            # Start at dateBegin and go to the element before dateEnd, because the last date is needed for rain and evap in implicit simulations.
            if(datetime.datetime.timestamp(currDate)>=ti and \
               datetime.datetime.timestamp(currDate)<=tf):
                outFlow[index] = matrixData[i][4]
                diffDate = currDate - prevDate
                diffTimeInSeconds = diffDate.days*secondsInDay + diffDate.seconds
                timeArray[index] = datetime.datetime.timestamp(currDate)
                # timeArray[index] = timeArray[index-1] + diffTimeInSeconds
                index += 1
    elif(nbCl==7):
        for i in range(len(matrixData)):
            currDate = datetime.datetime(year=int(matrixData[i][2]), month=int(matrixData[i][1]), day=int(matrixData[i][0]),
                                         hour=int(matrixData[i][3]), minute=int(matrixData[i][4]), second=int(matrixData[i][5]),
                                         tzinfo=datetime.timezone.utc)
            currDate -= tzDelta
            prevDate = datetime.datetime(year=int(matrixData[i-1][2]), month=int(matrixData[i-1][1]), day=int(matrixData[i-1][0]),
                                         hour=int(matrixData[i-1][3]), minute=int(matrixData[i-1][4]), second=int(matrixData[i-1][5]),
                                         tzinfo=datetime.timezone.utc)
            prevDate -= tzDelta
            # Start at dateBegin and go to the element before dateEnd, because the last date is needed for rain and evap in implicit simulations.
            if(datetime.datetime.timestamp(currDate)>=ti and \
               datetime.datetime.timestamp(currDate)<=tf):
                if(matrixData[i][6]<0):
                    outFlow[index] = 0.0
                else:
                    outFlow[index] = matrixData[i][6]
                diffDate = currDate - prevDate
                diffTimeInSeconds = diffDate.days*secondsInDay + diffDate.seconds
                timeArray[index] = datetime.datetime.timestamp(currDate)
                index += 1

    # The last date is not taken into account in hydro as the last date of rain and evap is needed for implicit simulations
    diffDate = currDate - prevDate
    # Add the last element in the time matrix as its size is 1 element bigger than the outlet
    if(deltaT!=diffDate.seconds):
        print("ERROR: The last timestep in hydro data does not coincide with the one expected!")
        sys.exit()

    return timeArray, outFlow

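# Illustrative usage sketch (not part of the original module), assuming a
# space-delimited file whose 3rd and 4th lines give the number of columns and
# of data lines, followed by rows "day month year hour [min sec] value" as
# parsed above. The file name, path and dates are hypothetical.
def _example_read_C_hyd():
    return read_C_hyd("outflow.dat", "data/hydro",
                      dateBegin=datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc),
                      dateEnd=datetime.datetime(2020, 12, 31, tzinfo=datetime.timezone.utc),
                      deltaT=3600)
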
def write_compare_file(data:list, fileName, delimter="\t"):
    nbL = len(data)
    nbC = 2
    for element in data:
        if len(element) != nbC:
            logging.error("The number of columns expected (here 2) is not coherent with the data!")
    header = [nbL, nbC]
    data.insert(0, header)

    f = open(fileName, "w")
    for element in data:
        line = delimter.join(str(item) for item in element)
        f.write(line + "\n")
    f.close()

def convert_to_compareData(time, hydro, unitsOut="mm/h", unitsIn="m^3/s", DrainageSurface=0.0):
    if len(time) == len(hydro):
        nbLines = len(time)
    else:
        print("ERROR: time and data vectors do not have the same dimensions!")

    myData = np.zeros((nbLines,2))
    # Timestamp of the data
    myData[:,0] = time
    # Hydro construction
    if unitsOut == unitsIn:
        convFactor = 1.0
    elif unitsIn=="m^3/s" and unitsOut=="mm/h":
        convFactor = DrainageSurface/3.6    # TO VERIFY!!!
    myData[:,1] = hydro * convFactor

    return myData
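# Unit-check sketch (not part of the original module): converting a discharge
# Q [m^3/s] into a runoff depth over a drainage area A [km^2] gives
#     Q * 3600 [s/h] * 1000 [mm/m] / (A * 1e6 [m^2]) = Q * 3.6 / A  [mm/h],
# so the factor flagged "TO VERIFY" above may need to be 3.6/DrainageSurface
# if DrainageSurface is expressed in km^2. Helper name is hypothetical.
def _mm_per_hour_from_m3s(q_m3s: float, area_km2: float) -> float:
    return q_m3s * 3.6 / area_km2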