from importlib.resources import files
from pathlib import Path
from enum import Enum
import numpy as np
import logging
import ftplib
from eccodes import codes_grib_new_from_file, codes_get, codes_get_values, codes_release, codes_keys_iterator_new, codes_keys_iterator_next, codes_keys_iterator_get_name
from datetime import datetime as dt, timedelta as td
from datetime import timezone as timezone
from shapely.geometry import Polygon, Point
import pandas as pd
import geopandas as gpd
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import wx
from .pydownloader import DATADIR, toys_dataset
from .Coordinates_operations import transform_coordinates
from .wolf_array import header_wolf, WolfArray
from .PyVertexvectors import Zones, zone, vector, wolfvertex as wv
from .PyTranslate import _
OPENDATA_FORECASTS = 'forecasts'
OPENDATA_FTP_SERVER = 'opendata.meteo.be'
OPENDATA_ALARO_40L = 'alaro_40l'
FILE_PREFIX = 'alaro40l'
# Date Format : YYYYMMDDHH with HH in [00, 06, 12, 18]
def _convert_col2date_str(col:str) -> str:
""" Create a string representation of the date from the column name. """
if col is None:
return "No data"
parts = col.split('_')
run_date = dt.strptime(parts[0], '%Y%m%d%H').strftime('%Y-%m-%d %H:%M')
real_date = dt.strptime(parts[1], '%Y%m%d').strftime('%Y-%m-%d')
hour = parts[2]
date_str = _(f'Forecast date : {real_date}, Hour : {hour} - Run : {run_date}')
return date_str
def _extract_dates_from_columnstr(col:str) -> list[dt]:
""" Extract the run date and forecast date from the column name. """
if col is None:
return [None, None]
parts = col.split('_')
run_date = dt.strptime(parts[0], '%Y%m%d%H')
real_date = dt.strptime(parts[1], '%Y%m%d')
hour = int(parts[2])
# set timezone as UTC
run_date = run_date.replace(tzinfo=timezone.utc)
real_date = real_date.replace(tzinfo=timezone.utc)
forecast_date = real_date + td(hours=hour)
return [run_date, forecast_date]
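# Illustrative sketch (hypothetical column name): keys built by `_load_grib_data`
# follow the pattern '<run>_<validitydate>_<HH>', e.g. '2024010100_20240101_06'.
# The two helpers above parse them back:
#
#   run, forecast = _extract_dates_from_columnstr('2024010100_20240101_06')
#   # run      -> 2024-01-01 00:00 UTC (model run)
#   # forecast -> 2024-01-01 06:00 UTC (validity date + hour offset)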
def ArrayBelgium() -> WolfArray:
"""
Create a WolfArray for the Belgium domain.
"""
# Define the spatial extent for Belgium
h = header_wolf()
h.set_origin(17_000., 20_000.)
h.set_resolution(4_000., 4_000.)
h.shape = (int((296_000.-17_000.)//4_000.), int((245_000.-20_000.)//4_000.))
# Create the WolfArray
return WolfArray(srcheader=h)
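# Illustration: with the extent above (17 000 -> 296 000 m in X,
# 20 000 -> 245 000 m in Y, presumably Belgian Lambert 72) and a 4 km
# resolution, h.shape evaluates to (69, 56) cells.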
class GribFiles(Enum):
"""
Enum for Grib files used in IRM Alaro data.
"""
# FIXME : check the units and descriptions
FILE_10U = '10U.grb' # 10m U-component of wind [m/s]
FILE_10V = '10V.grb' # 10m V-component of wind [m/s]
FILE_MaxT2 = 'MaxT2.grb' # Maximum 2m temperature [K]
FILE_MinT2 = 'MinT2.grb' # Minimum 2m temperature [K]
FILE_2D = '2D.grb' # 2m dew point temperature [K]
FILE_2T = '2T.grb' # 2m temperature [K]
FILE_RH2M = 'RH2M.grb' # 2m relative humidity [0-1]
FILE_CR = 'CR.grb' # Convective rain [mm] or [kg/m²]
FILE_CS = 'CS.grb' # Convective snow [mm] or [kg/m²]
FILE_Z = 'Z.grb' # Geopotential height [m]
FILE_IFCCC = 'IFCCC.grb' # Instant fluctuation convective cloud cover [0-1]
FILE_IFHCC = 'IFHCC.grb' # Instant fluctuation high cloud cover [0-1]
FILE_IFLCC = 'IFLCC.grb' # Instant fluctuation low cloud cover [0-1]
FILE_IFMCC = 'IFMCC.grb' # Instant fluctuation medium cloud cover [0-1]
FILE_IFTCC = 'IFTCC.grb' # Instant fluctuation total cloud cover [0-1]
FILE_LSR = 'LSR.grb' # Large scale rain [mm] or [kg/m²]
FILE_LSS = 'LSS.grb' # Large scale snow [mm] or [kg/m²]
FILE_MSLP = 'MSLP.grb' # Mean sea level pressure [Pa]
FILE_R = 'R.grb' # Relative humidity [0-1] ??
FILE_Q = 'Q.grb' # Relative humidity isobaric [0-1] ??
FILE_MerGust = 'MerGust.grb' # SBL meridian gust [m/s]
FILE_ZonGust = 'ZonGust.grb' # SBL zonal gust [m/s]
FILE_SurfSWrad = 'SurfSWrad.grb' # Surface shortwave radiation [W/m²]
FILE_SurfLWrad = 'SurfLWrad.grb' # Surface longwave radiation [W/m²]
FILE_SurfCape = 'SurfCape.grb' # Surface CAPE [J/kg]
FILE_ST = 'ST.grb' # Surface temperature [K]
FILE_ORO = 'ORO.grb' # Surface orography [m]
FILE_T = 'T.grb' # Temperature [K]
FILE_TotPrecip = 'TotPrecip.grb' # Total precipitation [m_water_equivalent]
FILE_U = 'U.grb' # U-component of wind [m/s]
FILE_V = 'V.grb' # V-component of wind [m/s]
FILE_W = 'W.grb' # Vertical velocity [Pa/s]
FILE_WBPT = 'WBPT.grb' # Wet bulb potential temperature [K]
FILE_fzht = 'fzht.grb' # Freezing level (0°C isotherm) [m]
class IRM_Alaro():
"""
Class for handling IRM Alaro forecasts.
"""
def __init__(self, ftp_server: str = OPENDATA_FTP_SERVER, ftp_path: str = OPENDATA_FORECASTS):
"""
Initialize the IRM_Alaro class with FTP server and path.
"""
self.ftp_server = ftp_server
self.ftp_path = ftp_path
self._array = ArrayBelgium()
self._available_run_dates = []
self._colormap = plt.get_cmap('Blues')
self._cities = gpd.read_file(toys_dataset('Communes_Belgique', 'PDS__COMMUNES.shp'))
# Lazy caches filled by the GRIB loading/plotting methods
self._gdf = None
self._gdf_cache = None
self._gdf_diff = None
self._zones = None
def _ftp_init(self):
""" Initialize the FTP connection. """
self.ftp = ftplib.FTP(self.ftp_server)
self.ftp.login()
self.ftp.cwd(self.ftp_path + '/' + OPENDATA_ALARO_40L)
def _ftp_close(self):
"""
Close the FTP connection.
"""
try:
self.ftp.close()
except Exception as e:
logging.error(f"Error closing FTP connection: {e}")
def list_run_dates_cached(self) -> list:
"""
Return the cached list of available run dates.
"""
all_grib_files = self.data_directory.rglob('*.grb')
dates = [f.stem.split('_')[1] for f in all_grib_files if f.is_file() and len(f.stem.split('_')) == 3]
return sorted(list(set(dates)))
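# Illustration: a cached file named 'alaro40l_2024010100_TotPrecip.grb'
# has stem parts ['alaro40l', '2024010100', 'TotPrecip'], so parts[1]
# yields the run date '2024010100'.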
def list_run_dates(self) -> list:
"""
List available data files on the FTP server.
"""
today = dt.now(timezone.utc) # run folders are named after UTC run times
self._available_run_dates = [] # reset so repeated calls do not accumulate duplicates
possible_run_dates = []
# hours
hours = ['00', '06', '12', '18']
date_format = '%Y%m%d'
# Generate forecast strings for the last 3 days
for i in range(3):
forecast_time = today - td(days=i)
forecast_str = forecast_time.strftime(date_format)
possible_run_dates.append([forecast_str + hour for hour in hours])
# flatten the list of lists
possible_run_dates = [item for sublist in possible_run_dates for item in sublist]
try:
self._ftp_init()
for poss in possible_run_dates:
try:
self.ftp.cwd(poss)
files = self.ftp.nlst()
if files:
self._available_run_dates.append(poss)
self.ftp.cwd('..') # Go back to the parent directory
except ftplib.error_perm:
continue
self._ftp_close()
self._available_run_dates.sort()
return self._available_run_dates
except ftplib.error_perm as e:
logging.error(f"Error listing files: {e}")
return []
@property
def run_dates(self) -> list:
""" Return the available forecast run dates. """
if len(self._available_run_dates) == 0:
self.list_run_dates()
return self._available_run_dates
@property
def run_dates_str(self) -> list[str]:
""" Return the available forecast run dates as formatted strings. """
return [dt.strptime(run, '%Y%m%d%H').strftime('%Y-%m-%d %H:%M') for run in self.run_dates]
def list_files_for_forecast(self, run_date: str) -> list:
"""
List files for a specific forecast.
:param run_date: The run date (YYYYMMDDHH).
"""
try:
self._ftp_init()
self.ftp.cwd(f"{run_date}")
files = self.ftp.nlst()
self._ftp_close()
# check if all files are present
missing_files = [file for file in GribFiles if self._get_filename(file, run_date) not in files]
if missing_files:
logging.warning(f"Missing files for forecast {run_date}: {missing_files}")
return files
except ftplib.error_perm as e:
logging.error(f"Error listing files for forecast {run_date}: {e}")
return []
def _get_filename(self, file: GribFiles, run_date: str) -> str:
"""
Generate the filename for a given Grib file and forecast.
:param file: The Grib file enum.
:param run_date: The forecast time.
"""
return f"{FILE_PREFIX}_{run_date}_{file.value}"
@property
def data_directory(self) -> Path:
""" Return the data directory path. """
return Path(DATADIR) / 'forecasts'
def download_data(self, filename: GribFiles | str, run_date: str) -> Path:
"""
Download data from the FTP server.
:param filename: The Grib file to download or a specific filename.
:param run_date: The run date (YYYYMMDDHH).
"""
if isinstance(filename, str):
fn = filename
else:
fn = self._get_filename(filename, run_date)
local_path = self.data_directory / f'{fn}'
local_path.parent.mkdir(parents=True, exist_ok=True)
if run_date not in self.run_dates:
if not local_path.exists():
logging.warning(f"Run date {run_date} is available neither on the FTP server nor locally.")
return local_path
else:
logging.info(f"Run date {run_date} not available on the FTP server, but the file exists locally.")
return local_path
self._ftp_init()
self.ftp.cwd(f"{run_date}")
# Get size of the file
try:
remote_size = self.ftp.size(fn)
if local_path.exists():
local_size = local_path.stat().st_size
if local_size == remote_size:
logging.info(f"File {fn} already exists and is complete. Skipping download.")
else:
logging.info(f"File {fn} exists but is incomplete. Re-downloading.")
with open(local_path, 'wb') as f:
self.ftp.retrbinary(f'RETR {fn}', f.write)
else:
logging.info(f"File {fn} does not exist. Re-downloading.")
with open(local_path, 'wb') as f:
self.ftp.retrbinary(f'RETR {fn}', f.write)
except ftplib.error_perm as e:
logging.error(f"Could not get size for file {fn}: {e}")
# Proceed to download the file anyway
with open(local_path, 'wb') as f:
self.ftp.retrbinary(f'RETR {fn}', f.write)
self._ftp_close()
return local_path
def download_TotalPrecipitations_available_files(self) -> list[Path]:
"""
Download Cumulated rain, Temperature et 2m - files available on the FTP server.
"""
to_download = [GribFiles.FILE_TotPrecip, GribFiles.FILE_2T]
rundates = self.run_dates
downloaded_files = []
for rundate in rundates:
for file in self.list_files_for_forecast(rundate):
for selection in to_download:
if selection.value in file:
local_path = self.download_data(file, rundate)
downloaded_files.append(local_path)
return downloaded_files
def download_all_available_files(self) -> list[Path]:
"""
Download all files available on the FTP server.
"""
rundates = self.run_dates
downloaded_files = []
for rundate in rundates:
for file in self.list_files_for_forecast(rundate):
local_path = self.download_data(file, rundate)
downloaded_files.append(local_path)
return downloaded_files
def _get_center_coordinates(self, filename: GribFiles,
run_date: str,
EPSG:str = 'EPSG:31370') -> tuple[np.ndarray, np.ndarray]:
"""
Load GRIB data and compute coordinates.
:param filename: The GRIB file to process.
:param forecast: The forecast time.
:param download: Whether to download the file if it doesn't exist.
:param EPSG: The target EPSG code for the output coordinates.
:return: The center coordinates (x, y) for the given GRIB file and forecast.
"""
file_path = self.download_data(filename, run_date)
if not file_path.exists():
logging.error(f"File {file_path} does not exist.")
return np.array([])
with open(file_path, 'rb') as f:
gid = codes_grib_new_from_file(f)
# Grid type
grid_type = codes_get(gid, "gridType")
logging.info(f"Grid type: {grid_type}")
# Grid dimensions
Ni = codes_get(gid, "Ni") # number of points along longitude
Nj = codes_get(gid, "Nj") # number of points along latitude
logging.info(f"Dimensions: {Ni} x {Nj}")
# Coordinates of the first and last grid points
lat1 = codes_get(gid, "latitudeOfFirstGridPointInDegrees")
lon1 = codes_get(gid, "longitudeOfFirstGridPointInDegrees")
lat2 = codes_get(gid, "latitudeOfLastGridPointInDegrees")
lon2 = codes_get(gid, "longitudeOfLastGridPointInDegrees")
logging.info(f"Grid from ({lat1}, {lon1}) to ({lat2}, {lon2})")
# Increments
dlat = codes_get(gid, "jDirectionIncrementInDegrees")
dlon = codes_get(gid, "iDirectionIncrementInDegrees")
logging.info(f"Increments: {dlon}° in longitude, {dlat}° in latitude")
# Rebuild the coordinates
lats = np.linspace(lat1, lat2, Nj)
lons = np.linspace(lon1, lon2, Ni)
codes_release(gid)
# ---------------
# WARNING: data are enumerated longitude first, then latitude.
# We need meshgrid to get the correct ordering.
lon_grid, lat_grid = np.meshgrid(lons, lats)
# ---------------
# Flatten to 1-D arrays
lats = lat_grid.flatten()
lons = lon_grid.flatten()
# Convert to the target CRS (Belgian Lambert 72 by default)
xy = transform_coordinates(np.vstack([lons, lats]).T, "EPSG:4326", EPSG, chunk_size=50_000)
#
x, y = xy[:, 0], xy[:, 1]
# reshape the coordinates
x_grid = np.reshape(x, (Nj, Ni))
y_grid = np.reshape(y, (Nj, Ni))
return (x_grid, y_grid)
def _get_corners_coordinates(self, filename: GribFiles,
run_date: str,
EPSG:str = 'EPSG:31370') -> tuple[tuple[np.ndarray, np.ndarray]]:
"""
Load GRIB data and compute coordinates.
:param filename: The GRIB file to process.
:param forecast: The forecast time.
:param download: Whether to download the file if it doesn't exist.
:param EPSG: The target EPSG code for the output coordinates.
:return: The coordinates (x, y) for (center, lower-left, lower-right, upper-right, upper-left).
"""
CHUNK_SIZE = 200_000
file_path = self.download_data(filename, run_date)
if not file_path.exists():
logging.error(f"File {file_path} does not exist.")
return np.array([])
with open(file_path, 'rb') as f:
gid = codes_grib_new_from_file(f)
# Grid type
grid_type = codes_get(gid, "gridType")
logging.info(f"Grid type: {grid_type}")
# Grid dimensions
Ni = codes_get(gid, "Ni") # number of points along longitude
Nj = codes_get(gid, "Nj") # number of points along latitude
logging.info(f"Dimensions: {Ni} x {Nj}")
# Coordinates of the first and last grid points
lat1 = codes_get(gid, "latitudeOfFirstGridPointInDegrees")
lon1 = codes_get(gid, "longitudeOfFirstGridPointInDegrees")
lat2 = codes_get(gid, "latitudeOfLastGridPointInDegrees")
lon2 = codes_get(gid, "longitudeOfLastGridPointInDegrees")
logging.info(f"Grid from ({lat1}, {lon1}) to ({lat2}, {lon2})")
# Increments
dlat = codes_get(gid, "jDirectionIncrementInDegrees")
dlon = codes_get(gid, "iDirectionIncrementInDegrees")
logging.info(f"Increments: {dlon}° in longitude, {dlat}° in latitude")
# Rebuild the coordinates (Nj latitudes, Ni longitudes)
lats = np.linspace(lat1, lat2, Nj)
lons = np.linspace(lon1, lon2, Ni)
codes_release(gid)
# ---------------
# WARNING: data are enumerated longitude first, then latitude.
# We need meshgrid to get the correct ordering.
lon_grid, lat_grid = np.meshgrid(lons, lats)
# ---------------
lat_corners_ll = lat_grid.copy()
lat_corners_ul = lat_grid.copy()
lon_corners_ll = lon_grid.copy()
lon_corners_lr = lon_grid.copy()
# Estimate Corners by averaging the nearest neighbors
lat_corners_ll[1:, :] = (lat_grid[:-1, :] + lat_grid[1:, :]) / 2
lat_corners_ul[:-1, :] = (lat_grid[:-1, :] + lat_grid[1:, :]) / 2
lat_corners_ll[0, :] = lat_grid[0, :] - dlat / 2
lat_corners_ul[-1, :] = lat_grid[-1, :] + dlat / 2
lat_corners_lr = lat_corners_ll.copy()
lat_corners_ur = lat_corners_ul.copy()
lon_corners_ll[:, 1:] = (lon_grid[:, :-1] + lon_grid[:, 1:]) / 2
lon_corners_lr[:, :-1] = (lon_grid[:, :-1] + lon_grid[:, 1:]) / 2
lon_corners_ll[:, 0] = lon_grid[:, 0] - dlon / 2
lon_corners_lr[:, -1] = lon_grid[:, -1] + dlon / 2
lon_corners_ul = lon_corners_ll.copy()
lon_corners_ur = lon_corners_lr.copy()
# Flatten to 1-D arrays
lats = lat_grid.flatten()
lons = lon_grid.flatten()
lat_corners_ll = lat_corners_ll.flatten()
lon_corners_ll = lon_corners_ll.flatten()
lat_corners_ul = lat_corners_ul.flatten()
lon_corners_ul = lon_corners_ul.flatten()
lat_corners_lr = lat_corners_lr.flatten()
lon_corners_lr = lon_corners_lr.flatten()
lat_corners_ur = lat_corners_ur.flatten()
lon_corners_ur = lon_corners_ur.flatten()
# Convert centres to the target CRS (Belgian Lambert 72 by default)
xy_center = transform_coordinates(np.vstack([lons, lats]).T, "EPSG:4326", EPSG, chunk_size=CHUNK_SIZE)
# concatenate
all_corners = np.concatenate((np.vstack([lon_corners_ll, lat_corners_ll]).T,
np.vstack([lon_corners_ul, lat_corners_ul]).T,
np.vstack([lon_corners_lr, lat_corners_lr]).T,
np.vstack([lon_corners_ur, lat_corners_ur]).T))
all_corners = transform_coordinates(all_corners, "EPSG:4326", EPSG, chunk_size= CHUNK_SIZE)
# split
xy_corners_ll = all_corners[0:Ni*Nj, :]
xy_corners_ul = all_corners[Ni*Nj:2*Ni*Nj, :]
xy_corners_lr = all_corners[2*Ni*Nj:3*Ni*Nj, :]
xy_corners_ur = all_corners[3*Ni*Nj:4*Ni*Nj, :]
#
x, y = xy_center[:, 0], xy_center[:, 1]
# reshape the coordinates
x_grid = np.reshape(x, (Nj, Ni))
y_grid = np.reshape(y, (Nj, Ni))
x_ll, y_ll = xy_corners_ll[:, 0], xy_corners_ll[:, 1]
x_ul, y_ul = xy_corners_ul[:, 0], xy_corners_ul[:, 1]
x_lr, y_lr = xy_corners_lr[:, 0], xy_corners_lr[:, 1]
x_ur, y_ur = xy_corners_ur[:, 0], xy_corners_ur[:, 1]
x_ll = np.reshape(x_ll, (Nj, Ni))
y_ll = np.reshape(y_ll, (Nj, Ni))
x_ul = np.reshape(x_ul, (Nj, Ni))
y_ul = np.reshape(y_ul, (Nj, Ni))
x_lr = np.reshape(x_lr, (Nj, Ni))
y_lr = np.reshape(y_lr, (Nj, Ni))
x_ur = np.reshape(x_ur, (Nj, Ni))
y_ur = np.reshape(y_ur, (Nj, Ni))
return (x_grid, y_grid), (x_ll, y_ll), (x_lr, y_lr), (x_ur, y_ur), (x_ul, y_ul)
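# Corner-estimation sketch (1-D illustration of the averaging above): for
# centres c0, c1, c2 with spacing d, the lower edges are
# [c0 - d/2, (c0 + c1)/2, (c1 + c2)/2] and the upper edges are
# [(c0 + c1)/2, (c1 + c2)/2, c2 + d/2]; the code applies this along each
# axis of the 2-D grid before reprojecting the four corner sets.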
def _prepare_gdf_from_grib(self, filename: GribFiles,
run_date: str,
EPSG:str = 'EPSG:31370') -> gpd.GeoDataFrame:
"""
Prepare a GeoDataFrame from grib file.
:param filename: The GRIB file to process.
:param forecast: The forecast time.
:param EPSG: The target EPSG code for the output coordinates.
:return: The GeoDataFrame with polygons for each grid cell.
"""
file_path = self.download_data(filename, run_date)
if not file_path.exists():
logging.error(f"File {file_path} does not exist.")
return gpd.GeoDataFrame()
(x_grid, y_grid), (x_ll, y_ll), (x_lr, y_lr), (x_ur, y_ur), (x_ul, y_ul) = self._get_corners_coordinates(filename, run_date, EPSG)
# Create polygons around the grid points
# Corners are between grid points, so we need to create polygons
Ni, Nj = x_grid.shape
polygons = [Polygon([(x_ll[i, j], y_ll[i, j]),
(x_lr[i, j], y_lr[i, j]),
(x_ur[i, j], y_ur[i, j]),
(x_ul[i, j], y_ul[i, j])])
for i in range(Ni) for j in range(Nj)]
data = {}
data['centroid_x'] = x_grid.flatten()
data['centroid_y'] = y_grid.flatten()
self._gdf = gpd.GeoDataFrame(data, geometry=polygons, crs=EPSG)
self._gdf_cache = self._gdf.copy()
return self._gdf
def _prepare_Zones_from_grib(self, filename: GribFiles,
run_date: str,
EPSG:str = 'EPSG:31370') -> Zones:
"""
Prepare a Zones from brib file.
:param filename: The GRIB file to process.
:param forecast: The forecast time.
:param EPSG: The target EPSG code for the output coordinates.
:return: The Zones with polygons for each grid cell.
"""
file_path = self.download_data(filename, run_date)
if not file_path.exists():
logging.error(f"File {file_path} does not exist.")
return None
(x_grid, y_grid), (x_ll, y_ll), (x_lr, y_lr), (x_ur, y_ur), (x_ul, y_ul) = self._get_corners_coordinates(filename, run_date, EPSG)
# Create polygons around the grid points.
# Corners lie between grid points, so we build one closed vector per cell.
self._zones = Zones(idx = 'Alaro forecasts')
Ni, Nj = x_grid.shape
for i in range(Ni):
for j in range(Nj):
loczone = zone(name=f"Alaro_{i}_{j}")
self._zones.add_zone(loczone, forceparent = True)
locvec = vector(name=f"Alaro_{i}_{j}_polygon")
loczone.add_vector(locvec, forceparent=True)
locvec.add_vertex([wv(x_ll[i, j], y_ll[i, j]),
wv(x_lr[i, j], y_lr[i, j]),
wv(x_ur[i, j], y_ur[i, j]),
wv(x_ul[i, j], y_ul[i, j])])
locvec.force_to_close()
return self._zones
def _load_grib_data(self, filename: GribFiles,
run_dates: str | list[str],
) -> dict:
"""
Load GRIB data from file(s) into a dict of value arrays.
:param filename: The GRIB file to process.
:param run_dates: The forecast run dates.
:return: A dict mapping 'run_validitydate_HH' keys to flattened value arrays.
"""
if isinstance(run_dates, str):
run_dates = [run_dates]
new_data = {}
for run_date in run_dates:
file_path = self.download_data(filename, run_date)
if not file_path.exists():
logging.error(f"File {file_path} does not exist.")
return new_data
with open(file_path, 'rb') as f:
while True:
gid = codes_grib_new_from_file(f)
if gid is None:
break
# Grid dimensions
Ni = codes_get(gid, "Ni") # number of points along longitude
Nj = codes_get(gid, "Nj") # number of points along latitude
validityDate = codes_get(gid, "validityDate")
validityTime = codes_get(gid, "validityTime") // 100
# Field values (e.g. temperature, pressure)
values = codes_get_values(gid)
# Reshape the values if needed
# data = np.reshape(values, (Nj, Ni))
new_data[f"{run_date}_{validityDate}_{validityTime:02d}"] = values
codes_release(gid)
return new_data
def reset_gdf(self):
""" Reset the GeoDataFrame. """
self._gdf = self._gdf_cache.copy() if self._gdf_cache is not None else None
self._gdf_diff = None
def load_grib_data_to_gdf(self, filename: GribFiles,
run_dates: str | list[str],
) -> gpd.GeoDataFrame:
"""
Load GRIB data from a file and add it to the GeoDataFrame.
:param filename: The GRIB file to process.
:param run_dates: The forecast run dates.
:return: The GeoDataFrame with added data.
"""
if isinstance(run_dates, str):
run_dates = [run_dates]
new_data = self._load_grib_data(filename, run_dates)
if new_data is None:
logging.error(f"No data found for {run_dates}.")
return self._gdf
if self._gdf is None:
self._prepare_gdf_from_grib(filename, run_dates[0])
if len(new_data) == 0:
logging.error(f"No data found for {run_dates}.")
return self._gdf
new_gdf = gpd.GeoDataFrame(new_data)
# Concatenate gdf and new_gdf
self._gdf = pd.concat([self._gdf, new_gdf], axis=1)
return self._gdf
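# Minimal usage sketch (illustrative; assumes network access to opendata.meteo.be):
#
#   alaro = IRM_Alaro()
#   runs = alaro.run_dates                     # e.g. ['2024010100', '2024010106']
#   gdf = alaro.load_grib_data_to_gdf(GribFiles.FILE_TotPrecip, runs)
#   # -> one column per (run, validity date, hour), e.g. '2024010106_20240101_12'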
def load_grib_data_to_Zones(self, filename: GribFiles,
run_dates: str | list[str],
) -> Zones:
"""
Load GRIB data from a file and add it to the Zones.
:param filename: The GRIB file to process.
:param run_dates: The forecast run dates.
:return: The Zones with added data.
"""
if isinstance(run_dates, str):
run_dates = [run_dates]
new_data = self._load_grib_data(filename, run_dates)
if self._zones is None:
self._prepare_Zones_from_grib(filename, run_dates[0])
if len(new_data) == 0:
logging.error(f"No data found for {run_dates}.")
return self._zones
# Put values in zones
for key, values in new_data.items():
self._zones.add_values(key, values)
return self._zones
def forecasts_to_arrays(self, forecasts: str | list[str] = None) -> list[WolfArray]:
""" Set the forecasts to the WolfArray.
:param forecasts: List of forecast columns to convert to WolfArray. If None, all forecast columns are used.
"""
if forecasts is None:
forecasts = self.get_forecast_columns()
elif isinstance(forecasts, str):
forecasts = [forecasts]
# Create a numpy array from centroid and values from forecast column
xyz = np.zeros((self._gdf.shape[0], 3))
xyz[:,0] = self._gdf["centroid_x"].values
xyz[:,1] = self._gdf["centroid_y"].values
arrays = []
for forecast in forecasts:
if forecast not in self._gdf.columns:
logging.error(f"Forecast {forecast} not found in GeoDataFrame columns.")
continue
xyz[:,2] = self._gdf[forecast].values
new_array = WolfArray(mold=self._array, idx=forecast)
new_array.fillin_from_xyz(xyz)
arrays.append(new_array)
return arrays
def get_forecast_columns(self, diff:bool = False) -> list[str]:
""" Get the list of forecast columns in the GeoDataFrame.
Exclude 'geometry', 'centroid_x', 'centroid_y' columns.
:param diff: If True, get columns from the diff GeoDataFrame.
"""
if diff:
if self._gdf_diff is None:
self._compute_diff()
columns = self._gdf_diff.columns
# pop 'geometry', 'centroid_x', 'centroid_y'
columns = columns[columns != 'centroid_x']
columns = columns[columns != 'centroid_y']
columns = columns[columns != 'geometry']
columns = [col for col in columns if col is not None]
else:
if self._gdf is None:
logging.error("No data loaded. Please load GRIB data first.")
return []
columns = self._gdf.columns
# pop 'geometry', 'centroid_x', 'centroid_y'
columns = columns[columns != 'centroid_x']
columns = columns[columns != 'centroid_y']
columns = columns[columns != 'geometry']
columns = [col for col in columns if col is not None]
return list(columns)
def _compute_diff(self):
""" Compute local rain by difference in total cumulated rainfall """
if self._gdf is None:
logging.error("No data loaded. Please load GRIB data first.")
return None
self._gdf_diff = self._gdf.copy()
columns = self.get_forecast_columns(diff=True)
# all date and hours
dates_hours = list(set([col.split('_')[1] + '_' + col.split('_')[2] for col in columns]))
rundates = list(set([col.split('_')[0] for col in columns]))
rundates.sort()
dates_hours.sort()
columns_lists = []
for datehour in dates_hours:
columns_lists.append([f"{rundate}_{datehour}" if f"{rundate}_{datehour}" in columns else None for rundate in rundates])
# Diff all columns
ref = columns_lists[0]
for cols in columns_lists[1:]:
for loccol, locref in zip(cols, ref):
self._gdf_diff[loccol] = 0.0
if loccol is not None and locref is not None:
self._gdf_diff[locref] = self._gdf[loccol] - self._gdf[locref]
if self._gdf_diff[locref].min() < 0.:
logging.debug(f"Negative values found in column {loccol} : {self._gdf_diff[locref].min()}")
ref = cols
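# Illustration (hypothetical values): for a run '2024010100' with cumulated
# totals P(06) = 0.0020 m and P(07) = 0.0025 m, the diff column stored under
# the hour-06 key becomes P(07) - P(06) = 0.0005 m, i.e. 0.5 mm of rain in
# that hour (the plotting helpers pass factor=1000. to convert m to mm).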
def _create_animation(self, filename: GribFiles,
run_date: str,
bounds: list[list[float]] | str = 'Belgium',
vmin: float = None,
vmax: float = None,
factor: float = 1.0
) -> animation.FuncAnimation:
"""
Create a video from the GeoDataFrame data.
:param filename: The GRIB file to process.
:param run_date: The forecast run date to visualize.
:param bounds: Bounds for the plot. Can be 'Belgium' or a list of [xlim, ylim].
:param vmin: Minimum value for color scaling.
:param vmax: Maximum value for color scaling.
:param factor: Factor to multiply the data values for scaling.
:return: The animation object.
"""
self.load_grib_data_to_gdf(filename, run_date)
from matplotlib.collections import PatchCollection
from matplotlib.patches import Polygon as MplPolygon
from matplotlib.cm import ScalarMappable
from mpl_toolkits.axes_grid1 import make_axes_locatable
fig, ax = plt.subplots(figsize=(10, 10))
columns = self.get_forecast_columns()
# Filter columns containing the "run_date" string
columns = [col for col in columns if col.startswith(run_date)]
if vmin is None:
vmin = self._gdf[columns].min().min()
if vmax is None:
vmax = self._gdf[columns].max().max()
vmin *= factor
vmax *= factor
norm = plt.Normalize(vmin=vmin, vmax=vmax)
colors = self._colormap(norm(self._gdf[columns[0]].values * factor))
# Create an axis for the colorbar without modifying ax
divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size="5%", pad=0.1)
# Add the colorbar
sm = ScalarMappable(norm=norm, cmap=self._colormap)
cbar = fig.colorbar(sm, cax=cax)
patches = [MplPolygon(geom.exterior.coords, closed=True) for geom in self._gdf.geometry]
collection = PatchCollection(patches, facecolor=colors, edgecolor='black')
ax.add_collection(collection)
self._cities.plot(ax=ax, facecolor = 'none', edgecolor='black', linewidth=0.5)
if bounds == 'Belgium':
ax.set_xlim(self._cities.total_bounds[[0, 2]])
ax.set_ylim(self._cities.total_bounds[[1, 3]])
elif isinstance(bounds, list) and len(bounds) == 2:
ax.set_xlim(bounds[0][0], bounds[0][1])
ax.set_ylim(bounds[1][0], bounds[1][1])
else:
ax.set_xlim(self._gdf.total_bounds[[0, 2]])
ax.set_ylim(self._gdf.total_bounds[[1, 3]])
def update(column):
new_colors = self._colormap(norm(self._gdf[column].values * factor))
collection.set_color(new_colors)
ax.set_title(f"Total Precip. for {_convert_col2date_str(column)} - [mm] - time UTC")
if wx.GetApp() is not None:
wx.GetApp().Yield() # Allow GUI to update
ani = animation.FuncAnimation(fig, update, frames=columns, repeat=False)
return ani
def _create_comparison_animation(self, filename: GribFiles,
run_dates: list[str],
size = 10,
bounds: list[list[float]] | str = 'Belgium',
vmin:float = None,
vmax:float = None,
factor: float = 1.0) -> animation.FuncAnimation:
"""
Create a video from the GeoDataFrame data.
:param filename: The GRIB file to process.
:param run_dates: List of forecast run dates to compare.
:param size: Size of each subplot.
:param bounds: Bounds for the plot. Can be 'Belgium' or a list of [xlim, ylim].
:param vmin: Minimum value for color scaling.
:param vmax: Maximum value for color scaling.
:param factor: Factor to multiply the data values for scaling.
:return: The animation object.
"""
assert len(run_dates) > 1, "At least two forecasts must be provided."
self.load_grib_data_to_gdf(filename, run_dates)
from matplotlib.collections import PatchCollection
from matplotlib.patches import Polygon as MplPolygon
from matplotlib.cm import ScalarMappable
from mpl_toolkits.axes_grid1 import make_axes_locatable
fig, axes = plt.subplots(ncols=len(run_dates), figsize=(len(run_dates) * size, size))
fig.suptitle(_('Rain intensity [mm/h]'))
columns = self.get_forecast_columns(diff=False)
# all date and hours
dates_hours = list(set([col.split('_')[1] + '_' + col.split('_')[2] for col in columns]))
dates_hours.sort()
columns_lists = []
for datehour in dates_hours:
columns_lists.append([f"{forecast}_{datehour}" if f"{forecast}_{datehour}" in columns else None for forecast in run_dates])
if vmin is None:
vmin = self._gdf[columns].min().min()
if vmax is None:
vmax = self._gdf[columns].max().max()
vmin *= factor
vmax *= factor
norm = plt.Normalize(vmin=vmin, vmax=vmax)
colors = self._colormap(norm(self._gdf[columns[0]].values * factor))
# Create an axis for the colorbar without modifying the last axes
divider = make_axes_locatable(axes[-1])
cax = divider.append_axes("right", size="5%", pad=0.1)
# Add the colorbar
sm = ScalarMappable(norm=norm, cmap=self._colormap)
cbar = fig.colorbar(sm, cax=cax)
patches = []
collection = []
for idx, forecast in enumerate(run_dates):
patches.append([MplPolygon(geom.exterior.coords, closed=True) for geom in self._gdf.geometry])
collection.append(PatchCollection(patches[idx], facecolor=colors, edgecolor='black'))
ax = axes[idx]
ax.add_collection(collection[idx])
# Plot cities but just the edges
self._cities.plot(ax=ax, facecolor = 'none', edgecolor='black', linewidth=0.5)
if bounds == 'Belgium':
ax.set_xlim(self._cities.total_bounds[[0, 2]])
ax.set_ylim(self._cities.total_bounds[[1, 3]])
elif isinstance(bounds, list) and len(bounds) == 2:
ax.set_xlim(bounds[0][0], bounds[0][1])
ax.set_ylim(bounds[1][0], bounds[1][1])
else:
ax.set_xlim(self._gdf.total_bounds[[0, 2]])
ax.set_ylim(self._gdf.total_bounds[[1, 3]])
ax.set_aspect('equal')
fig.tight_layout()
def update(column):
for idx, ax in enumerate(axes):
if column[idx] is None:
# All white
new_colors = np.ones((self._gdf.shape[0], 4))
else:
new_colors = self._colormap(norm(self._gdf[column[idx]].values * factor))
collection[idx].set_color(new_colors)
ax.set_title(f"{_convert_col2date_str(column[idx])} - time UTC")
if wx.GetApp() is not None:
wx.GetApp().Yield() # Allow GUI to update
# ax.set_title(f"Data for {column}")
ani = animation.FuncAnimation(fig, update, frames=columns_lists, repeat=False)
return ani
def _create_comparison_animation_diff(self, filename: GribFiles,
run_dates: list[str],
size = 10,
bounds: list[list[float]] | str = 'Belgium',
vmin:float = None,
vmax:float = None,
factor: float = 1.0) -> animation.FuncAnimation:
"""
Create a video from the GeoDataFrame data.
:param filename: The GRIB file to process.
:param run_dates: List of forecast run dates to compare.
:param size: Size of each subplot.
:param bounds: Bounds for the plot. Can be 'Belgium' or a list of [xlim, ylim].
:param vmin: Minimum value for color scaling.
:param vmax: Maximum value for color scaling.
:param factor: Factor to multiply the data values for scaling.
:return: The animation object.
"""
assert len(run_dates) > 1, "At least two forecasts must be provided."
self.load_grib_data_to_gdf(filename, run_dates)
self._compute_diff()
from matplotlib.collections import PatchCollection
from matplotlib.patches import Polygon as MplPolygon
from matplotlib.cm import ScalarMappable
from mpl_toolkits.axes_grid1 import make_axes_locatable
fig, axes = plt.subplots(ncols=len(run_dates), figsize=(len(run_dates) * size, size))
fig.suptitle(_('Rain intensity [mm/h]'))
columns = self.get_forecast_columns(diff=True)
# all date and hours
dates_hours = list(set([col.split('_')[1] + '_' + col.split('_')[2] for col in columns]))
dates_hours.sort()
columns_lists = []
for datehour in dates_hours:
columns_lists.append([f"{forecast}_{datehour}" if f"{forecast}_{datehour}" in columns else None for forecast in run_dates])
if vmin is None:
vmin = self._gdf_diff[columns].min().min()
if vmax is None:
vmax = self._gdf_diff[columns].max().max()
vmin *= factor
vmax *= factor
norm = plt.Normalize(vmin=vmin, vmax=vmax)
colors = self._colormap(norm(self._gdf_diff[columns[0]].values * factor))
# Create an axis for the colorbar without modifying the last axes
divider = make_axes_locatable(axes[-1])
cax = divider.append_axes("right", size="5%", pad=0.1)
# Add the colorbar
sm = ScalarMappable(norm=norm, cmap=self._colormap)
cbar = fig.colorbar(sm, cax=cax)
patches = []
collection = []
for idx, forecast in enumerate(run_dates):
patches.append([MplPolygon(geom.exterior.coords, closed=True) for geom in self._gdf_diff.geometry])
collection.append(PatchCollection(patches[idx], facecolor=colors, edgecolor='black'))
ax = axes[idx]
ax.add_collection(collection[idx])
# Plot cities but just the edges
self._cities.plot(ax=ax, facecolor = 'none', edgecolor='black', linewidth=0.5)
if bounds == 'Belgium':
ax.set_xlim(self._cities.total_bounds[[0, 2]])
ax.set_ylim(self._cities.total_bounds[[1, 3]])
elif isinstance(bounds, list) and len(bounds) == 2:
ax.set_xlim(bounds[0][0], bounds[0][1])
ax.set_ylim(bounds[1][0], bounds[1][1])
else:
ax.set_xlim(self._gdf_diff.total_bounds[[0, 2]])
ax.set_ylim(self._gdf_diff.total_bounds[[1, 3]])
ax.set_aspect('equal')
fig.tight_layout()
def update(column):
for idx, ax in enumerate(axes):
if column[idx] is None:
# All white
new_colors = np.ones((self._gdf_diff.shape[0], 4))
else:
new_colors = self._colormap(norm(self._gdf_diff[column[idx]].values * factor))
collection[idx].set_color(new_colors)
ax.set_title(_convert_col2date_str(column[idx]))
if wx.GetApp() is not None:
wx.GetApp().Yield() # Allow GUI to update
ani = animation.FuncAnimation(fig, update, frames=columns_lists, repeat=False)
return ani
def __del__(self):
"""
Destructor to ensure the FTP connection is closed.
"""
try:
self._ftp_close()
except Exception as e:
logging.error(f"Error closing FTP connection: {e}")
def video_cumulated_rain(self, run_date:str, output_file:Path, fps:int = 2):
"""
Create a MP4 video comparison of cumulated rain forecasts.
:param forecast: The forecast date string.
:param output_file: The output MP4 file path.
:param fps: Frames per second for the video.
"""
output_file = Path(output_file)
if output_file.suffix != '.mp4':
output_file = output_file.with_suffix('.mp4')
ani = self._create_animation(GribFiles.FILE_TotPrecip, run_date, factor=1000.)
output_file.parent.mkdir(parents=True, exist_ok=True)
ani.save(output_file, writer='ffmpeg', fps=fps)
return output_file
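# Usage sketch (illustrative run date and output path):
#
#   alaro.video_cumulated_rain('2024010100', Path('out/alaro_rain'))
#   # -> saves 'out/alaro_rain.mp4' through the matplotlib ffmpeg writer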
def videos_cumulated_rain_allforecasts(self, output_dir:Path, fps:int = 2, run_dates: str | list[str] = None) -> list[Path] | None:
"""
Create a MP4 video comparison of cumulated rain forecasts.
:param output_dir: The output directory for the MP4 files.
:param fps: Frames per second for the video.
"""
output_dir = Path(output_dir)
if run_dates is not None:
if isinstance(run_dates, str):
files = [run_dates]
else:
files = run_dates
else:
files = self.run_dates
if files:
videos_out = []
output_dir.mkdir(parents=True, exist_ok=True)
for forecast in files:
output_file = output_dir / f"Alaro_cumulated_rain_{forecast}.mp4"
self.video_cumulated_rain(forecast, output_file, fps=fps)
videos_out.append(output_file)
return videos_out
return None
def video_gradient_cumulated_rain_compare(self, output_file:Path, fps:int = 2, run_dates: str | list[str] = None) -> Path | None:
"""
Create a MP4 video comparison of cumulated rain forecasts.
:param output_file: The output MP4 file path.
:param fps: Frames per second for the video.
"""
output_file = Path(output_file)
if output_file.suffix != '.mp4':
output_file = output_file.with_suffix('.mp4')
if run_dates is None:
files = self.list_run_dates()
elif isinstance(run_dates, str):
files = [run_dates]
else:
files = run_dates
if files:
ani = self._create_comparison_animation_diff(GribFiles.FILE_TotPrecip, files, factor=1000.)
output_file.parent.mkdir(parents=True, exist_ok=True)
ani.save(output_file, writer='ffmpeg', fps=fps)
return output_file
return None
def convert_gdf2dataframe(self, X:float, Y:float, use_diff:bool = False) -> pd.DataFrame:
"""
Convert the GeoDataFrame to a Pandas DataFrame for a given point (X, Y).
:param X: The X coordinate.
:param Y: The Y coordinate.
:return: The Pandas DataFrame with the data for the nearest grid cell.
"""
if self._gdf is None:
logging.error("No data loaded. Please load GRIB data first.")
return pd.DataFrame()
if use_diff and self._gdf_diff is None:
self._compute_diff()
gdf = self._gdf_diff if use_diff else self._gdf
point = Point(X, Y)
# Find the nearest polygon
distances = gdf.geometry.distance(point)
idx_min = distances.idxmin()
# The data are currently stored columns wise, we want to return a Dataframe with 3 columns:
# 'forecast_date', 'value', 'run_date'
# where 'forecast_date' is the date of the forecast, 'value' is the
# value of the forecast, and 'run_date' is the date of the run.
# The index of the dataframe is a combination of 'run_date'_'forecast_date'
columns = self.get_forecast_columns(diff=use_diff)
df = pd.DataFrame({
'forecast_date': [_extract_dates_from_columnstr(col)[1] for col in columns],
'value': [gdf.loc[idx_min][col] for col in columns],
'run_date': [_extract_dates_from_columnstr(col)[0] for col in columns]
})
return df
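# Illustration (hypothetical Lambert 72 point):
#
#   df = alaro.convert_gdf2dataframe(X=185_000., Y=125_000.)
#   # -> one row per loaded column, with timezone-aware UTC 'run_date' and
#   #    'forecast_date' columns plus the forecast 'value' at the nearest cell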
def _plot4XY(self, X:float, Y:float, factor:float = 1., size:tuple[int, int]=(10, 5), use_diff:bool = False, figax:tuple[plt.Figure, plt.Axes] = None) -> plt.Figure:
"""
Plot the data for a given point (X, Y).
:param X: The X coordinate.
:param Y: The Y coordinate.
:param factor: The factor to multiply the data values for scaling.
:param size: The size of the plot.
:return: The Matplotlib Figure object.
"""
df = self.convert_gdf2dataframe(X, Y, use_diff=use_diff)
if df.empty:
logging.error("No data available for the given point.")
return None
if figax is not None:
fig, ax = figax
else:
fig, ax = plt.subplots(figsize=size)
# Pivot the dataframe to have 'forecast_date' as index and 'run_date' as columns
df_pivot = df.pivot(index='forecast_date', columns='run_date', values='value')
# Plot each run_date as a separate line
for run_date in df_pivot.columns:
ax.plot(df_pivot.index, df_pivot[run_date] * factor, marker='o', label=run_date)
ax.set_title(f"Total precipitation at point ({X}, {Y})")
ax.set_xlabel("Forecast Date (time zone: UTC)")
ax.set_ylabel("Total precipitation [mm]")
ax.legend(title="Run Date")
ax.grid(True)
# Set xticks every rounded 6 hours
import matplotlib.dates as mdates
ax.set_xlim([df_pivot.index.min(), df_pivot.index.max()])
ax.xaxis.set_major_locator(mdates.HourLocator(interval=6))
ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d %H+00'))
fig.autofmt_xdate()
return fig
def _bar4XY(self, X:float, Y:float, factor:float = 1., size:tuple[int, int]=(10, 5), use_diff:bool = False, figax:tuple[plt.Figure, plt.Axes] = None) -> plt.Figure:
"""
Plot the data for a given point (X, Y).
:param X: The X coordinate.
:param Y: The Y coordinate.
:param factor: The factor to multiply the data values for scaling.
:param size: The size of the plot.
:return: The Matplotlib Figure object.
"""
df = self.convert_gdf2dataframe(X, Y, use_diff=use_diff)
if df.empty:
logging.error("No data available for the given point.")
return None
if figax is not None:
fig, ax = figax
else:
fig, ax = plt.subplots(figsize=size)
# Pivot the dataframe to have 'forecast_date' as index and 'run_date' as columns
df_pivot = df.pivot(index='forecast_date', columns='run_date', values='value')
# Plot each run_date as a separate line
for run_date in df_pivot.columns:
# Filter Nan value
used_range = df_pivot[run_date].notna()
ax.bar(df_pivot.index[used_range], df_pivot[run_date][used_range] * factor, width=td(seconds=3600), label=run_date, align='edge', alpha=0.7)
ax.set_title(f"Rain intensity at point ({X}, {Y})")
ax.set_xlabel("Forecast Date (time zone: UTC)")
ax.set_ylabel("Rain intensity [mm/h]")
# Set xticks every rounded 6 hours
import matplotlib.dates as mdates
ax.set_xlim([df_pivot.index.min(), df_pivot.index.max()])
ax.xaxis.set_major_locator(mdates.HourLocator(interval=6))
ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d %H+00'))
ax.legend(title="Run Date")
ax.grid(True)
fig.autofmt_xdate()
return fig
def plot_TotPrec4XY(self, X:float, Y:float, size:tuple[int, int]=(10, 5)) -> plt.Figure:
"""
Plot the total precipitation data for a given point (X, Y).
:param X: The X coordinate.
:param Y: The Y coordinate.
:param size: The size of the plot.
:return: The Matplotlib Figure object.
"""
if self._gdf is None:
self.load_grib_data_to_gdf(GribFiles.FILE_TotPrecip, self.run_dates)
return self._plot4XY(X, Y, factor=1000., size=size)
def plot_RainIntensity4XY(self, X:float, Y:float, size:tuple[int, int]=(10, 5)) -> plt.Figure:
"""
Plot the rain intensity data for a given point (X, Y).
:param X: The X coordinate.
:param Y: The Y coordinate.
:param size: The size of the plot.
:return: The Matplotlib Figure object.
"""
if self._gdf is None:
self.load_grib_data_to_gdf(GribFiles.FILE_TotPrecip, self.run_dates)
fig = self._bar4XY(X, Y, factor=1000., size=size, use_diff=True)
return fig
def plot_Rain_and_TotPrecip4XY(self, X:float, Y:float, size:tuple[int, int]=(10, 10)) -> plt.Figure:
"""
Plot the rain intensity and total precipitation data for a given point (X, Y).
:param X: The X coordinate.
:param Y: The Y coordinate.
:param size: The size of the plot.
:return: The Matplotlib Figure object.
"""
if self._gdf is None:
self.load_grib_data_to_gdf(GribFiles.FILE_TotPrecip, self.run_dates)
fig, (ax1, ax2) = plt.subplots(nrows=2, figsize=size)
self._plot4XY(X, Y, factor=1000., size=size, use_diff=False, figax=(fig, ax1))
self._bar4XY(X, Y, factor=1000., size=size, use_diff=True, figax=(fig, ax2))
fig.tight_layout()
return fig
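# End-to-end usage sketch (illustrative; the point coordinates and output
# paths are hypothetical, and network access to opendata.meteo.be is assumed):
#
#   alaro = IRM_Alaro()
#   if alaro.run_dates:
#       # Cumulated rain + hourly intensity at a point (Belgian Lambert 72)
#       fig = alaro.plot_Rain_and_TotPrecip4XY(X=185_000., Y=125_000.)
#       fig.savefig('alaro_point_forecast.png')
#       # Side-by-side comparison video of the hourly rain of all runs
#       alaro.video_gradient_cumulated_rain_compare('alaro_compare.mp4')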