"""
Author: HECE - University of Liege, Pierre Archambeau
Date: 2024
Copyright (c) 2024 University of Liege. All rights reserved.
This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""
from os.path import join,exists,normpath
import requests
import pandas as pd
import numpy as np
from calendar import month, monthrange
import pandas as pd
import datetime as dt
from datetime import timedelta
import matplotlib.pyplot as plt
from sympy import lowergamma
import time
import re
import logging
from pathlib import Path
from typing import Literal
from tqdm import tqdm
from dataclasses import dataclass
from shapely.geometry import Point, Polygon
from time import sleep
from ..PyTranslate import _
from ..PyVertexvectors import vector, wolfvertex as wv
try:
from ..hydrometry_hece.kiwis_hece import hydrometry_hece as hydrometry
except:
logging.debug("Impossible to import hydrometry_hece.kiwis_hece, trying to import hydrometry.kiwis")
try:
from .kiwis import hydrometry
except:
logging.error("Impossible to import hydrometry.kiwis, trying to import hydrometry_hece.kiwis_hece")
raise ImportError("Impossible to import hydrometry.kiwis or hydrometry_hece.kiwis_hece, please check that the hydrometry module is correctly installed and accessible in your Python environment")
from .kiwis import station_fields as sf, timeseries_fields as ts, kiwis_default_q as def_q, kiwis_default_h as def_h, kiwis_default_rain as def_rain
@dataclass
class GlobalStats:
    """ Global descriptive statistics of one time series.

    Fields are filled by ``SPW_pluviographs.analyze`` (fields were reconstructed
    from that constructor call; defaults added so partial construction works).
    """
    mean: float = None
    median: float = None
    std: float = None
    min: float = None
    max: float = None
    q25: float = None   # 25% quantile
    q75: float = None   # 75% quantile
    skewness: float = None
    kurtosis: float = None
    num_of_values: int = None         # number of non-NaN samples
    total_num_of_values: int = None   # total number of samples (NaN included)
    coverage: float = None            # percentage of non-NaN samples (0-100 scale)

    def __str__(self):
        return f"Mean: {self.mean:.2f}\nMedian: {self.median:.2f}\nStd: {self.std:.2f}\nMin: {self.min:.2f}\nMax: {self.max:.2f}\nQ25: {self.q25:.2f}\nQ75: {self.q75:.2f}\nSkewness: {self.skewness:.2f}\nKurtosis: {self.kurtosis:.2f}\nNumber of values: {self.num_of_values}\nTotal number of values: {self.total_num_of_values}\nCoverage: {self.coverage:.2%}"
@dataclass
class MissingValues:
    """ Missing (NaN) sample count and percentage for one time series.

    Filled by ``SPW_pluviographs.analyze`` (fields reconstructed from that call).
    """
    number: int = None        # number of NaN samples
    percentage: float = None  # percentage of NaN samples (0-100 scale)

    def __str__(self):
        return f"Number: {self.number}\nPercentage: {self.percentage:.2%}"
@dataclass
class DateRange:
    """ Covered period of a series: first/last valid sample and the list of
    contiguous valid (non-NaN) sub-ranges.

    Filled by ``SPW_pluviographs.analyze`` (start/end reconstructed from __str__).
    """
    start: pd.Timestamp = None   # first non-NaN timestamp
    end: pd.Timestamp = None     # last non-NaN timestamp
    all_ranges: list[tuple[pd.Timestamp, pd.Timestamp]] = None  # contiguous valid spans

    def __str__(self):
        return f"Start: {self.start}\nEnd: {self.end}\nAll ranges: {self.all_ranges}"
@dataclass
class YearlyStats:
    """ Descriptive statistics of one year of data.

    Filled by ``SPW_pluviographs.analyze`` (fields reconstructed from that call).
    """
    mean: float = None
    median: float = None
    std: float = None
    min: float = None
    max: float = None
    q25: float = None   # 25% quantile
    q75: float = None   # 75% quantile
    skewness: float = None
    kurtosis: float = None

    def __str__(self):
        return f"Mean: {self.mean:.2f}\nMedian: {self.median:.2f}\nStd: {self.std:.2f}\nMin: {self.min:.2f}\nMax: {self.max:.2f}\nQ25: {self.q25:.2f}\nQ75: {self.q75:.2f}\nSkewness: {self.skewness:.2f}\nKurtosis: {self.kurtosis:.2f}"
@dataclass
class MonthlyStats:
    """ Descriptive statistics of one calendar month (aggregated over all years).

    Filled by ``SPW_pluviographs.analyze`` (fields reconstructed from that call).
    """
    mean: float = None
    median: float = None
    std: float = None
    min: float = None
    max: float = None
    q25: float = None   # 25% quantile
    q75: float = None   # 75% quantile
    skewness: float = None
    kurtosis: float = None

    def __str__(self):
        return f"Mean: {self.mean:.2f}\nMedian: {self.median:.2f}\nStd: {self.std:.2f}\nMin: {self.min:.2f}\nMax: {self.max:.2f}\nQ25: {self.q25:.2f}\nQ75: {self.q75:.2f}\nSkewness: {self.skewness:.2f}\nKurtosis: {self.kurtosis:.2f}"
@dataclass
class YearsStats:
    """ Yearly coverage counters (instantiated by ``SPW_pluviographs.analyze``;
    this class was referenced but missing from the module). """
    number_of_years: int = None           # distinct years present in the index
    number_of_complete_years: int = None  # years without any missing value


# fix: the decorator was duplicated (@dataclass applied twice) and the 'name'
# field was missing although the class is constructed with name=... and __str__
# reads self.name.
@dataclass
class StatisticsSeries:
    """ Container aggregating every statistic computed for one time series. """
    name: str = None                           # station / series name
    time_step: dt.timedelta = None             # unique time step, None if inconsistent
    global_stats: "GlobalStats" = None
    missing_values: "MissingValues" = None
    date_range: "DateRange" = None
    years: "YearsStats" = None
    monthly_statistics: "dict[int, MonthlyStats]" = None  # keyed by month number (1-12)
    yearly_statistics: "dict[int, YearlyStats]" = None    # keyed by year

    def __str__(self):
        return f"Statistics for {self.name}:\n" \
            f"Global stats: {self.global_stats}\n" \
            f"Missing values: {self.missing_values}\n" \
            f"Date range: {self.date_range}\n" \
            f"Years: {self.years}"
# Durations (in hours) used for the IRM-style extreme-rainfall statistics:
# from 1 hour up to 30 days.
STATS_HOURS_IRM = np.asarray([1,2,3,6,12,24,2*24,3*24,4*24,5*24,7*24,10*24,15*24,20*24,25*24,30*24],dtype=np.int32)
# Same durations expressed in minutes (fix: no need to re-wrap an existing
# ndarray in np.asarray before scaling).
STATS_MINUTES_IRM = STATS_HOURS_IRM * 60
class SPW_pluviographs():
    """
    Management of the rain data from the SPW-MI website through its data download interface

    The data are stored in a local directory in csv or parquet format, and can be loaded
    in memory as pandas Series for analysis and plotting.
    """
def __init__(self, variable:str, timestep:str, credential:str = None, store_dir:str | Path = None) -> None:
    """ Initialize the client and fetch the list of active stations.

    :param variable: measured variable ('rain', 'flowrate', 'waterdepth')
    :param timestep: series time step ('1h', 'highres')
    :param credential: optional credential forwarded to the hydrometry client
    :param store_dir: optional local directory where downloaded series are stored
    """
    # Download time step (timedelta); left None here, set by the download/load routines.
    self._timestep_dl = None
    self._store_dir = Path(store_dir) if store_dir is not None else None
    self._variable = variable
    self._timestep = timestep
    # fix: _data/_format were only created by load_all(), so __getitem__,
    # get_all_series and update_all crashed with AttributeError before any load.
    self._data = {}
    self._format = None
    dir_hydrometry = Path(__file__).parent / '..' / 'data' / 'hydrometry'
    if credential is not None:
        self._hydrometry = hydrometry(credential=credential, dir=dir_hydrometry)
    else:
        self._hydrometry = hydrometry(dir=dir_hydrometry)
    # Get **ACTIVE** stations for the given variable and timestep
    self._stations = self._hydrometry.get_timeseries_group(variable, timestep)
def _get_inactive_stations(self) -> pd.DataFrame:
    """ All the stations available in the SPW-MI website for the given variable and timestep,
    but not present in self._stations, which are the active stations for the given variable
    and timestep.

    This can be useful to identify stations that have data available but are not currently
    active for the given variable and timestep, for example to check if there are new stations
    that have been added since the last update of the hydrometry structure, or if there are
    stations that have been deactivated for some reason.

    :return: DataFrame with one row per inactive station (station name + its single TS_ID)
    """
    ret = self._hydrometry.stations
    # Keep only stations not present in self._stations, which are the active stations for the given variable and timestep
    # Test lowercase to avoid issues with accents and case sensitivity in station names
    inactive_stations = ret[~ret[sf.STATION_NAME.value].str.lower().isin(self._stations[sf.STATION_NAME.value].str.lower())]
    # Restrict to the DGH / DCENN networks
    inactive_stations = inactive_stations[inactive_stations['site_no'].isin(['DGH', 'DCENN'])]
    names = []
    ids = []
    for name in inactive_stations[sf.STATION_NAME.value].values:
        # element [1] of timeseries_list() is assumed to be the timeseries table -- TODO confirm against the kiwis client
        ts_id = self._hydrometry.timeseries_list(name)[1]
        if self._variable == 'flowrate':
            # flow rate: parameter 'Q', narrowed to the 1h or full-resolution series name
            ts_id = ts_id[ts_id['parametertype_name'] == 'Q']
            if self._timestep == '1h':
                ts_id = ts_id[ts_id[ts.TS_NAME.value].str.contains(def_q.Q_1H.value)]
            elif self._timestep == 'highres':
                ts_id = ts_id[ts_id[ts.TS_NAME.value].str.contains(def_q.Q_FULL.value)]
        elif self._variable == 'rain':
            # rain: parameter 'R'
            ts_id = ts_id[ts_id['parametertype_name'] == 'R']
            if self._timestep == '1h':
                ts_id = ts_id[ts_id[ts.TS_NAME.value].str.contains(def_rain.R_1H.value)]
            elif self._timestep == 'highres':
                ts_id = ts_id[ts_id[ts.TS_NAME.value].str.contains(def_rain.R_FULL.value)]
        elif self._variable == 'waterdepth':
            # water depth: parameter 'H'
            ts_id = ts_id[ts_id['parametertype_name'] == 'H']
            if self._timestep == '1h':
                ts_id = ts_id[ts_id[ts.TS_NAME.value].str.contains(def_h.H_1H.value)]
            elif self._timestep == 'highres':
                ts_id = ts_id[ts_id[ts.TS_NAME.value].str.contains(def_h.H_FULL.value)]
        # keep the station only when exactly one matching timeseries remains
        if len(ts_id) == 1:
            names.append(name)
            ids.append(ts_id[ts.TS_ID.value].values[0])
    return pd.DataFrame({sf.STATION_NAME.value: names, ts.TS_ID.value: ids})
def force_update_hydrometry_structure(self):
    """ Force the update of the hydrometry structure to get the latest available stations
    and timeseries from the SPW-MI website.

    This can be useful if new stations or timeseries have been added since the last update,
    or if there have been changes in the station metadata (e.g., name, code, location, etc.)
    that are not yet reflected in the local structure."""
    self._hydrometry.update_forced()
    # refresh the cached ACTIVE stations for the configured variable/timestep
    self._stations = self._hydrometry.get_timeseries_group(self._variable, self._timestep)
def __getitem__(self, key):
if key in self._data:
return self._data[key]
elif str(key).lower() in self._get_names_lower():
ts_id = self.get_tsid_from_name(key)
if ts_id in self._data:
return self._data[ts_id]
else:
logging.warning(_("The data for station {} (TS_ID {}) is not yet loaded in memory").format(key, ts_id))
return None
elif key in self.get_codes():
ts_id = self.get_tsid_from_code(key)
if ts_id in self._data:
return self._data[ts_id]
else:
logging.warning(_("The data for station {} (TS_ID {}) is not yet loaded in memory").format(key, ts_id))
return None
[docs]
def get_all_series(self, key:Literal['tsid', 'name'] = 'name', to_sort:bool = True) -> dict[str, pd.Series]:
""" Get all the series currently loaded in memory, with station names as keys and pandas Series as values
"""
if to_sort:
if key == 'tsid':
return dict(sorted(self._data.items(), key=lambda item: self.get_name_from_tsid(item[0]), reverse=True))
elif key == 'name':
all_series = {}
for key, value in self._data.items():
station_name = self.get_name_from_tsid(key)
all_series[station_name] = value
return dict(sorted(all_series.items(), key=lambda item: item[0], reverse=True))
else:
logging.error(_("Search key {} not recognized, available keys are 'tsid' and 'name'").format(key))
else:
if key == 'tsid':
return self._data
elif key == 'name':
all_series = {}
for key, value in self._data.items():
station_name = self.get_name_from_tsid(key)
all_series[station_name] = value
return all_series
else:
logging.error(_("Search key {} not recognized, available keys are 'tsid' and 'name'").format(key))
def get_station_names_inside_polygon(self, polygon: vector | Polygon) -> list[str]:
    """ Get the names of the stations inside a given polygon

    :param polygon: a shapely Polygon or a vector of vertices defining the polygon
    :return: list of station names whose local coordinates fall inside the polygon
    """
    if isinstance(polygon, vector):
        # fix: previously read the attribute on the *class* (vector.polygon)
        # instead of the passed instance
        polygon = polygon.polygon
    station_names = []
    for idx, row in self._stations.iterrows():
        station_point = Point(row[sf.STATION_LOCAL_X.value], row[sf.STATION_LOCAL_Y.value])
        if polygon.contains(station_point):
            station_names.append(row[sf.STATION_NAME.value])
    return station_names
def get_nearest_station_name(self, point: wv | Point | list[float, float]) -> str:
    """ Return the name of the station closest (Euclidean distance) to *point*.

    :param point: a shapely Point, a wolfvertex, or a list of coordinates [x, y]
    :return: name of the nearest station (None when no station is listed)
    """
    # Normalize the input to a shapely Point
    if isinstance(point, wv):
        point = Point(point.x, point.y)
    elif isinstance(point, list) and len(point) == 2:
        point = Point(point[0], point[1])
    best_name = None
    best_dist = float('inf')
    # Linear scan over the station table, keeping the closest one
    for _idx, record in self._stations.iterrows():
        candidate = Point(record[sf.STATION_LOCAL_X.value], record[sf.STATION_LOCAL_Y.value])
        separation = point.distance(candidate)
        if separation < best_dist:
            best_dist = separation
            best_name = record[sf.STATION_NAME.value]
    return best_name
[docs]
def resample(self, rain:pd.Series, new_timestep:dt.timedelta, method:str='sum'):
if method=='sum':
return rain.resample(new_timestep).sum()
elif method=='mean':
return rain.resample(new_timestep).mean()
elif method=='max':
return rain.resample(new_timestep).max()
else:
logging.error(_("Resampling method {} not recognized, available methods are 'sum', 'mean', and 'max'").format(method))
return None
def get_names(self):
    """ Return the names of all active stations as a list. """
    return self._stations[sf.STATION_NAME.value].to_list()
[docs]
def _get_names_lower(self):
return {name.lower(): name for name in self.get_names()}
def get_codes(self):
    """ Return the official codes of all active stations as a list. """
    return self._stations[sf.STATION_NO.value].to_list()
def get_tsid_from_name(self, name:str):
    """ Return the TS_ID of the station whose name matches *name*
    (case-insensitive), or None when zero or several stations match. """
    matches = self._stations[self._stations[sf.STATION_NAME.value].str.lower() == name.lower()]
    if len(matches) != 1:
        return None
    return matches[ts.TS_ID.value].values[0]
def get_tsid_from_code(self, code:str):
    """ Return the TS_ID of the station with official code *code*, or None if unknown.

    fix: previously raised IndexError on an unknown code, inconsistent with
    get_tsid_from_name which returns None.
    """
    matches = self._stations[self._stations[sf.STATION_NO.value] == code]
    if len(matches) == 0:
        return None
    return matches[ts.TS_ID.value].values[0]
[docs]
def get_tsid(self, name:str='', code:str=''):
if name != '':
return self.get_tsid_from_name(name)
elif code != '':
return self.get_tsid_from_code(code)
else:
logging.error(_("The station name or code must be provided to obtain its TS_ID"))
return None
def get_name_from_tsid(self, ts_id:str):
    """ Return the station name owning timeseries *ts_id*,
    or None (with an error log) if the TS_ID is unknown. """
    if ts_id not in self._stations[ts.TS_ID.value].values:
        logging.error(_("The TS_ID {} does not exist in the available stations").format(ts_id))
        return None
    matching = self._stations[self._stations[ts.TS_ID.value] == ts_id]
    return matching[sf.STATION_NAME.value].values[0]
def get_code_from_tsid(self, ts_id:str):
    """ Return the official station code owning timeseries *ts_id*,
    or None (with an error log) if the TS_ID is unknown. """
    if ts_id not in self._stations[ts.TS_ID.value].values:
        logging.error(_("The TS_ID {} does not exist in the available stations").format(ts_id))
        return None
    matching = self._stations[self._stations[ts.TS_ID.value] == ts_id]
    return matching[sf.STATION_NO.value].values[0]
def get_data(self, fromyear:int, toyear:int,
             code:str='', name:str='',
             filterna:bool=True, timezone:str='GMT+1'):
    """ Get the rain data from the SPW-MI website for a given station code or name, and for a given period of years

    :param fromyear: the starting year of the period (inclusive)
    :param toyear: the ending year of the period (inclusive)
    :param code: the station code -- NOTE(review): currently unused, kept for API compatibility
    :param name: the station name (required in practice)
    :param filterna: whether to replace NaN values by 0 -- NOTE(review): accepted but not applied by the current implementation; confirm before relying on it
    :param timezone: the timezone of the data (default: 'GMT+1')
    :return: a pandas Series containing the rain data for the given station and period,
        indexed by datetime (empty Series on error or missing name)
    """
    if name == '':
        # fix: previously returned None implicitly although the docstring promises a Series
        logging.error(_("The station name must be provided to retrieve data"))
        return pd.Series(dtype=np.float64)
    try:
        ts_id = self.get_tsid(name)
        # NOTE(review): _timestep_dl.seconds ignores the days component of the
        # timedelta (wrong for steps >= 1 day) -- confirm the expected 'interval' unit
        rain = self._hydrometry.timeseries(stationname=name,
                                           fromdate=dt.datetime(fromyear,1,1),
                                           todate=dt.datetime(toyear,12,31,23,59,59),
                                           ts_id=ts_id,
                                           timezone=timezone,
                                           interval=self._timestep_dl.seconds)
    except Exception as e:
        logging.error(_("Error while retrieving rain data for station {} between {} and {}: {}").format(name, fromyear, toyear, str(e)))
        return pd.Series(dtype=np.float64)
    if not isinstance(rain, pd.Series):
        logging.error(_("The retrieved rain data for station {} between {} and {} is not a pandas Series").format(name, fromyear, toyear))
        return pd.Series(dtype=np.float64)
    return rain
def get_year_data(self, year:int=2021,
                  code:str='', name:str='',
                  filterna:bool=True, timezone:str='GMT+1'):
    """ Get the rain data from the SPW-MI website for a given station code or name, and for a given year

    :param year: the year of the data to retrieve
    :param code: the station code -- NOTE(review): currently unused, kept for API compatibility
    :param name: the station name (required in practice)
    :param filterna: whether to replace NaN values by 0 -- NOTE(review): accepted but not applied by the current implementation
    :param timezone: the timezone of the data (default: 'GMT+1')
    :return: a pandas Series containing the rain data for the given station and year,
        indexed by datetime (empty Series on error or missing name)
    """
    if name == '':
        # fix: previously returned None implicitly although the docstring promises a Series
        logging.error(_("The station name must be provided to retrieve data"))
        return pd.Series(dtype=np.float64)
    try:
        ts_id = self.get_tsid(name)
        rain = self._hydrometry.timeseries(stationname=name,
                                           fromdate=dt.datetime(year,1,1),
                                           todate=dt.datetime(year,12,31,23,59,59),
                                           ts_id=ts_id,
                                           timezone=timezone,
                                           interval=self._timestep_dl.seconds)
    except Exception as e:
        logging.error(_("Error while retrieving rain data for station {} in {}: {}").format(name, year, str(e)))
        return pd.Series(dtype=np.float64)
    return rain
def get_month_data(self, month:int=7, year:int=2021,
                   code:str='', name:str='',
                   timezone:str='GMT+1'):
    """Retrieving hourly data from the SPW website (GMT+1)

    :param month: the month of the year for which to retrieve data (1-12)
    :param year: the year for which to retrieve data
    :param code: the station code -- NOTE(review): currently unused, kept for API compatibility
    :param name: the station name (required in practice)
    :param timezone: the timezone of the data (default: 'GMT+1') -- ('UTC', 'GMT+1', 'GMT+2', etc.)
    :return: a pandas Series for the requested month (empty Series on error or missing name)
    """
    if name == '':
        # fix: previously returned None implicitly
        logging.error(_("The station name must be provided to retrieve data"))
        return pd.Series(dtype=np.float64)
    # number of days in the requested month (leap years handled by monthrange)
    nd_days = monthrange(year, month)[1]
    try:
        ts_id = self.get_tsid(name)
        rain = self._hydrometry.timeseries(stationname=name,
                                           fromdate=dt.datetime(year, month, 1),
                                           todate=dt.datetime(year, month, 1)+dt.timedelta(days=nd_days) - self._timestep_dl,
                                           ts_id=ts_id,
                                           timezone=timezone,
                                           interval=self._timestep_dl.seconds)
    except Exception as e:
        logging.error(_("Error while retrieving rain data for station {} in {}-{}: {}").format(name, month, year, str(e)))
        return pd.Series(dtype=np.float64)
    return rain
@classmethod
[docs]
def compute_stats_Q(cls, rain:pd.Series, listhours:list[int]) -> np.ndarray:
"""
Computes the maximum cumulative rainfall for different durations based on convolution with a vector of number of hours
Unity : mm
:param rain: the time series of rainfall to analyze
:param listhours: the list of durations in hours for which to calculate the stats (e.g., [1,2,3,6,12,24])
:return: a numpy array containing the maximum values for each duration
"""
max_quantity = np.zeros(len(listhours), dtype=np.float64)
k=0
for locstat in listhours:
a = np.ones(locstat)
max_quantity[k]=np.max(np.convolve(rain.to_numpy(), a, 'same'))
k+=1
return max_quantity
@classmethod
[docs]
def compute_stats_i(cls, rain:pd.Series, listhours:list[int]) -> np.ndarray:
"""
Computes the maximum average intensities for different durations based on convolution with a vector of number of hours
Unity : mm/h
:param rain: the time series of rainfall to analyze
:param listhours: the list of durations in hours for which to calculate the stats (e.g., [1,2,3,6,12,24])
:return: a numpy array containing the maximum values for each duration
"""
mean_intensity = cls.compute_stats_Q(rain,listhours) / np.asarray(listhours, dtype=np.float64)
return mean_intensity
@classmethod
def plot(cls,
         data:pd.Series,
         toshow=False,
         xbounds=None,
         ticks='M',
         label:str = None,
         figax=None):
    """ Plot a rainfall series as a stepped curve with a translucent fill.

    :param data: series to plot, indexed by datetime
    :param toshow: if True, call fig.show() before returning
    :param xbounds: optional (start, end) timestamps bounding the x axis and driving the ticks
    :param ticks: tick spacing: 'M' (monthly), 'D' (daily), '2H' (2-hourly), 'H' (hourly)
    :param label: legend label of the curve
    :param figax: optional (fig, ax) tuple to draw on an existing figure
    :return: (fig, ax) tuple
    """
    if figax is None:
        fig,ax = plt.subplots(1,1,figsize=(15,8))
    else:
        fig=figax[0]
        ax=figax[1]
    x = data.index
    if not xbounds is None:
        minyear = xbounds[0].year
        maxyear = xbounds[1].year+1
        # Build the tick list from the requested spacing.
        # NOTE(review): with xbounds set and an unrecognized 'ticks' value,
        # 'xticks' stays undefined and set_xticks below raises -- confirm callers.
        if ticks=='M':
            xticks = [xbounds[0]+pd.DateOffset(months=k) for k in range(int((xbounds[1]-xbounds[0]).days/30)+1)]
        elif ticks=='D':
            xticks = [xbounds[0]+pd.DateOffset(days=k) for k in range(int((xbounds[1]-xbounds[0]).days))]
        elif ticks=='2H':
            xticks = [xbounds[0]+pd.DateOffset(hours=k) for k in range(0,int((xbounds[1]-xbounds[0]).days*24),2)]
        elif ticks=='H':
            xticks = [xbounds[0]+pd.DateOffset(hours=k) for k in range(int((xbounds[1]-xbounds[0]).days*24))]
    else:
        # Without explicit bounds: quarterly ticks spanning the whole series
        minyear = x[0].year
        maxyear = x[-1].year+1
        xticks = [dt.datetime(minyear,1,1, tzinfo=data.index.tz) + pd.DateOffset(months=k) for k in range(0,(maxyear-minyear)*12+1,3)]
    # Stepped curve with a filled area below it
    ax.fill_between(data.index,data.values,step='pre',alpha=0.7)
    ax.step(data.index,data.values,where='pre',label=label)
    ax.set_xticks(xticks)
    # Tick label format depends on the tick spacing
    if ticks=='M':
        ax.set_xticklabels([curtick.strftime('%b/%Y') for curtick in xticks],rotation=45, fontsize=8)
    elif ticks=='D':
        ax.set_xticklabels([curtick.strftime('%d/%m/%Y') for curtick in xticks],rotation=45, fontsize=8)
    elif ticks=='2H' or ticks=='H':
        ax.set_xticklabels([curtick.strftime('%d/%m %H:%M') for curtick in xticks],rotation=45, fontsize=8)
    ax.set_xlabel('Date')
    ax.set_ylabel('Précipitation moyenne horaire [mm/h]')
    ax.legend()
    if not xbounds is None:
        ax.set_xlim(xbounds)
    if toshow:
        fig.show()
    return fig,ax
@classmethod
def plot_periodic(cls,
                  data:pd.Series,
                  origin:dt.datetime,
                  length_in_months:int=12,
                  toshow:bool=False,
                  figax:tuple[plt.Figure, plt.Axes]=None) -> tuple[plt.Figure, plt.Axes]:
    """
    Comparison of several years over the same horizon of a total year starting from a given date,
    for example to compare the summer periods of several years starting from the 1st of July.
    The x-axis is in days from the origin date, and the y-axis is the rainfall in mm/h.

    :param data: the time series of rainfall to plot
    :param origin: the start date of the period to plot (e.g., July 1, 2021)
    :param length_in_months: the number of months in each period (e.g., 3 to plot from July 1 to September 30)
        (fix: the docstring previously documented a nonexistent 'offset_in_months' parameter)
    :param toshow: if True, displays the plot at the end of the function (default: False)
    :param figax: a tuple (fig, ax) of matplotlib to plot on an existing figure (default: None)
    :return: a tuple (fig, ax) of matplotlib containing the created or modified plot
    """
    if figax is None:
        fig,ax = plt.subplots(1,1,figsize=(15,8))
    else:
        fig=figax[0]
        ax=figax[1]
    end = data.index[-1]
    # check if origin is tz-naive, if so, we assume it has the same tz as the rain index
    if origin.tzinfo is None:
        origin = origin.replace(tzinfo=data.index.tz)
    startdate = origin
    enddate = origin
    offset = pd.DateOffset(months=length_in_months)
    while enddate <= end:
        enddate = startdate + offset
        locrain = data[startdate:enddate]
        # fix: was 'if len(locrain>0):' -- len() of a boolean Series; same truthiness,
        # but the comparison belongs outside len()
        if len(locrain) > 0:
            # hours elapsed between the period start and the first sample
            i1 = (locrain.index[0]-startdate).days*24
            x = np.arange(i1, i1+len(locrain.values))
            # ax.fill_between(x,locrain.values,step='pre',alpha=0.5)
            ax.step(x, locrain.values, where='pre')
        startdate = enddate
    # x axis labelled in days from the origin, one tick per (30-day) month
    xticks = [k*30*24 for k in range(length_in_months+1)]
    ax.set_xticks(xticks)
    ax.set_xticklabels([str(k*30) for k in range(length_in_months+1)])
    if toshow:
        fig.show()
    return fig, ax
[docs]
def save(self, data:pd.Series, filename:str, format:Literal['csv','parquet']='csv'):
if format=='csv':
data.to_csv(filename,header=['Data'])
elif format=='parquet':
pd.DataFrame(data).to_parquet(filename, engine='pyarrow',)
def load(self, name:str='',
         code:int=0,
         filename:str='',
         fromdate:dt.datetime=None,
         todate:dt.datetime=None,
         format:Literal['csv','parquet']='parquet'):
    """ Load one stored series from the local store as a pandas Series.

    The file is located either by resolving the TS_ID of a station *name* or *code*
    inside self._store_dir, or directly by *filename*.

    :param name: station name (takes precedence when not empty)
    :param code: station code (used when *name* is empty and code > 0)
    :param filename: explicit file path (used when neither name nor code is given)
    :param fromdate: optional lower bound; tz-naive values inherit the data's timezone
    :param todate: optional upper bound; tz-naive values inherit the data's timezone
    :param format: storage format, 'csv' or 'parquet'
    :return: pandas Series indexed by datetime; empty Series when the file is missing or empty
    """
    myname = filename
    if name != '':
        myname = self.get_tsid_from_name(name)
        filename = self._store_dir / (str(myname) + '.' + format)
    elif code > 0:
        filename = self.get_tsid_from_code(code)
        filename = self._store_dir / (str(filename) + '.' + format)
    # normalize the extension whatever the path origin
    filename = Path(filename).with_suffix('.'+format)
    if exists(filename):
        if format=='csv':
            mydata = pd.read_csv(filename, index_col=0, parse_dates=True, header=0, names=['Data'])
        elif format=='parquet':
            mydata = pd.read_parquet(filename, engine='pyarrow')
        if len(mydata) == 0:
            logging.warning(_("The file {} is empty, unable to load data for station {}").format(filename,myname))
            return pd.Series(dtype=np.float64)
        mydata.index = pd.to_datetime(mydata.index)
        mydata.columns = ['Data']
        mydata = mydata.squeeze()  # convert to a Series if this is a single-column DataFrame
    else:
        logging.warning(_("The file {} does not exist, unable to load data for station {}").format(filename,myname))
        return pd.Series(dtype=np.float64)
    # tz-naive bounds inherit the timezone of the loaded index
    if fromdate is not None and fromdate.tzinfo is None:
        fromdate = fromdate.replace(tzinfo=mydata.index.tz)
    if todate is not None and todate.tzinfo is None:
        todate = todate.replace(tzinfo=mydata.index.tz)
    # slice the requested period (label-based slicing on the datetime index)
    if fromdate is None and todate is None:
        return mydata
    elif fromdate is None:
        return mydata[:todate]
    elif todate is None:
        return mydata[fromdate:]
    else:
        return mydata[fromdate:todate]
def _download_one(self, dirout:Path | str = None,
                  name:str='',
                  fromyear:int=2002, toyear:int=dt.datetime.now().year,
                  timezone:str='GMT+1',
                  format:Literal['csv','parquet']='parquet',
                  force:bool=False):
    """ Download the full series of one station, year by year, and store it as
    '<TS_ID>.<format>' inside *dirout*.

    :param dirout: output directory (default: ../data/hydrometry/rain)
    :param name: station name
    :param fromyear: first year to download (inclusive)
    :param toyear: last year to download (inclusive)
        NOTE(review): the default is evaluated once at import time, not per call
    :param timezone: timezone forwarded to the data requests
    :param format: storage format, 'csv' or 'parquet'
    :param force: if False, an existing file is kept and nothing is downloaded
    """
    if dirout is None:
        dirout = Path(__file__).parent / '..' / 'data' / 'hydrometry' / 'rain'
    dirout = Path(dirout)
    dirout.mkdir(parents=True, exist_ok=True)
    ts_id = self.get_tsid(name)
    fname = dirout / (str(ts_id) + '.' + format)
    if fname.exists():
        if not force:
            logging.info(_("The file {} already exists, skipping to the next one").format(fname))
            return
    # one request per year to keep each payload small
    datas = [self.get_data(y, y, name=name, timezone=timezone) for y in range(fromyear, toyear+1)]
    myflow = pd.concat(datas)
    if myflow is not None:
        # drop duplicated timestamps, keeping the last (most recent) occurrence
        myflow = myflow[~myflow.index.duplicated(keep='last')]
        self.save(myflow, fname, format=format)
    logging.info(name)
    self._store_dir = dirout
def download_all(self, dirout:Path | str = None,
                 fromyear:int=2002, toyear:int=dt.datetime.now().year,
                 timezone:str='GMT+1',
                 format:Literal['csv','parquet']='parquet',
                 force:bool=False):
    """ Download and store the series of every active station.

    :param dirout: output directory (default: ../data/hydrometry/rain)
    :param fromyear: first year to download (inclusive)
    :param toyear: last year to download (inclusive)
    :param timezone: timezone forwarded to the data requests
    :param format: storage format, 'csv' or 'parquet'
    :param force: if True, re-download stations whose file already exists
    """
    if dirout is None:
        dirout = Path(__file__).parent / '..' / 'data' / 'hydrometry' / 'rain'
    dirout = Path(dirout)
    dirout.mkdir(parents=True, exist_ok=True)
    for curstation in tqdm(self.get_names()):
        ts_id = self.get_tsid(curstation)
        fname = dirout / (str(ts_id) + '.' + format)
        # fix: existing files were previously skipped unconditionally here,
        # so 'force=True' never reached _download_one and was silently ignored
        if fname.exists() and not force:
            logging.info(_("The file {} already exists, skipping to the next one").format(fname))
            continue
        self._download_one(dirout=dirout, name=curstation, fromyear=fromyear, toyear=toyear, timezone=timezone, format=format, force=force)
        sleep(0.5) # to avoid overloading the server
    self._store_dir = dirout
def update_all(self, dirin:Path | str = None,
               format:Literal['csv','parquet']='parquet'):
    """ Update every locally stored series with the data published since its
    last stored timestamp, then reload everything in memory if data was loaded.

    :param dirin: directory holding the stored files (default: ../data/hydrometry/rain)
    :param format: storage format, 'csv' or 'parquet'
    :return: the reloaded data dict when series were already loaded, otherwise None
    """
    if dirin is None:
        dirin = Path(__file__).parent / '..' / 'data' / 'hydrometry' / 'rain'
    dirin = Path(dirin)
    # iterate in reverse alphabetical order, one update per stored file
    for curfile in tqdm(list(dirin.iterdir())[::-1]):
        if curfile.is_file() and curfile.suffix == '.' + format:
            self.update_one(curfile.stem, dirin=dirin, format=format)
    self._format = format
    # fix: _data may not exist at all when load_all() was never called,
    # which previously raised AttributeError here
    if getattr(self, '_data', None) is not None:
        return self.load_all(format=format)
def update_one(self, ts_id:str,
               dirin:Path | str = None,
               format:Literal['csv','parquet']='parquet'):
    """ Update one locally stored series (file '<ts_id>.<format>') with the data
    published since its last stored timestamp.

    :param ts_id: timeseries identifier (also the file stem inside *dirin*)
    :param dirin: directory holding the stored files (default: ../data/hydrometry/rain)
    :param format: storage format, 'csv' or 'parquet'
    """
    if dirin is None:
        dirin = Path(__file__).parent / '..' / 'data' / 'hydrometry' / 'rain'
    dirin = Path(dirin)
    curfile = dirin / (str(ts_id) + '.' + format)
    if curfile.exists():
        myrain = self.load(filename=curfile, format=format)
        if myrain is not None:
            # download the data from the end of the file to now
            if len(myrain) == 0:
                logging.warning(_("Le fichier {} est vide").format(curfile))
                # empty file: restart the download from 2002-01-01 (GMT+1)
                lastdate = dt.datetime(2002,1,1,tzinfo=dt.timezone(dt.timedelta(hours=1)))
            else:
                lastdate = myrain.index[-1]
            now = dt.datetime.now().astimezone(lastdate.tzinfo)
            if lastdate < now:
                # NOTE(review): 'timezone' below receives a tzinfo object while
                # get_month_data/get_data annotate it as str -- confirm the
                # hydrometry client accepts both.
                if lastdate.month == now.month and lastdate.year == now.year:
                    # same month: a single month request is enough
                    newrain = self.get_month_data(lastdate.month, lastdate.year, name=self.get_name_from_tsid(ts_id), timezone=lastdate.tzinfo)
                else:
                    newrain = self.get_data(lastdate.year, now.year, name=self.get_name_from_tsid(ts_id), timezone=lastdate.tzinfo)
                if newrain is not None:
                    try:
                        # keep only the samples strictly after the stored history
                        newrain = newrain[newrain.index > lastdate]
                    except Exception as e:
                        logging.error(_("Error while filtering new rainfall data for station {} after {}: {}").format(self.get_name_from_tsid(ts_id), lastdate, str(e)))
                    if len(newrain) > 0:
                        myrain = pd.concat([myrain, newrain])
                        # drop duplicate index if any
                        myrain = myrain[~myrain.index.duplicated(keep='last')]
                        self.save(myrain, curfile, format=format)
                        logging.info(_("Data for station {} updated until {}").format(ts_id, myrain.index[-1]))
                    else:
                        logging.info(_("Data for station {} already up to date until {}").format(ts_id, lastdate))
                else:
                    logging.warning(_("Unable to retrieve data for station {} for the period from {} to {}").format(ts_id, lastdate, now))
def load_all(self, dirin:Path | str = None,
             fromdate:dt.datetime=None, todate:dt.datetime=None,
             timezone:str='GMT+1',
             format:Literal['csv','parquet']='parquet'):
    """ Load every stored series of *dirin* in memory (self._data, keyed by TS_ID).

    :param dirin: directory holding the stored files (default: ../data/hydrometry/rain)
    :param fromdate: optional lower bound of the loaded period
    :param todate: optional upper bound of the loaded period
    :param timezone: NOTE(review): accepted but unused by the current implementation
    :param format: storage format, 'csv' or 'parquet'
    :return: the dict of loaded series
    """
    if dirin is None:
        dirin = Path(__file__).parent / '..' / 'data' / 'hydrometry' / 'rain'
    dirin = Path(dirin)
    # fix: _store_dir was set before the Path() normalization, so a raw str
    # could be stored while other methods expect a Path
    self._store_dir = dirin
    self._data = {}
    for curfile in tqdm(dirin.iterdir()):
        if curfile.is_file() and curfile.suffix == '.' + format:
            # file stem is the TS_ID (see save()/_download_one())
            self._data[curfile.stem] = self.load(filename=curfile, fromdate=fromdate, todate=todate, format=format)
    self._format = format
    return self._data
@classmethod
def analyze(cls, serie: pd.Series,
            name:str, volume:bool = True,
            min_daily_rainfall:float = 0.1,
            force_unique_timestep:bool = False) -> StatisticsSeries:
    """ Analyze the time serie and return statistics

    - Global statistics: mean, median, std, min, max
    - Missing values : number and percentage of missing values
    - Date range: start and end dates
    - Years : number of years, number of complete years
    - Yearly statistics : mean, median, std, min, max per year
    - Monthly statistics : mean, median, std, min, max per month

    :param serie: pd.Series, time series to analyze
    :param name: str, name of the time series
    :param volume: bool, if True, convert intensity to volume per month/year
    :param min_daily_rainfall: float, minimum daily rainfall to consider
    :param force_unique_timestep: bool, if True, enforce unique time step between non-NA values,
        otherwise, the time step is set to None if inconsistent time steps are found
    :return: StatisticsSeries, statistics
    :raises ValueError: when force_unique_timestep is True and the steps are inconsistent
    """
    all_statistics = StatisticsSeries(name=name)

    if serie.empty:
        logging.warning(_("The time series {} is empty, unable to compute statistics").format(name))
        return all_statistics

    # NOTE(review): coverage/percentage are stored on a 0-100 scale but the
    # dataclass __str__ formats them with '%' (multiplying by 100 again).
    all_statistics.global_stats = GlobalStats(
        mean=serie.mean(),
        median=serie.median(),
        std=serie.std(),
        min=serie.min(),
        max=serie.max(),
        q25=serie.quantile(0.25),
        q75=serie.quantile(0.75),
        skewness=serie.skew(),
        kurtosis=serie.kurtosis(),
        num_of_values=serie.notna().sum(),
        total_num_of_values=len(serie),
        coverage=serie.notna().mean() * 100,
    )
    all_statistics.missing_values = MissingValues(
        number=serie.isna().sum(),
        percentage=serie.isna().mean() * 100,
    )

    serie_wo_na = serie.dropna()

    # find contiguous non-NA ranges (as (start_timestamp, end_timestamp) tuples)
    mask = serie.notna()
    valid_ranges = []
    if mask.any():
        groups = mask.ne(mask.shift()).cumsum()
        # fix: the loop variable was previously named '_', which made '_' a local
        # and shadowed the gettext translation function: the empty-series warning
        # above raised UnboundLocalError and the monthly warning below called a
        # groupby key instead of _()
        for _grp_key, grp in mask.groupby(groups):
            if grp.iloc[0]:  # contiguous block of valid values
                idx = grp.index
                valid_ranges.append((idx[0], idx[-1]))

    all_statistics.date_range = DateRange(
        # Excluding NaN values at beginning and end
        start=serie_wo_na.index.min(),
        end=serie_wo_na.index.max(),
        all_ranges=valid_ranges
    )

    # Find time step between non-NA values
    if len(serie_wo_na) >= 2:
        time_diffs = serie.index.to_series().diff().dropna()
        if time_diffs.nunique() > 1:
            if force_unique_timestep:
                raise ValueError("Inconsistent time steps found")
            else:
                all_statistics.time_step = None
        else:
            all_statistics.time_step = time_diffs.mode()[0]

    all_statistics.years = YearsStats(
        number_of_years=serie.index.year.nunique(),
        number_of_complete_years=serie.groupby(serie.index.year).apply(lambda x: x.notna().all()).sum(),
    )

    if volume:
        # Convert to volume per month
        serie_month = serie.resample('ME').mean() * serie.resample('ME').count()  # m3/hour to m3/month
    else:
        serie_month = serie

    # monthly statistics, aggregated over all years, keyed by month number
    stats = (serie_month.groupby(serie_month.index.month).agg(
        mean='mean',
        median='median',
        std='std',
        min='min',
        max='max',
        q25=lambda x: x.quantile(0.25),
        q75=lambda x: x.quantile(0.75),
        skewness=lambda x: x.skew(),
        kurtosis=lambda x: x.kurtosis(),
        )
        .rename(columns={'q25': '25%', 'q75': '75%'})
        .to_dict(orient='index')
    )
    all_statistics.monthly_statistics = {}
    for month in stats.keys():
        try:
            all_statistics.monthly_statistics[month] = MonthlyStats(
                mean=stats[month]['mean'],
                median=stats[month]['median'],
                std=stats[month]['std'],
                min=stats[month]['min'],
                max=stats[month]['max'],
                q25=stats[month]['25%'],
                q75=stats[month]['75%'],
                skewness=stats[month]['skewness'],
                kurtosis=stats[month]['kurtosis']
            )
        except KeyError:
            logging.warning(_("Missing statistics for month {} in series {}").format(month, name))

    serie_year = serie.copy()
    # resample to daily totals first
    serie_year = serie_year.resample('D').mean() * serie_year.resample('D').count()  # from hourly average to daily total (mm/day)
    # days below the threshold are set to NaN so they do not bias the yearly stats
    serie_year[serie_year <= min_daily_rainfall] = np.nan
    if volume:
        # Convert to volume per year
        serie_year = serie_year.resample('YE').mean() * serie_year.resample('YE').count()  # m3/hour to m3/year
    stats = (
        serie_year.groupby(serie_year.index.year)
        .agg(
            mean='mean',
            median='median',
            std='std',
            min='min',
            max='max',
            q25=lambda x: x.quantile(0.25),
            q75=lambda x: x.quantile(0.75),
            skewness=lambda x: x.skew(),
            kurtosis=lambda x: x.kurtosis()
        )
        .rename(columns={'q25': '25%', 'q75': '75%'})
        .to_dict(orient='index')
    )
    all_statistics.yearly_statistics = {}
    for year in serie_year.index.year.unique():
        all_statistics.yearly_statistics[year] = YearlyStats(
            mean=stats[year]['mean'],
            median=stats[year]['median'],
            std=stats[year]['std'],
            min=stats[year]['min'],
            max=stats[year]['max'],
            q25=stats[year]['25%'],
            q75=stats[year]['75%'],
            skewness=stats[year]['skewness'],
            kurtosis=stats[year]['kurtosis']
        )
    return all_statistics
def analyze_all(self, min_value:float = 0.1, volume:bool = True) -> list[StatisticsSeries]:
    """
    Analyze all the non-empty series currently loaded in memory.

    :param min_value: minimum daily rainfall forwarded to analyze()
    :param volume: if True, convert intensity to volume per month/year
    :return: list of StatisticsSeries, one per non-empty loaded series
    """
    results = []
    for station_name, serie in self.get_all_series().items():
        if serie.empty:
            continue
        results.append(self.analyze(serie, name=station_name, min_daily_rainfall=min_value, volume=volume))
    return results
@classmethod
def plot_time_ranges(cls, statistics: list[StatisticsSeries], figax=None):
    """ Create a plot showing the time ranges of the given statistics

    One horizontal bar per series between its first and last valid sample; the
    contiguous valid sub-ranges are overpainted so gaps remain visible, and the
    coverage percentage is written right of each bar.

    :param statistics: list of StatisticsSeries to plot
    :param figax: tuple of (fig, ax) or None
    :return: tuple of (fig, ax)
    """
    if figax is None:
        fig, ax = plt.subplots(figsize=(12,6))
    else:
        fig, ax = figax
    ticks = []
    years = []
    # collect every year present in any series for the x-axis ticks
    for i, stats in enumerate(statistics):
        years += list(stats.yearly_statistics.keys())
    # fix: list(set(...)) is unordered, so 'years[-1]+1' previously appended a
    # year after an *arbitrary* element instead of after the latest one
    years = sorted(set(years))
    years += [years[-1]+1] # add one year after the last one for better x-axis ticks
    for i, stats in enumerate(statistics):
        name = stats.name
        try:
            start = stats.date_range.start
            end = stats.date_range.end
            ax.plot([start, end], [i, i], marker='o', label=f"{name}", linewidth=6)
            # overpaint the valid sub-ranges to make the gaps visible
            for rng in stats.date_range.all_ranges:
                ax.plot([rng[0], rng[1]], [i, i], color='white', alpha=0.5, linewidth=4)
            # Add coverage percentage as text
            coverage = stats.global_stats.coverage
            ax.text(end + timedelta(days=30), i, f"{coverage:.1f}%", verticalalignment='center')
        except Exception as e:
            logging.warning(_("Unable to plot time range for series {}: {}").format(name, str(e)))
        ticks.append(name)
    ax.set_yticks(range(len(statistics)))
    ax.set_yticklabels(ticks)
    ax.set_xlabel("Date")
    ax.set_title("Time ranges of the different series")
    ax.set_xticks([pd.Timestamp(year=int(y), month=1, day=1) for y in years])
    ax.set_xticklabels(years, rotation=30)
    ax.grid()
    return fig, ax
@classmethod
def plot_yearly_statistics(cls, statistics: StatisticsSeries, title:str ="Yearly Statistics", figax=None):
    """ Plot yearly statistics with error bars (mean +/- std).

    :param statistics: StatisticsSeries whose yearly statistics are plotted
    :param title: str, title of the plot
    :param figax: tuple of (fig, ax) or None
    :return: tuple of (fig, ax)
    """
    if figax is None:
        fig, ax = plt.subplots(figsize=(10, 5))
    else:
        fig, ax = figax

    per_year = statistics.yearly_statistics
    years = list(per_year.keys())
    # One (mean, std) pair per year, aligned with `years`.
    means = [per_year[y].mean for y in years]
    stds = [per_year[y].std for y in years]

    ax.errorbar(years, means, yerr=stds, fmt='o', ecolor='r', capthick=2)
    ax.set_title(title + ' - ' + statistics.name)
    ax.set_xlabel("Year")
    ax.set_ylabel("Value")
    ax.grid()
    return fig, ax
@classmethod
def plot_yearly_boxplots(cls, statistics: StatisticsSeries, title:str ="Yearly Boxplots", figax=None):
    """ Plot yearly statistics as boxplots.

    :param statistics: StatisticsSeries whose precomputed yearly statistics
        (min/q25/median/q75/max) are drawn
    :param title: str, title of the plot
    :param figax: tuple of (fig, ax) or None
    :return: tuple of (fig, ax)
    """
    stats_year = statistics.yearly_statistics
    years = list(stats_year.keys())
    # The five-number summary is already computed, so use Axes.bxp which
    # draws precomputed boxes. Passing the 5 values to ax.boxplot would make
    # matplotlib re-derive quartiles from that tiny sample and apply the
    # 1.5*IQR whisker rule, possibly rendering true min/max as fliers.
    boxes = [{'med': stats_year[year].median,
              'q1': stats_year[year].q25,
              'q3': stats_year[year].q75,
              'whislo': stats_year[year].min,
              'whishi': stats_year[year].max,
              'fliers': []} for year in years]
    if figax is None:
        fig, ax = plt.subplots(figsize=(10, 5))
    else:
        fig, ax = figax
    ax.bxp(boxes, positions=years)
    ax.set_title(title + ' - ' + statistics.name)
    ax.set_xlabel("Year")
    ax.set_ylabel("Value")
    ax.set_xticklabels(years, rotation=30)
    ax.grid()
    return fig, ax
@classmethod
def plot_monthly_statistics(cls, statistics: StatisticsSeries, title:str ="Monthly Statistics", figax=None):
    """ Plot monthly statistics with error bars (mean +/- std).

    :param statistics: StatisticsSeries whose monthly statistics are plotted
    :param title: str, title of the plot
    :param figax: tuple of (fig, ax) or None
    :return: tuple of (fig, ax)
    """
    if figax is None:
        fig, ax = plt.subplots(figsize=(10, 5))
    else:
        fig, ax = figax

    per_month = statistics.monthly_statistics
    months = list(per_month.keys())
    means = [per_month[m].mean for m in months]
    stds = [per_month[m].std for m in months]

    ax.errorbar(months, means, yerr=stds, fmt='o', ecolor='r', capthick=2)
    ax.set_title(title + ' - ' + statistics.name)
    ax.set_xlabel("Month")
    ax.set_xticks(months)
    # Arbitrary year 2000 only serves to render month abbreviations ('Jan', ...).
    ax.set_xticklabels([dt.datetime(2000, m, 1).strftime('%b') for m in months])
    ax.set_ylabel("Value")
    ax.grid()
    return fig, ax
@classmethod
def plot_monthly_boxplots(cls, statistics: StatisticsSeries, title:str ="Monthly Boxplots", figax=None, unit_factor:float = 1.0):
    """ Plot monthly statistics as boxplots.

    :param statistics: StatisticsSeries whose precomputed monthly statistics
        (min/q25/median/q75/max) are drawn
    :param title: str, title of the plot
    :param figax: tuple of (fig, ax) or None
    :param unit_factor: float, factor to convert units (e.g. from m3 to 1000 m3)
    :return: tuple of (fig, ax)
    """
    stats_month = statistics.monthly_statistics
    months = list(stats_month.keys())
    # The five-number summary is already computed, so use Axes.bxp which
    # draws precomputed boxes. Passing the 5 values to ax.boxplot would make
    # matplotlib re-derive quartiles from that tiny sample and apply the
    # 1.5*IQR whisker rule, possibly rendering true min/max as fliers.
    boxes = [{'med': stats_month[month].median * unit_factor,
              'q1': stats_month[month].q25 * unit_factor,
              'q3': stats_month[month].q75 * unit_factor,
              'whislo': stats_month[month].min * unit_factor,
              'whishi': stats_month[month].max * unit_factor,
              'fliers': []} for month in months]
    if figax is None:
        fig, ax = plt.subplots(figsize=(10, 5))
    else:
        fig, ax = figax
    ax.bxp(boxes, positions=months)
    ax.set_title(title + ' - ' + statistics.name)
    ax.set_xlabel("Month")
    ax.set_xticks(months)
    # Arbitrary year 2000 only serves to render month abbreviations ('Jan', ...).
    ax.set_xticklabels([dt.datetime(2000, month, 1).strftime('%b') for month in months], rotation=30)
    ax.set_ylabel("Value")
    ax.grid()
    return fig, ax
class SPW_pluviographs_1h(SPW_pluviographs):
    """ SPW pluviograph ('rain') data at a 1-hour timestep.

    Data is stored by default under ``../data/hydrometry/rain/1h`` relative
    to this module.
    """

    def __init__(self, store_dir: str | Path = None) -> None:
        """
        :param store_dir: storage directory; defaults to the package's
            ``data/hydrometry/rain/1h`` folder
        """
        if store_dir is None:
            store_dir = Path(__file__).parent / '..' / 'data' / 'hydrometry' / 'rain' / '1h'
        super().__init__('rain', '1h', store_dir=store_dir)
        # Timestep used when downloading data.
        self._timestep_dl = dt.timedelta(hours=1)

    def download_all(self, fromyear=2002, toyear=None, timezone='GMT+1', format='parquet', force: bool = False):
        """ Download all stations from `fromyear` to `toyear` (default: current year). """
        # `None` sentinel resolved at call time: a default of
        # `dt.datetime.now().year` would be frozen at import time and go
        # stale in a long-running process.
        if toyear is None:
            toyear = dt.datetime.now().year
        return super().download_all(self._store_dir, fromyear, toyear, timezone, format, force=force)

    def download_one(self, name='', fromyear=2002, toyear=None, timezone='GMT+1', format='parquet', force: bool = False):
        """ Download one station by name from `fromyear` to `toyear` (default: current year). """
        if toyear is None:
            toyear = dt.datetime.now().year
        # NOTE(review): delegates to the parent's *private* `_download_one`
        # (not `download_one`) -- confirm this asymmetry is intended.
        return super()._download_one(self._store_dir, name, fromyear, toyear, timezone, format, force=force)

    def load_all(self, fromdate=None, todate=None, timezone='GMT+1', format='parquet'):
        """ Load all stored series between `fromdate` and `todate`. """
        return super().load_all(self._store_dir, fromdate, todate, timezone, format)

    def update_all(self, format: Literal['csv', 'parquet'] = 'parquet'):
        """ Update every stored series in the storage directory. """
        return super().update_all(self._store_dir, format)

    def update_one(self, ts_id: str, dirin: Path | str = None, format: Literal['csv', 'parquet'] = 'parquet'):
        """ Update one stored series identified by `ts_id` (directory defaults to the store). """
        return super().update_one(ts_id, dirin=self._store_dir if dirin is None else dirin, format=format)
class SPW_pluviographs_5min(SPW_pluviographs):
    """ SPW pluviograph ('rain') data at a 5-minute timestep.

    Data is stored by default under ``../data/hydrometry/rain/5min`` relative
    to this module.
    """

    def __init__(self, store_dir: str | Path = None) -> None:
        """
        :param store_dir: storage directory; defaults to the package's
            ``data/hydrometry/rain/5min`` folder
        """
        if store_dir is None:
            store_dir = Path(__file__).parent / '..' / 'data' / 'hydrometry' / 'rain' / '5min'
        super().__init__('rain', '5min', store_dir=store_dir)
        # Timestep used when downloading data.
        self._timestep_dl = dt.timedelta(minutes=5)

    def download_all(self, fromyear=2002, toyear=None, timezone='GMT+1', format='parquet', force: bool = False):
        """ Download all stations from `fromyear` to `toyear` (default: current year). """
        # `None` sentinel resolved at call time: a default of
        # `dt.datetime.now().year` would be frozen at import time and go
        # stale in a long-running process.
        if toyear is None:
            toyear = dt.datetime.now().year
        return super().download_all(self._store_dir, fromyear, toyear, timezone, format, force=force)

    def download_one(self, name='', fromyear=2002, toyear=None, timezone='GMT+1', format='parquet', force: bool = False):
        """ Download one station by name from `fromyear` to `toyear` (default: current year). """
        if toyear is None:
            toyear = dt.datetime.now().year
        # NOTE(review): delegates to the parent's *private* `_download_one`
        # (not `download_one`) -- confirm this asymmetry is intended.
        return super()._download_one(self._store_dir, name, fromyear, toyear, timezone, format, force=force)

    def load_all(self, fromdate=None, todate=None, timezone='GMT+1', format='parquet'):
        """ Load all stored series between `fromdate` and `todate`. """
        return super().load_all(self._store_dir, fromdate, todate, timezone, format)

    def update_all(self, format: Literal['csv', 'parquet'] = 'parquet'):
        """ Update every stored series in the storage directory. """
        return super().update_all(self._store_dir, format)

    def update_one(self, ts_id: str, dirin: Path | str = None, format: Literal['csv', 'parquet'] = 'parquet'):
        """ Update one stored series identified by `ts_id` (directory defaults to the store). """
        return super().update_one(ts_id, dirin=self._store_dir if dirin is None else dirin, format=format)
if __name__ == "__main__":
    # Example usage.
    # The variable name and timestep imply the 5-minute pluviographs; the
    # base SPW_pluviographs constructor takes sensor/timestep arguments (see
    # the subclasses above), so instantiate the no-arg subclass instead of
    # the bare base class.
    my_5min = SPW_pluviographs_5min()
    myrain = my_5min.get_month_data(name="Jalhay")
    # NOTE(review): `compute_stats_Q` and `STATS_HOURS_IRM` are not visible
    # in this file -- confirm they still exist in the current API.
    mystats = my_5min.compute_stats_Q(myrain, STATS_HOURS_IRM)
    print(mystats)