from .PyTranslate import _
from .PyWMS import WebService, getAlaro, getCartoweb, getIGNFrance, getLifeWatch, getNGI, getOrthoPostFlood2021, getVlaanderen, getWalonmap
import numpy as np
from OpenGL.GL import bytes
from OpenGL.GLUT import bytes, logging
from PIL import Image
from tqdm import tqdm
try:
import diskcache
except ImportError:
raise ImportError(_('diskcache library not found. Please install it with "pip install diskcache" to enable caching functionality.'))
import concurrent.futures
import logging
from io import BytesIO
from pathlib import Path
from dataclasses import dataclass
@dataclass
[docs]
class TilesInfo:
[docs]
bounds: tuple[float, float, float, float]
def __iter__(self):
for i in range(self.nbx):
for j in range(self.nby):
loc_xmin = self.bounds[0] + i * self.tile_size
loc_xmax = self.bounds[0] + (i + 1) * self.tile_size
loc_ymin = self.bounds[2] + j * self.tile_size
loc_ymax = self.bounds[2] + (j + 1) * self.tile_size
yield (loc_xmin, loc_xmax, loc_ymin, loc_ymax, self.width, self.height)
[docs]
class CachedImages():
"""
Manage a cache of images obtained from web services, stored on disk using diskcache.
The cache is organized by service, category, subcategory, and spatial extent (xmin, xmax, ymin, ymax).
The get method retrieves images from the cache if they exist, or fetches them from the web service
and adds them to the cache if they do not.
The clear_cache method allows clearing the entire cache when needed.
"""
def __init__(self, epsg:str, max_resolution:float, magnetic_grid:float = 500.0, max_workers:int = 4) -> None:
""" Initialize the cache for a specific EPSG code
:param epsg: The EPSG code for which to manage the cache (e.g., '31370')
:param max_resolution: The maximum resolution for cached images (m/pixel) - used to determine appropriate grid size for caching
:param magnetic_grid: The size of the magnetic grid for caching (default: 500.0)
:param max_workers: Number of worker threads for concurrent downloads (default: 4)
"""
assert isinstance(epsg, str), "EPSG code must be a string"
assert isinstance(max_resolution, (int, float)), "Max resolution must be a number"
assert isinstance(magnetic_grid, (int, float)), "Magnetic grid must be a number"
assert isinstance(max_workers, int) and max_workers > 0, "Max workers must be a positive integer"
[docs]
self.epsg = epsg.lower().replace('epsg:', '') # Remove 'EPSG:' prefix if present
[docs]
self.max_resolution = max_resolution
[docs]
self._magnetic_grid_size = magnetic_grid
[docs]
self._cache = diskcache.Cache(directory=Path(__file__).parent / "data" / "cache" / "background" / self.epsg,
size_limit= 10 * 1024 * 1024 * 1024) # 10 GB
# Create a persistent thread pool executor for efficient concurrent operations
[docs]
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
[docs]
def _fetch_one_image_and_cache(self, service: WebService, category: str, subcategory: str,
xmin:float, xmax:float,
ymin:float, ymax:float,
width:int = 500, height:int = 500) -> Image.Image | None:
""" Fetch an image from the specified web service and add it to the cache
:param service: The web service from which to fetch the image (e.g., WebService.Walonmap)
:param category: The category of the image to fetch (e.g., 'Orthophotos')
:param subcategory: The subcategory of the image to fetch (e.g., 'Last')
:param xmin: Minimum X coordinate of the spatial extent for which to fetch the image
:param xmax: Maximum X coordinate of the spatial extent for which to fetch the image
:param ymin: Minimum Y coordinate of the spatial extent for which to fetch the image
:param ymax: Maximum Y coordinate of the spatial extent for which to fetch the image
:param width: Width of the image to fetch in pixels
:param height: Height of the image to fetch in pixels
:return: The fetched image as a PIL Image object, or None if there was an error
"""
mybytes: BytesIO
force_alpha = False
if service == WebService.IGN_France:
mybytes = getIGNFrance(category, self.epsg,
xmin, ymin, xmax, ymax,
width, height, False)
elif service == WebService.Vlaanderen:
mybytes = getVlaanderen(category,
xmin, ymin, xmax, ymax,
width, height, False)
elif service == WebService.LifeWatch:
mybytes = getLifeWatch(category + '_' + subcategory,
xmin, ymin, xmax, ymax,
width, height, False)
elif service == WebService.IGN_Belgium:
mybytes = getNGI(subcategory,
xmin, ymin, xmax, ymax,
width, height, False)
elif service == WebService.IGN_Cartoweb:
mybytes = getCartoweb(subcategory,
xmin, ymin, xmax, ymax,
width, height, False)
elif service == WebService.OrthoPostFlood2021:
mybytes = getOrthoPostFlood2021(subcategory,
xmin, ymin, xmax, ymax,
width, height, False)
elif service == WebService.Alaro:
mybytes = getAlaro(subcategory,
xmin, ymin, xmax, ymax,
width, height, False, time= self.time)
force_alpha = True
else:
mybytes = getWalonmap(category + '/' + subcategory,
xmin, ymin, xmax, ymax,
width, height, False)
if mybytes is None:
logging.warning(_('Error opening image file : ') + str(category + '/' + subcategory))
return None
try:
if isinstance(mybytes, bytes | BytesIO):
image = Image.open(mybytes)
if image.mode != 'RGBA':
image = image.convert('RGBA')
if force_alpha:
if self.alpha < 0.0:
self.alpha = 0.0
if self.alpha > 1.0:
self.alpha = 1.0
alpha = Image.new('L', image.size, int(self.alpha * 255))
image.putalpha(alpha)
elif isinstance(mybytes, str):
image = Image.open(mybytes).convert('RGB')
if image.width != width or image.height != height:
image = image.resize((width, height))
image_memory = BytesIO()
image.save(image_memory, format='PNG')
image = Image.open(image_memory)
elif isinstance(mybytes, Image.Image):
image = mybytes
else:
logging.error(_('Unknown type of image file : ') + str(type(mybytes)))
return None
self._cache[(service, category, subcategory, xmin, xmax, ymin, ymax, width, height)] = image
return image
except Exception as e:
logging.warning(_('Error opening image file : ') + str(category + '/' + subcategory))
return None
[docs]
def _align_to_magnetic_grid(self, value:float) -> float:
""" Align a coordinate value to the nearest grid line based on the specified grid size.
:param value: The coordinate value to align
:return: The aligned coordinate value
"""
if self._magnetic_grid_size <= 0:
return value
return np.floor(value / self._magnetic_grid_size) * self._magnetic_grid_size
[docs]
def _get_bounds_to_magnetic_grid(self, xmin:float, xmax:float,
ymin:float, ymax:float) -> tuple[float, float, float, float]:
""" Align the spatial bounds to the grid based on the specified grid size.
:param xmin: Minimum X coordinate
:param xmax: Maximum X coordinate
:param ymin: Minimum Y coordinate
:param ymax: Maximum Y coordinate
:return: Tuple of (aligned_xmin, aligned_xmax, aligned_ymin, aligned_ymax)
"""
if self._magnetic_grid_size <= 0:
return xmin, xmax, ymin, ymax
aligned_xmin = self._align_to_magnetic_grid(xmin)
aligned_xmax = self._align_to_magnetic_grid(xmax + self._magnetic_grid_size - 1e-6) # Add small epsilon to ensure proper ceiling
aligned_ymin = self._align_to_magnetic_grid(ymin)
aligned_ymax = self._align_to_magnetic_grid(ymax + self._magnetic_grid_size - 1e-6)
return aligned_xmin, aligned_xmax, aligned_ymin, aligned_ymax
[docs]
def _get_tile_size(self, xmin:float, xmax:float, ymin:float, ymax:float) -> int:
""" Determine the best tile size for caching based on the requested
resolution and the maximum resolution of the cache.
We accept NTILES tiles in each direction as a good balance between cache
hit rate and number of requests.
:param xmin: Minimum X coordinate
:param xmax: Maximum X coordinate
:param ymin: Minimum Y coordinate
:param ymax: Maximum Y coordinate
:return: The optimal tile size in coordinate units
"""
dx = xmax - xmin
dy = ymax - ymin
tile_size_x = dx / NTILES
tile_size_y = dy / NTILES
tile_size = min(tile_size_x, tile_size_y)
# Snap to the nearest power of 2 for better cache organization
if tile_size > 0:
tile_size = 2 ** np.ceil(np.log2(tile_size))
# Ensure tile size does not exceed max resolution of cache
res_tile_size = max(dx / tile_size, dy / tile_size)
if res_tile_size < self.max_resolution:
tile_size = min(dx / self.max_resolution , dy / self.max_resolution)
tile_size = 2 ** np.ceil(np.log2(tile_size))
return int(tile_size)
[docs]
def _get_infos(self, xmin:float, xmax:float,
ymin:float, ymax:float,
scale_x:float = 1.0, scale_y:float = 1.0) -> TilesInfo:
""" Prepare the spatial bounds by aligning them to the magnetic grid and determining the appropriate tile size.
:param xmin: Minimum X coordinate
:param xmax: Maximum X coordinate
:param ymin: Minimum Y coordinate
:param ymax: Maximum Y coordinate
:param scale_x: Scale factor for X dimension (default: 1.0)
:param scale_y: Scale factor for Y dimension (default: 1.0)
:return: TilesInfo object containing tile information and bounds
"""
# Snap bounds to grid
xmin, xmax, ymin, ymax = self._get_bounds_to_magnetic_grid(xmin, xmax, ymin, ymax)
tile_size = self._get_tile_size(xmin, xmax, ymin, ymax)
nbx = max(1, int((xmax - xmin) / tile_size)+1)
nby = max(1, int((ymax - ymin) / tile_size)+1)
width = min(1024, int(tile_size * scale_x))
height = min(1024, int(tile_size * scale_y))
# Snap width and height to the nearest power of 2 for better cache organization
if width > 0:
width = 2 ** np.ceil(np.log2(width))
if height > 0:
height = 2 ** np.ceil(np.log2(height))
width = min(1024, int(width))
height = min(1024, int(height))
return TilesInfo(bounds=(xmin, xmax, ymin, ymax),
tile_size=tile_size,
nbx=nbx, nby=nby,
width=width, height=height)
def __call__(self, xmin:float, xmax:float, ymin:float, ymax:float,
scale_x:float = 1.0, scale_y:float = 1.0) -> TilesInfo:
""" Prepare the spatial bounds by aligning them to the magnetic grid and determining the appropriate tile size. """
return self._get_infos(xmin, xmax, ymin, ymax, scale_x, scale_y)
[docs]
def _check_if_all_is_in_cache(self, service: WebService, category: str, subcategory: str,
xmin:float, xmax:float, ymin:float, ymax:float,
scale_x:float = 1.0, scale_y:float = 1.0) -> bool:
"""
Perform a rapid check to see if the requested image is likely
to be in the cache based on the magnetic grid size and spatial extent.
:param service: The web service to check
:param category: The image category
:param subcategory: The image subcategory
:param xmin: Minimum X coordinate
:param xmax: Maximum X coordinate
:param ymin: Minimum Y coordinate
:param ymax: Maximum Y coordinate
:param scale_x: Scale factor for X dimension (default: 1.0)
:param scale_y: Scale factor for Y dimension (default: 1.0)
:return: True if all requested tiles are in cache, False otherwise
"""
for tile in self(xmin, xmax, ymin, ymax, scale_x, scale_y):
if (service, category, subcategory, *tile) not in self._cache:
return False
return True
[docs]
def _async_add_and_cache(self, service: WebService, category: str, subcategory: str,
xmin:float, xmax:float,
ymin:float, ymax:float,
width:int = 500,
height:int = 500) -> concurrent.futures.Future:
""" Asynchronously submit a fetch task to the thread pool without blocking.
Returns a Future object that can be waited on later.
This allows multiple requests to be submitted in parallel before waiting for results.
:param service: The web service from which to fetch the image
:param category: The image category
:param subcategory: The image subcategory
:param xmin: Minimum X coordinate
:param xmax: Maximum X coordinate
:param ymin: Minimum Y coordinate
:param ymax: Maximum Y coordinate
:param width: Width of image in pixels (default: 500)
:param height: Height of image in pixels (default: 500)
:return: A Future object that resolves to the fetched and cached image
"""
return self._executor.submit(self._fetch_one_image_and_cache, service, category, subcategory,
xmin, xmax, ymin, ymax, width, height)
[docs]
def get_dict(self, service: WebService, category: str, subcategory: str,
xmin:float, xmax:float, ymin:float, ymax:float,
scale_x:float = 1.0, scale_y:float = 1.0,
async_fetch:bool = True) -> dict[tuple[float, float, float, float], Image.Image]:
""" Retrieve an image from the cache if it exists, or fetch it from the web service and add it to the cache if it does not.
:param service: The web service from which to retrieve the image (e.g., WebService.Walonmap)
:param category: The category of the image to retrieve (e.g., 'Orthophotos')
:param subcategory: The subcategory of the image to retrieve (e.g., 'Last')
:param xmin: Minimum X coordinate of the spatial extent for which to retrieve the image
:param xmax: Maximum X coordinate of the spatial extent for which to retrieve the image
:param ymin: Minimum Y coordinate of the spatial extent for which to retrieve the image
:param ymax: Maximum Y coordinate of the spatial extent for which to retrieve the image
:param async_fetch: Whether to fetch images asynchronously (default is True)
:return: A dictionary mapping spatial extents (xmin, xmax, ymin, ymax) to the corresponding images as PIL Image objects
"""
ret = {}
in_cache = self._check_if_all_is_in_cache(service, category, subcategory, xmin, xmax, ymin, ymax, scale_x, scale_y)
try:
if in_cache:
logging.debug(_('All images are in cache -- retrieving from cache'))
for tile in self(xmin, xmax, ymin, ymax, scale_x, scale_y):
ret[(tile[0], tile[1], tile[2], tile[3])] = self._cache[(service, category, subcategory,
tile[0], tile[1],
tile[2], tile[3],
tile[4], tile[5]
)]
else:
logging.debug(_('Some images are not in cache -- retrieving from service'))
if async_fetch:
# First pass: collect all futures without waiting
futures_dict = {} # Maps (loc_xmin, loc_xmax, loc_ymin, loc_ymax) to Future
future_to_key = {} # Reverse mapping: Future -> key (for O(1) lookup instead of O(n))
for tile in self(xmin, xmax, ymin, ymax, scale_x, scale_y):
loc_xmin, loc_xmax, loc_ymin, loc_ymax, img_width, img_height = tile
# check if the image is already in cache
if (service, category, subcategory, loc_xmin, loc_xmax, loc_ymin, loc_ymax, img_width, img_height) in self._cache:
ret[(loc_xmin, loc_xmax, loc_ymin, loc_ymax)] = self._cache[(service, category, subcategory,
loc_xmin, loc_xmax,
loc_ymin, loc_ymax,
img_width, img_height
)]
else:
# Submit task and store future (non-blocking)
key = (loc_xmin, loc_xmax, loc_ymin, loc_ymax)
future = self._async_add_and_cache(service, category, subcategory,
loc_xmin, loc_xmax, loc_ymin, loc_ymax,
img_width, img_height)
futures_dict[key] = future
future_to_key[future] = key
# Second pass: collect results progressively as they complete (progressive waiting)
for future in concurrent.futures.as_completed(futures_dict.values()):
key = future_to_key[future]
try:
ret[key] = future.result()
except Exception as e:
logging.warning(_('Error fetching tile : ') + str(key))
logging.warning(str(e))
ret[key] = None
else:
for tile in self(xmin, xmax, ymin, ymax, scale_x, scale_y):
loc_xmin, loc_xmax, loc_ymin, loc_ymax, img_width, img_height = tile
# check if the image is already in cache
if (service, category, subcategory, loc_xmin, loc_xmax, loc_ymin, loc_ymax, img_width, img_height) in self._cache:
ret[(loc_xmin, loc_xmax, loc_ymin, loc_ymax)] = self._cache[(service, category, subcategory,
loc_xmin, loc_xmax,
loc_ymin, loc_ymax,
img_width, img_height
)]
else:
ret[(loc_xmin, loc_xmax, loc_ymin, loc_ymax)] = self._fetch_one_image_and_cache(service, category, subcategory,
loc_xmin, loc_xmax,
loc_ymin, loc_ymax,
img_width, img_height)
except Exception as e:
logging.warning(_('Error fetching image from cache for : ') + str(category + '/' + subcategory))
logging.warning(_('Fetching image from web service...'))
logging.warning(str(e))
return ret
[docs]
def get_image(self, service: WebService, category: str, subcategory: str,
xmin:float, xmax:float, ymin:float, ymax:float,
scale_x:float = 1.0, scale_y:float = 1.0,
async_fetch:bool = True, direct_fetch:bool = False) -> Image.Image:
""" Retrieve an image from the cache if it exists, or fetch it from the web service and add it to the cache if it does not.
:param service: The web service from which to retrieve the image (e.g., WebService.Walonmap)
:param category: The category of the image to retrieve (e.g., 'Orthophotos')
:param subcategory: The subcategory of the image to retrieve (e.g., 'Last')
:param xmin: Minimum X coordinate of the spatial extent for which to retrieve the image
:param xmax: Maximum X coordinate of the spatial extent for which to retrieve the image
:param ymin: Minimum Y coordinate of the spatial extent for which to retrieve the image
:param ymax: Maximum Y coordinate of the spatial extent for which to retrieve the image
:param async_fetch: Whether to fetch images asynchronously (default is True)
:param direct_fetch: Whether to fetch images directly from the web service (default is False)
:return: The retrieved image as a PIL Image object
"""
img_width = int((xmax - xmin) * scale_x)
img_height = int((ymax - ymin) * scale_y)
if direct_fetch:
# Non need to check cache or combine tiles - just fetch the single image for the entire extent
return self._fetch_one_image_and_cache(service, category, subcategory,
xmin, xmax, ymin, ymax,
img_width, img_height)
else:
ret = self.get_dict(service, category, subcategory, xmin, xmax, ymin, ymax, scale_x, scale_y, async_fetch)
if not ret:
# cache miss and error fetching tiles - try to fetch the single image for the entire extent as a fallback
img = self._fetch_one_image_and_cache(service, category, subcategory,
xmin, xmax, ymin, ymax,
img_width, img_height)
else:
# Combining the tiles into a single image and crop to the requested extent
combined_image = Image.new('RGBA', (img_width, img_height))
for key, img in ret.items():
if img is not None:
tile_xmin, tile_xmax, tile_ymin, tile_ymax = key
# Calculate the position of the tile in the combined image
pos_xmin = int((tile_xmin - xmin) / (xmax - xmin) * img_width)
pos_xmax = int((tile_xmax - xmin) / (xmax - xmin) * img_width)
pos_ymin = int((tile_ymin - ymin) / (ymax - ymin) * img_height)
pos_ymax = int((tile_ymax - ymin) / (ymax - ymin) * img_height)
scaled_image = img.resize((pos_xmax - pos_xmin, pos_ymax - pos_ymin))
combined_image.paste(scaled_image, (pos_xmin, img_height - pos_ymax))
img = combined_image
return img
[docs]
def get_image_for_async(self, service: WebService, category: str, subcategory: str,
xmin:float, xmax:float, ymin:float, ymax:float,
scale_x:float = 1.0, scale_y:float = 1.0,
direct_fetch:bool = False) -> Image.Image | None:
""" Synchronous wrapper for async loaders - returns image directly.
This method is designed to be called from an asyncio executor for non-blocking operation.
It combines tiles if needed and returns the final image suitable for OpenGL texture loading.
:param service: The web service from which to retrieve the image
:param category: The category of the image to retrieve
:param subcategory: The subcategory of the image to retrieve
:param xmin: Minimum X coordinate of the spatial extent
:param xmax: Maximum X coordinate of the spatial extent
:param ymin: Minimum Y coordinate of the spatial extent
:param ymax: Maximum Y coordinate of the spatial extent
:param scale_x: Scale factor for X dimension (default: 1.0)
:param scale_y: Scale factor for Y dimension (default: 1.0)
:param direct_fetch: Whether to fetch the entire extent as one image
:return: The retrieved image as a PIL Image object, or None if there was an error
"""
# Use existing get_image method (synchronous, suitable for executor)
return self.get_image(service, category, subcategory, xmin, xmax, ymin, ymax,
scale_x, scale_y, async_fetch=False, direct_fetch=direct_fetch)
[docs]
def clear_cache(self):
""" Clear the entire cache of images from disk """
self._cache.clear()
def __del__(self):
""" Cleanup: shutdown the thread pool executor """
if hasattr(self, '_executor') and self._executor is not None:
self._executor.shutdown(wait=True)