import pandas as pd
import geopandas as gpd
import numpy as np
from osgeo import osr, gdal
from pyproj import Proj, Transformer
from pathlib import Path
from shapely.geometry import Point, Polygon
try:
    from shapely import polygons as shapely_polygons
except ImportError:
    shapely_polygons = None
import matplotlib.pyplot as plt
from scipy.spatial import KDTree
import logging
from datetime import datetime, timedelta
import zarr
import xarray as xr
from ..maps import BELGIUM, WALLONIA
from ..PyTranslate import _
logging.basicConfig(level=logging.INFO)
DATADIR = Path(r'E:\IRM\RADCLIM-Analogs-median') # Path to the ZARR IRM climate data directory - To change if needed
FIELDS_FILE = "radclim-analogs-median.zarr"
ANALOG_FILE = "precipitation-analogs.zarr"
def convert_pixels_to_squares(pixels:tuple[np.ndarray, np.ndarray]) -> tuple[np.ndarray, KDTree]:
    """
    From pixel coordinates, define squares around each pixel center.

    Corners are defined as the average of the pixel center and its neighbors.
    Returns a (NB, 4, 2) numpy array of corner coordinates and the KDTree.
    Each row contains 4 corners: lower-left, lower-right, upper-right, upper-left.
    """

    PIXEL_SIZE = 1000
    half = PIXEL_SIZE / 2
    NB = pixels[0].size

    pixels_arr = np.column_stack((pixels[0].reshape(-1), pixels[1].reshape(-1)))
    tree = KDTree(pixels_arr)

    # Corner offsets: lower-left, lower-right, upper-right, upper-left
    offsets = np.array([[-half, -half], [+half, -half], [+half, +half], [-half, +half]])

    result_x = np.empty((4, NB))
    result_y = np.empty((4, NB))

    for ci in range(4):
        query_pts = pixels_arr + offsets[ci]
        dists, idx = tree.query(query_pts, k=4, distance_upper_bound=PIXEL_SIZE * 1.1)

        valid = idx != NB            # (NB, 4) -- KDTree returns NB when no neighbor is found
        counts = valid.sum(axis=1)   # (NB,)

        # Safe indexing: replace invalid indices with 0 (masked out below)
        safe_idx = np.where(valid, idx, 0)

        # Neighbor coordinates with NaN for invalid entries
        neighbor_x = np.where(valid, pixels_arr[safe_idx, 0], np.nan)
        neighbor_y = np.where(valid, pixels_arr[safe_idx, 1], np.nan)
        mean_x = np.nanmean(neighbor_x, axis=1)
        mean_y = np.nanmean(neighbor_y, axis=1)

        default_x = pixels_arr[:, 0] + offsets[ci, 0]
        default_y = pixels_arr[:, 1] + offsets[ci, 1]

        # Start with means (covers count==4 and count==0 cases)
        rx = mean_x.copy()
        ry = mean_y.copy()

        # count in {1, 3}: use default offsets
        mask_13 = (counts == 1) | (counts == 3)
        rx[mask_13] = default_x[mask_13]
        ry[mask_13] = default_y[mask_13]

        # count == 2: check neighbor alignment
        mask_2 = counts == 2
        if mask_2.any():
            m2_idx = np.where(mask_2)[0]
            valid_m2 = valid[m2_idx]
            safe_idx_m2 = safe_idx[m2_idx]

            # Find columns of the two valid neighbors
            cumvalid = np.cumsum(valid_m2, axis=1)
            first_col = np.argmax(valid_m2, axis=1)
            second_col = np.argmax(cumvalid == 2, axis=1)
            arange = np.arange(len(m2_idx))
            idx0 = safe_idx_m2[arange, first_col]
            idx1 = safe_idx_m2[arange, second_col]

            dx_abs = np.abs(pixels_arr[idx0, 0] - pixels_arr[idx1, 0])
            same_x = dx_abs < 100  # the two neighbors are vertically aligned (same x)

            rx[mask_2] = np.where(same_x, default_x[m2_idx], mean_x[m2_idx])
            ry[mask_2] = np.where(same_x, mean_y[m2_idx], default_y[m2_idx])

        # Fixup: if a corner coincides with the pixel center, reset it to the default offset
        rx = np.where(rx == pixels_arr[:, 0], default_x, rx)
        ry = np.where(ry == pixels_arr[:, 1], default_y, ry)

        result_x[ci] = rx
        result_y[ci] = ry

    # Build the (NB, 4, 2) corner array
    squares = np.stack([np.column_stack((result_x[ci], result_y[ci])) for ci in range(4)], axis=1)

    return squares, tree
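
# Illustrative usage sketch (not executed; the 3x3 grid below is a hypothetical
# block of pixel centres spaced 1 km apart, mimicking the RADCLIM layout in
# Lambert 72 coordinates):
#
#   xs, ys = np.meshgrid(np.arange(3) * 1000., np.arange(3) * 1000., indexing='ij')
#   squares, tree = convert_pixels_to_squares((xs, ys))
#   squares.shape                          # -> (9, 4, 2): 9 pixels, 4 corners each, (x, y)
#   dist, i = tree.query([1500., 1500.])   # nearest pixel centre to an arbitrary point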
class ClimateDataHourly:
"""
Based on data from IRM : https://essd.copernicus.org/articles/17/6405/2025/
Title : Hourly precipitation fields at 1 km resolution over Belgium from 1940 to 2016 based on the analog technique
Authors : Elke Debrie, Jonathan Demaeyer, and Stéphane Vannitsem
"""
    def __init__(self, datadir=DATADIR, fields_file=FIELDS_FILE, analog_file=ANALOG_FILE):
        """ Initialize the ClimateDataHourly object.

        :param datadir: Directory where the ZARR files are located.
        :param fields_file: Name of the ZARR file containing the fields dataset.
        :param analog_file: Name of the ZARR file containing the analog dataset.
        """

        self.datadir = Path(datadir)
        self.fields_file = fields_file
        self.analog_file = analog_file

        # Datasets and derived geometries are populated by load_data()
        self.fields_ds = None
        self.analog_ds = None
        self._xy_lbt72 = None
        self._gdf = None
    @property
    def xy(self):
        """ Return the spatial coordinates in Lambert 72 as a tuple of two 2D numpy arrays (x, y)
        with the same shape as the spatial dimensions of the datasets.
        """

        if self._xy_lbt72 is None:
            raise ValueError(_("Data not loaded. Call load_data() first."))

        return self._xy_lbt72
    def load_data(self, fields:bool=True, analogs:bool=True):
        """ Load the datasets from ZARR files and build the spatial geometries.

        :param fields: Whether to load the fields dataset.
        :param analogs: Whether to load the analog dataset.
        """

        logging.info(_("Loading climate data from ZARR files..."))

        if fields:
            if (self.datadir / self.fields_file).exists():
                self.fields_ds = xr.open_zarr(self.datadir / self.fields_file)
            else:
                logging.warning(_("Fields file %s not found. Fields dataset will be None."), self.datadir / self.fields_file)

        if analogs:
            if (self.datadir / self.analog_file).exists():
                self.analog_ds = xr.open_zarr(self.datadir / self.analog_file)
            else:
                logging.warning(_("Analog file %s not found. Analog dataset will be None."), self.datadir / self.analog_file)

        if self.fields_ds is None and self.analog_ds is None:
            logging.error(_("No datasets loaded. Check that the ZARR files are present in the specified directory."))
            return

        logging.info(_("Data loaded successfully."))
        logging.info(_("Processing spatial coordinates and building geometries..."))

        # Convert latitude and longitude to Lambert 72 coordinates
        lat_list = self.fields_ds['latitude'].values
        lon_list = self.fields_ds['longitude'].values
        self._xy_lbt72 = transform_latlon_to_lambert72_array(lat_list, lon_list)

        # Build one square polygon per pixel and a KD-tree on the pixel centres
        polygons, kd = convert_pixels_to_squares(self._xy_lbt72)

        if shapely_polygons is not None:
            geometries = shapely_polygons(polygons)
        else:
            geometries = [Polygon(poly) for poly in polygons]

        self._gdf = gpd.GeoDataFrame(geometry=geometries, crs='EPSG:31370')

        # Add columns with the row/column indices of the pixels
        ij = np.meshgrid(np.arange(self._xy_lbt72[0].shape[0]), np.arange(self._xy_lbt72[0].shape[1]), indexing='ij')
        self._gdf['pixel_row'] = ij[0].flatten().astype(int)
        self._gdf['pixel_col'] = ij[1].flatten().astype(int)

        logging.info(_("Geometries and spatial index built successfully."))
    def _crop_gdf_to_region(self, region:Polygon):
        """ Crop the GeoDataFrame to the given region by selecting only the pixels whose square intersects the region.

        :param region: Polygon representing the area of interest in Lambert 72 coordinates
        :return: Cropped GeoDataFrame containing only the pixels that intersect the region.
        """

        if self._gdf is None:
            raise ValueError(_("Data not loaded. Call load_data() first."))

        # Select only the pixels whose square intersects the region
        mask = self._gdf.intersects(region)
        cropped_gdf = self._gdf[mask]

        return cropped_gdf
    def _find_slices_for_region(self, region:Polygon):
        """ Find the row and column slices that correspond to the bounding box of the given region.

        :param region: Polygon representing the area of interest in Lambert 72 coordinates
        :return: tuple of (row_slice, col_slice) that can be used to index the datasets.
        """

        if self._gdf is None:
            raise ValueError(_("Data not loaded. Call load_data() first."))

        cropped_gdf = self._crop_gdf_to_region(region)
        if cropped_gdf.empty:
            raise ValueError(_("No pixels found in the specified region."))

        row_min = cropped_gdf['pixel_row'].min()
        row_max = cropped_gdf['pixel_row'].max() + 1
        col_min = cropped_gdf['pixel_col'].min()
        col_max = cropped_gdf['pixel_col'].max() + 1

        return slice(row_min, row_max), slice(col_min, col_max)
    def get_surface_and_fractions_of_pixels_in_region(self, region:Polygon) -> tuple[np.ndarray, float]:
        """ For a given region, compute the fraction of each pixel that is covered by the region, and the total surface of the region.

        :param region: Polygon representing the area of interest in Lambert 72 coordinates
        :return: tuple of (fractions, surface) where fractions is a numpy array of the same length as
            the number of pixels, containing the fraction of each pixel covered by the region,
            and surface is the total area of the region in square meters.
        """

        if self._gdf is None:
            raise ValueError(_("Data not loaded. Call load_data() first."))

        fractions = np.zeros(self._gdf.shape[0])
        surface = 0.
        for idx, row in self._gdf.iterrows():
            pixel_square = row.geometry
            intersection_area = pixel_square.intersection(region).area
            fractions[idx] = intersection_area / pixel_square.area
            surface += intersection_area

        # Add a column with the fractions in the gdf, for debugging purposes
        self._gdf[region] = fractions

        return fractions, surface
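
    # Example sketch, continuing the usage above (region and values are illustrative):
    #
    #   from shapely.geometry import box
    #   region = box(205_000., 120_000., 206_500., 121_500.)   # 1.5 km x 1.5 km, Lambert 72 metres
    #   fractions, surface = cdh.get_surface_and_fractions_of_pixels_in_region(region)
    #   # surface ~= 2.25e6 m2 and fractions.sum() ~= surface / 1e6 (1 km2 pixels)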
    def crop_to_new_zarr(self, dirout:Path, region:Polygon, filename:str = FIELDS_FILE,
                         show_progress:bool = False, time_chunk_size:int = 720):
        """Crop the fields dataset to *region* and write an optimised zarr.

        The output zarr uses a single spatial chunk (the full cropped
        extent) and large temporal chunks (*time_chunk_size*, default
        720 = 1 month hourly). This layout is ideal for later time-
        series extraction: one file read per month instead of one per
        6 time-steps.

        :param dirout: Output directory.
        :param region: Region polygon (Lambert 72).
        :param filename: Output zarr name ('.zarr' is appended if missing).
        :param show_progress: Show a dask progress bar.
        :param time_chunk_size: Number of time steps per chunk (720 = ~1 month hourly).
        """

        if self.fields_ds is None:
            raise ValueError(_("Data not loaded. Call load_data() first."))

        row_slice, col_slice = self._find_slices_for_region(region)

        # Resolve spatial dimensions from latitude/longitude coordinates.
        if 'latitude' in self.fields_ds and self.fields_ds['latitude'].ndim >= 2:
            row_dim, col_dim = self.fields_ds['latitude'].dims[-2:]
        elif 'longitude' in self.fields_ds and self.fields_ds['longitude'].ndim >= 2:
            row_dim, col_dim = self.fields_ds['longitude'].dims[-2:]
        else:
            raise ValueError(
                "Unable to infer spatial dimensions from dataset coordinates. "
                "Expected 2D 'latitude' or 'longitude' coordinates."
            )

        # Crop the fields dataset using positional indexing on named dims.
        cropped_fields = self.fields_ds.isel({row_dim: row_slice, col_dim: col_slice})

        # Optimised chunk layout: full spatial extent + large time chunks.
        row_size = row_slice.stop - row_slice.start
        col_size = col_slice.stop - col_slice.start
        chunk_spec = {row_dim: row_size, col_dim: col_size}

        # Identify the time dimension and rechunk it too.
        time_dim = None
        for dim in cropped_fields.dims:
            if dim not in (row_dim, col_dim):
                time_dim = dim
                break
        if time_dim is not None:
            chunk_spec[time_dim] = time_chunk_size

        cropped_fields = cropped_fields.chunk(chunk_spec)

        # Remove inherited zarr chunk encoding to avoid mismatch with cropped dask chunks.
        for name in cropped_fields.variables:
            cropped_fields[name].encoding.pop('chunks', None)

        # Save the cropped dataset to a new ZARR file
        dirout.mkdir(parents=True, exist_ok=True)
        out_name = str(filename) if str(filename).endswith('.zarr') else f'{filename}.zarr'
        out_path = dirout / out_name
        write_task = cropped_fields.to_zarr(
            out_path,
            mode='w',
            zarr_version=2,
            compute=False,
        )

        if show_progress:
            from dask.diagnostics import ProgressBar
            with ProgressBar():
                write_task.compute()
        else:
            write_task.compute()

        logging.info(_('Cropped zarr written to %s with chunks %s'),
                     out_path, chunk_spec)
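
    # Example sketch (output directory and name are illustrative; WALLONIA, imported
    # from ..maps above, is assumed to be a Lambert 72 polygon of the region):
    #
    #   cdh.crop_to_new_zarr(Path(r'E:\IRM\cropped'), region=WALLONIA,
    #                        filename='radclim-wallonia', show_progress=True)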
    def rechunk_source_zarr(self, dirout:Path, filename:str = FIELDS_FILE,
                            time_chunk_size:int = 720, lat_chunk_size:int = -1,
                            lon_chunk_size:int = -1, show_progress:bool = False):
        """Rechunk the full source zarr with an optimised layout.

        The original IRM zarr uses chunks of (6, 19, 36) which causes
        extremely poor read performance for time-series queries. This
        method rewrites the dataset with much larger chunks.

        :param dirout: Output directory.
        :param filename: Output zarr name ('.zarr' is appended if missing).
        :param time_chunk_size: Time steps per chunk (720 = ~1 month).
        :param lat_chunk_size: Latitude chunk size (-1 = full extent).
        :param lon_chunk_size: Longitude chunk size (-1 = full extent).
        :param show_progress: Show a dask progress bar.
        """

        if self.fields_ds is None:
            raise ValueError(_("Data not loaded. Call load_data() first."))

        # Resolve spatial dimensions.
        if 'latitude' in self.fields_ds and self.fields_ds['latitude'].ndim >= 2:
            row_dim, col_dim = self.fields_ds['latitude'].dims[-2:]
        elif 'longitude' in self.fields_ds and self.fields_ds['longitude'].ndim >= 2:
            row_dim, col_dim = self.fields_ds['longitude'].dims[-2:]
        else:
            raise ValueError(
                "Unable to infer spatial dimensions from dataset coordinates. "
                "Expected 2D 'latitude' or 'longitude' coordinates."
            )

        lat_size = self.fields_ds.sizes[row_dim] if lat_chunk_size == -1 else lat_chunk_size
        lon_size = self.fields_ds.sizes[col_dim] if lon_chunk_size == -1 else lon_chunk_size

        chunk_spec = {row_dim: lat_size, col_dim: lon_size}
        for dim in self.fields_ds.dims:
            if dim not in (row_dim, col_dim):
                chunk_spec[dim] = time_chunk_size

        rechunked = self.fields_ds.chunk(chunk_spec)

        # Remove inherited zarr chunk encoding to avoid mismatch with the new dask chunks.
        for name in rechunked.variables:
            rechunked[name].encoding.pop('chunks', None)

        dirout.mkdir(parents=True, exist_ok=True)
        out_name = str(filename) if str(filename).endswith('.zarr') else f'{filename}.zarr'
        out_path = dirout / out_name
        write_task = rechunked.to_zarr(out_path, mode='w', zarr_version=2, compute=False)

        logging.info(_('Rechunking to %s with chunks %s ...'), out_path, chunk_spec)

        if show_progress:
            from dask.diagnostics import ProgressBar
            with ProgressBar():
                write_task.compute()
        else:
            write_task.compute()

        logging.info(_('Rechunked zarr written to %s'), out_path)
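
    # Example sketch: rewrite the full archive with monthly time chunks and the whole
    # spatial extent in a single chunk (output path and name are illustrative):
    #
    #   cdh.rechunk_source_zarr(Path(r'E:\IRM\rechunked'), filename='radclim-monthly',
    #                           time_chunk_size=720, show_progress=True)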
    def __str__(self):
        """ Return a string representation of the ClimateDataHourly object, including paths and dataset info.
        """

        paths = f"ClimateDataHourly(datadir={self.datadir}, fields_file={self.fields_file}, analog_file={self.analog_file})"
        infos = f"Fields dataset: {self.fields_ds}\nAnalog dataset: {self.analog_ds}"
        return f"{paths}\n{infos}"
    def get_precipitation(self, region:Polygon, time_interval:tuple[datetime, datetime]) -> pd.Series:
        """
        Get precipitation for a given region and time interval.

        :param region: Polygon representing the area of interest in Lambert 72 coordinates
        :param time_interval: (start_time, end_time) as a tuple of datetime objects
        :return: Pandas Series with time as index and precipitation [mm/h] as values
        """

        if self.fields_ds is None:
            raise ValueError(_("Data not loaded. Call load_data() first."))

        row_slice, col_slice = self._find_slices_for_region(region)
        fractions, s = self.get_surface_and_fractions_of_pixels_in_region(region)

        # Build a 2D array of fractions restricted to the bounding-box window
        mask = self._gdf[region] > 0.
        frac_np = np.zeros((row_slice.stop - row_slice.start, col_slice.stop - col_slice.start))
        sub = self._gdf.loc[mask, ['pixel_row', 'pixel_col', region]]
        rows = sub['pixel_row'].values.astype(int) - row_slice.start
        cols = sub['pixel_col'].values.astype(int) - col_slice.start
        frac_np[rows, cols] = sub[region].values

        # Read data via zarr directly (bypasses dask overhead entirely).
        # zarr's internal chunk-assembly is implemented in C and avoids
        # the per-chunk Python task overhead that dask imposes.
        zarr_path = Path(self.datadir / self.fields_file)
        zarr_store = zarr.open(str(zarr_path), mode='r')

        # Resolve time indices for direct slicing
        time_coords = self.fields_ds['time'].values
        t_start = np.datetime64(time_interval[0])
        t_end = np.datetime64(time_interval[1])
        time_mask = (time_coords >= t_start) & (time_coords <= t_end)
        time_indices = np.where(time_mask)[0]
        if len(time_indices) == 0:
            return pd.Series(dtype=np.float64)
        t_slice = slice(int(time_indices[0]), int(time_indices[-1]) + 1)

        # Direct zarr array read -- single contiguous slice, no dask graph
        precip_np = zarr_store['tp_median'][t_slice,
                                            row_slice.start:row_slice.stop,
                                            col_slice.start:col_slice.stop]
        times = time_coords[t_slice]

        # Fraction-weighted sum over the spatial window, one value per time step
        result = (precip_np * frac_np).sum(axis=(1, 2))

        return pd.Series(result, index=times)
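
    # Example sketch: hourly precipitation series over a region, then daily totals
    # (region built as in the sketch above; names are illustrative):
    #
    #   start, end = datetime(2010, 7, 1), datetime(2010, 7, 31, 23)
    #   serie = cdh.get_precipitation(region, (start, end))
    #   daily = serie.resample('D').sum()   # hourly [mm/h] values summed to daily depths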