"""Source code for wolfhece.hydrology.climate_data_hourly."""

import logging
from datetime import datetime, timedelta
from pathlib import Path

import pandas as pd
import geopandas as gpd
import numpy as np
import matplotlib.pyplot as plt
import zarr
import xarray as xr
from osgeo import osr, gdal
from pyproj import Proj, Transformer
from scipy.spatial import KDTree
from shapely.geometry import Point, Polygon

try:
    # shapely >= 2.0 provides a vectorised polygons() constructor
    from shapely import polygons as shapely_polygons
except ImportError:
    # Older shapely: fall back to building Polygon objects one by one
    shapely_polygons = None

from ..maps import BELGIUM, WALLONIA
from ..PyTranslate import _

logging.basicConfig(level=logging.INFO)
# Path to the ZARR IRM climate data directory - To change if needed
DATADIR = Path(r'E:\IRM\RADCLIM-Analogs-median')
# Name of the ZARR file holding the precipitation fields (median of analogs)
FIELDS_FILE = "radclim-analogs-median.zarr"
# Name of the ZARR file holding the analog dates dataset
ANALOG_FILE = "precipitation-analogs.zarr"
def transform_latlon_to_lambert72_array(lat_array: np.ndarray, lon_array: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """
    Transform arrays of EPSG:4258 coordinates to Lambert 72 coordinates.

    Coordinates from IRM are in EPSG:4258, and we want to convert them
    to Lambert 72 (EPSG:31370).

    :param lat_array: Array of latitudes in EPSG:4258
    :param lon_array: Array of longitudes in EPSG:4258
    :return: tuple of two arrays of x and y coordinates in Lambert 72,
        with the same shape as the input arrays
    """
    t = Transformer.from_crs('EPSG:4258', 'EPSG:31370', always_xy=True)
    # pyproj transforms whole arrays natively (C loop) -- no need for a
    # per-element Python loop; output shape matches the input shape.
    x, y = t.transform(lon_array, lat_array)
    return np.asarray(x), np.asarray(y)
def convert_pixels_to_squares(pixels: tuple[np.ndarray, np.ndarray]) -> tuple[np.ndarray, KDTree]:
    """
    From pixels coordinates, define squares around each pixel center.

    Corners are defined as the average of the pixel center and its neighbors.
    Returns a (NB, 4, 2) numpy array of corner coordinates and the KDTree.
    Each row contains 4 corners: lower-left, lower-right, upper-right, upper-left.

    :param pixels: tuple (x, y) of arrays of pixel-center coordinates (any shape;
        flattened internally). Assumed to lie on a ~1 km grid -- TODO confirm.
    :return: (squares, tree) where squares is (NB, 4, 2) and tree is the KDTree
        built on the flattened pixel centers.
    """
    PIXEL_SIZE = 1000
    half = PIXEL_SIZE / 2
    NB = pixels[0].size

    pixels_arr = np.column_stack((pixels[0].reshape(-1), pixels[1].reshape(-1)))
    tree = KDTree(pixels_arr)

    # Corner offsets: lower-left, lower-right, upper-right, upper-left
    offsets = np.array([[-half, -half],
                        [+half, -half],
                        [+half, +half],
                        [-half, +half]])

    result_x = np.empty((4, NB))
    result_y = np.empty((4, NB))

    for ci in range(4):
        query_pts = pixels_arr + offsets[ci]
        # Up to 4 pixel centers can surround a corner; KDTree returns index == NB
        # for neighbors beyond distance_upper_bound.
        _, idx = tree.query(query_pts, k=4, distance_upper_bound=PIXEL_SIZE * 1.1)

        valid = idx != NB               # (NB, 4)
        counts = valid.sum(axis=1)      # (NB,)

        # Safe indexing: replace invalid indices with 0 (masked out below)
        safe_idx = np.where(valid, idx, 0)

        # Neighbor coordinates with NaN for invalid entries
        neighbor_x = np.where(valid, pixels_arr[safe_idx, 0], np.nan)
        neighbor_y = np.where(valid, pixels_arr[safe_idx, 1], np.nan)

        mean_x = np.nanmean(neighbor_x, axis=1)
        mean_y = np.nanmean(neighbor_y, axis=1)

        default_x = pixels_arr[:, 0] + offsets[ci, 0]
        default_y = pixels_arr[:, 1] + offsets[ci, 1]

        # Start with means (covers count==4 and count==0 cases)
        rx = mean_x.copy()
        ry = mean_y.copy()

        # count in {1, 3}: ambiguous neighborhood -> use default offsets
        mask_13 = (counts == 1) | (counts == 3)
        rx[mask_13] = default_x[mask_13]
        ry[mask_13] = default_y[mask_13]

        # count == 2: check neighbor alignment
        mask_2 = counts == 2
        if mask_2.any():
            m2_idx = np.where(mask_2)[0]
            valid_m2 = valid[m2_idx]
            safe_idx_m2 = safe_idx[m2_idx]

            # Find columns of the two valid neighbors
            cumvalid = np.cumsum(valid_m2, axis=1)
            first_col = np.argmax(valid_m2, axis=1)
            second_col = np.argmax(cumvalid == 2, axis=1)

            arange = np.arange(len(m2_idx))
            idx0 = safe_idx_m2[arange, first_col]
            idx1 = safe_idx_m2[arange, second_col]

            dx_abs = np.abs(pixels_arr[idx0, 0] - pixels_arr[idx1, 0])
            horiz = dx_abs < 100  # neighbors aligned vertically (same x)

            # Vertically aligned neighbors pin y (mean), x falls back to default;
            # otherwise the mean pins x and y falls back to default.
            rx[mask_2] = np.where(horiz, default_x[m2_idx], mean_x[m2_idx])
            ry[mask_2] = np.where(horiz, mean_y[m2_idx], default_y[m2_idx])

        # Fixup: if result coincides with pixel center, reset to default
        rx = np.where(rx == pixels_arr[:, 0], default_x, rx)
        ry = np.where(ry == pixels_arr[:, 1], default_y, ry)

        result_x[ci] = rx
        result_y[ci] = ry

    # Build (NB, 4, 2) array
    squares = np.stack([np.column_stack((result_x[ci], result_y[ci])) for ci in range(4)],
                       axis=1)
    return squares, tree
class ClimateDataHourly:
    """
    Hourly precipitation data handler.

    Based on data from IRM : https://essd.copernicus.org/articles/17/6405/2025/

    Title : Hourly precipitation fields at 1 km resolution over Belgium
    from 1940 to 2016 based on the analog technique

    Authors : Elke Debrie, Jonathan Demaeyer, and Stéphane Vannitsem
    """

    def __init__(self, datadir=DATADIR, fields_file=FIELDS_FILE, analog_file=ANALOG_FILE):
        """
        Initialize the ClimateDataHourly object.

        :param datadir: Directory where the ZARR files are located.
        :param fields_file: Name of the ZARR file containing the fields dataset.
        :param analog_file: Name of the ZARR file containing the analog dataset.
        """
        self.datadir = datadir
        self.fields_file = fields_file
        self.analog_file = analog_file
        self.fields_ds = None       # xarray Dataset of precipitation fields (set by load_data)
        self.analog_ds = None       # xarray Dataset of analogs (set by load_data)
        self._xy_lbt72 = None       # (x, y) pixel-center coordinates in Lambert 72
        self._gdf = None            # GeoDataFrame of pixel squares + (pixel_row, pixel_col)

    @property
    def xy(self):
        """
        Return the spatial coordinates in Lambert 72 as a tuple of two 2D numpy
        arrays (x, y) with the same shape as the spatial dimensions of the datasets.

        :raises ValueError: if load_data() has not been called yet.
        """
        if self._xy_lbt72 is None:
            raise ValueError(_("Data not loaded. Call load_data() first."))
        return self._xy_lbt72

    def load_data(self, fields: bool = True, analogs: bool = True):
        """
        Load the datasets from ZARR files and build the spatial geometries.

        :param fields: Whether to load the fields dataset.
        :param analogs: Whether to load the analog dataset.
        """
        logging.info(_("Loading climate data from ZARR files..."))

        if fields:
            if (self.datadir / self.fields_file).exists():
                self.fields_ds = xr.open_zarr(self.datadir / self.fields_file)
            else:
                # FIX: original passed a literal string with unformatted {} placeholders
                logging.warning(_(f"Fields file {self.datadir / self.fields_file} not found. Fields dataset will be None."))

        if analogs:
            if (self.datadir / self.analog_file).exists():
                self.analog_ds = xr.open_zarr(self.datadir / self.analog_file)
            else:
                logging.warning(_(f"Analog file {self.datadir / self.analog_file} not found. Analog dataset will be None."))

        if self.fields_ds is None and self.analog_ds is None:
            logging.error(_("No datasets loaded. Check that the ZARR files are present in the specified directory."))
            return

        logging.info(_("Data loaded successfully."))

        if self.fields_ds is None:
            # Geometries are derived from the fields dataset coordinates; without
            # them we cannot build the spatial index (original code crashed here).
            logging.warning(_("Fields dataset not loaded. Geometries cannot be built."))
            return

        logging.info(_("Processing spatial coordinates and building geometries..."))

        # convert latitude and longitude to Lambert 72 coordinates
        lat_list = self.fields_ds['latitude'].values
        lon_list = self.fields_ds['longitude'].values
        self._xy_lbt72 = transform_latlon_to_lambert72_array(lat_list, lon_list)

        polygons, kd = convert_pixels_to_squares(self._xy_lbt72)

        if shapely_polygons is not None:
            # Vectorised polygon construction (shapely >= 2.0)
            geometries = shapely_polygons(polygons)
        else:
            geometries = [Polygon(poly) for poly in polygons]

        self._gdf = gpd.GeoDataFrame(geometry=geometries, crs='EPSG:31370')

        # add columns for indices of the pixels
        ij = np.meshgrid(np.arange(self._xy_lbt72[0].shape[0]),
                         np.arange(self._xy_lbt72[0].shape[1]),
                         indexing='ij')
        self._gdf['pixel_row'] = ij[0].flatten().astype(int)
        self._gdf['pixel_col'] = ij[1].flatten().astype(int)

        logging.info(_("Geometries and spatial index built successfully."))

    def _crop_gdf_to_region(self, region: Polygon):
        """
        Crop the GeoDataFrame to the given region by selecting only the pixels
        whose square intersects the region.

        :param region: Polygon representing the area of interest in Lambert 72 coordinates
        :return: Cropped GeoDataFrame containing only the pixels that intersect the region.
        """
        if self._gdf is None:
            raise ValueError(_("Data not loaded. Call load_data() first."))

        # Select only the pixels whose square intersects the region
        mask = self._gdf.intersects(region)
        cropped_gdf = self._gdf[mask]
        return cropped_gdf

    def _find_slices_for_region(self, region: Polygon):
        """
        Find the row and column slices that correspond to the bounding box
        of the given region.

        :param region: Polygon representing the area of interest in Lambert 72 coordinates
        :return: tuple of (row_slice, col_slice) that can be used to index the datasets.
        :raises ValueError: if no pixel intersects the region.
        """
        if self._gdf is None:
            raise ValueError(_("Data not loaded. Call load_data() first."))

        cropped_gdf = self._crop_gdf_to_region(region)
        if cropped_gdf.empty:
            raise ValueError(_("No pixels found in the specified region."))

        row_min = cropped_gdf['pixel_row'].min()
        row_max = cropped_gdf['pixel_row'].max() + 1
        col_min = cropped_gdf['pixel_col'].min()
        col_max = cropped_gdf['pixel_col'].max() + 1

        return slice(row_min, row_max), slice(col_min, col_max)

    def get_surface_and_fractions_of_pixels_in_region(self, region: Polygon) -> tuple[np.ndarray, float]:
        """
        For a given region, compute the fraction of each pixel that is covered
        by the region, and the total surface of the region.

        :param region: Polygon representing the area of interest in Lambert 72 coordinates
        :return: tuple of (fractions, surface) where fractions is a numpy array of the
            same length as the number of pixels, containing the fraction of each pixel
            covered by the region, and surface is the total area of the region in
            square meters.
        """
        if self._gdf is None:
            raise ValueError(_("Data not loaded. Call load_data() first."))

        fractions = np.zeros(self._gdf.shape[0])
        surface = 0.
        for idx, row in self._gdf.iterrows():
            pixel_square = row.geometry
            intersection_area = pixel_square.intersection(region).area
            fractions[idx] = intersection_area / pixel_square.area
            surface += intersection_area

        # add a column (keyed by the region geometry) with the fractions in the
        # gdf, for debugging purposes and reuse in get_precipitation()
        self._gdf[region] = fractions

        return fractions, surface

    def crop_to_new_zarr(self, dirout: Path, region: Polygon, filename: str = FIELDS_FILE,
                         show_progress: bool = False, time_chunk_size: int = 720):
        """Crop the fields dataset to *region* and write an optimised zarr.

        The output zarr uses a single spatial chunk (the full cropped extent)
        and large temporal chunks (*time_chunk_size*, default 720 = 1 month
        hourly). This layout is ideal for later time-series extraction: one
        file read per month instead of one per 6 time-steps.

        :param dirout: Output directory.
        :param region: Region polygon (Lambert 72).
        :param filename: Output zarr name ('.zarr' appended if missing).
        :param show_progress: Show a dask progress bar.
        :param time_chunk_size: Number of time steps per chunk (720 = ~1 month hourly).
        """
        if self.fields_ds is None:
            raise ValueError(_("Data not loaded. Call load_data() first."))

        row_slice, col_slice = self._find_slices_for_region(region)

        # Resolve spatial dimensions from latitude/longitude coordinates.
        if 'latitude' in self.fields_ds and self.fields_ds['latitude'].ndim >= 2:
            row_dim, col_dim = self.fields_ds['latitude'].dims[-2:]
        elif 'longitude' in self.fields_ds and self.fields_ds['longitude'].ndim >= 2:
            row_dim, col_dim = self.fields_ds['longitude'].dims[-2:]
        else:
            raise ValueError(
                "Unable to infer spatial dimensions from dataset coordinates. "
                "Expected 2D 'latitude' or 'longitude' coordinates."
            )

        # Crop the fields dataset using positional indexing on named dims.
        cropped_fields = self.fields_ds.isel({row_dim: row_slice, col_dim: col_slice})

        # Optimised chunk layout: full spatial extent + large time chunks.
        row_size = row_slice.stop - row_slice.start
        col_size = col_slice.stop - col_slice.start
        chunk_spec = {row_dim: row_size, col_dim: col_size}

        # Identify the time dimension and rechunk it too.
        time_dim = None
        for dim in cropped_fields.dims:
            if dim not in (row_dim, col_dim):
                time_dim = dim
                break
        if time_dim is not None:
            chunk_spec[time_dim] = time_chunk_size

        cropped_fields = cropped_fields.chunk(chunk_spec)

        # Remove inherited zarr chunk encoding to avoid mismatch with cropped dask chunks.
        for name in cropped_fields.variables:
            cropped_fields[name].encoding.pop('chunks', None)

        # Save the cropped dataset to a new ZARR file.
        dirout.mkdir(parents=True, exist_ok=True)
        # FIX: original wrote `dirout / str(filename).endswith('.zarr')`, which
        # joins the path with a *boolean*; build the real output path instead.
        out_name = str(filename) if str(filename).endswith('.zarr') else f"{filename}.zarr"
        out_path = dirout / out_name
        write_task = cropped_fields.to_zarr(out_path, mode='w', zarr_version=2, compute=False)

        if show_progress:
            from dask.diagnostics import ProgressBar
            with ProgressBar():
                write_task.compute()
        else:
            write_task.compute()

        logging.info(_('Cropped zarr written to %s with chunks %s'), out_path, chunk_spec)

    def rechunk_source_zarr(self, dirout: Path, filename: str = FIELDS_FILE,
                            time_chunk_size: int = 720, lat_chunk_size: int = -1,
                            lon_chunk_size: int = -1, show_progress: bool = False):
        """Rechunk the full source zarr with an optimised layout.

        The original IRM zarr uses chunks of (6, 19, 36) which causes extremely
        poor read performance for time-series queries. This method rewrites the
        dataset with much larger chunks.

        :param dirout: Output directory.
        :param filename: Output zarr name ('.zarr' appended if missing).
        :param time_chunk_size: Time steps per chunk (720 = ~1 month).
        :param lat_chunk_size: Latitude chunk size (-1 = full extent).
        :param lon_chunk_size: Longitude chunk size (-1 = full extent).
        :param show_progress: Show a dask progress bar.
        """
        if self.fields_ds is None:
            raise ValueError(_("Data not loaded. Call load_data() first."))

        # Resolve spatial dimensions.
        if 'latitude' in self.fields_ds and self.fields_ds['latitude'].ndim >= 2:
            row_dim, col_dim = self.fields_ds['latitude'].dims[-2:]
        elif 'longitude' in self.fields_ds and self.fields_ds['longitude'].ndim >= 2:
            row_dim, col_dim = self.fields_ds['longitude'].dims[-2:]
        else:
            raise ValueError(
                "Unable to infer spatial dimensions from dataset coordinates. "
                "Expected 2D 'latitude' or 'longitude' coordinates."
            )

        lat_size = self.fields_ds.sizes[row_dim] if lat_chunk_size == -1 else lat_chunk_size
        lon_size = self.fields_ds.sizes[col_dim] if lon_chunk_size == -1 else lon_chunk_size

        chunk_spec = {row_dim: lat_size, col_dim: lon_size}
        for dim in self.fields_ds.dims:
            if dim not in (row_dim, col_dim):
                chunk_spec[dim] = time_chunk_size

        rechunked = self.fields_ds.chunk(chunk_spec)
        # Drop inherited zarr chunk encoding so the new dask chunking wins.
        for name in rechunked.variables:
            rechunked[name].encoding.pop('chunks', None)

        dirout.mkdir(parents=True, exist_ok=True)
        # FIX: same boolean-in-path bug as crop_to_new_zarr -- build the real name.
        out_name = str(filename) if str(filename).endswith('.zarr') else f"{filename}.zarr"
        out_path = dirout / out_name
        write_task = rechunked.to_zarr(out_path, mode='w', zarr_version=2, compute=False)

        logging.info(_('Rechunking to %s with chunks %s ...'), out_path, chunk_spec)
        if show_progress:
            from dask.diagnostics import ProgressBar
            with ProgressBar():
                write_task.compute()
        else:
            write_task.compute()

        logging.info(_('Rechunked zarr written to %s'), out_path)

    def __str__(self):
        """
        Return a string representation of the ClimateDataHourly object,
        including paths and dataset info.
        """
        paths = f"ClimateDataHourly(datadir={self.datadir}, fields_file={self.fields_file}, analog_file={self.analog_file})"
        infos = f"Fields dataset: {self.fields_ds}\nAnalog dataset: {self.analog_ds}"
        return f"{paths}\n{infos}"

    def get_precipitation(self, region: Polygon, time_interval: tuple[datetime, datetime]) -> pd.Series:
        """
        Get precipitation for a given bounding box and time interval.

        :param region: Polygon representing the area of interest in Lambert 72 coordinates
        :param time_interval: (start_time, end_time) as a tuple of datetime objects
        :return: Pandas Series with time as index and precipitation [mm/h] as values
        """
        if self.fields_ds is None:
            raise ValueError(_("Data not loaded. Call load_data() first."))

        row_slice, col_slice = self._find_slices_for_region(region)
        # Side effect: stores the per-pixel fractions in self._gdf[region]
        fractions, s = self.get_surface_and_fractions_of_pixels_in_region(region)

        mask = self._gdf[region] > 0.
        frac_np = np.zeros((row_slice.stop - row_slice.start,
                            col_slice.stop - col_slice.start))
        sub = self._gdf.loc[mask, ['pixel_row', 'pixel_col', region]]
        rows = sub['pixel_row'].values.astype(int) - row_slice.start
        cols = sub['pixel_col'].values.astype(int) - col_slice.start
        frac_np[rows, cols] = sub[region].values

        # Read data via zarr directly (bypasses dask overhead entirely).
        # zarr's internal chunk-assembly is implemented in C and avoids
        # the per-chunk Python task overhead that dask imposes.
        zarr_path = Path(self.datadir / self.fields_file)
        zarr_store = zarr.open(str(zarr_path), mode='r')

        # Resolve time indices for direct slicing
        time_coords = self.fields_ds['time'].values
        t_start = np.datetime64(time_interval[0])
        t_end = np.datetime64(time_interval[1])
        time_mask = (time_coords >= t_start) & (time_coords <= t_end)
        time_indices = np.where(time_mask)[0]
        if len(time_indices) == 0:
            # Empty interval -> empty series rather than an error
            return pd.Series(dtype=np.float64)

        t_slice = slice(int(time_indices[0]), int(time_indices[-1]) + 1)

        # Direct zarr array read -- single contiguous slice, no dask graph
        precip_np = zarr_store['tp_median'][t_slice,
                                            row_slice.start:row_slice.stop,
                                            col_slice.start:col_slice.stop]
        times = time_coords[t_slice]

        # Area-weighted sum over the region's pixels for each time step
        result = (precip_np * frac_np).sum(axis=(1, 2))
        return pd.Series(result, index=times)