# Source code for wolfhece.async_image_loader

"""
Author: HECE - University of Liege, Pierre Archambeau
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

import asyncio
import concurrent.futures
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional

from PIL import Image

from wolfhece.PyTranslate import _


class ImageLoadPriority(Enum):
    """Priority levels for image loading.

    Lower numeric values are served first by the priority queue
    (HIGH = 1 beats LOW = 3).
    """
    LOW = 3
    NORMAL = 2
    HIGH = 1
@dataclass
class ImageLoadRequest:
    """Represents a request to load an image asynchronously.

    Instances are stored in AsyncImageLoader's pending/active tables and
    queued as (priority.value, id, request) tuples.
    """
    # Unique identifier for the request (used for deduplication)
    id: str
    # Zero-argument callable that performs the blocking load and returns
    # a PIL Image (or None on failure)
    loader_func: Callable
    # Invoked with the loaded image, or None if loading failed
    callback: Callable[[Optional[Image.Image]], None]
    # Lower enum value = served earlier
    priority: ImageLoadPriority = ImageLoadPriority.NORMAL
    # Optional spatial bounds (xmin, xmax, ymin, ymax) for deduplication.
    # Fixed: annotated Optional[...] since the default is None.
    bounds: Optional[tuple[float, float, float, float]] = None
class AsyncImageLoader:
    """
    Manages asynchronous loading of images with a priority queue.

    Allows scheduling images to be loaded in the background without blocking
    the UI. Uses asyncio with a ThreadPoolExecutor for blocking I/O operations.
    """

    def __init__(self, max_workers: int = 4, loop: Optional[asyncio.AbstractEventLoop] = None):
        """
        Initialize the async image loader

        :param max_workers: Maximum number of concurrent loading tasks
        :param loop: Optional asyncio event loop (creates new one if not provided)
        """
        self.max_workers = max_workers
        self.loop = loop or asyncio.new_event_loop()

        # Priority queue: lower priority value = higher priority
        self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue()

        # Track active loads to avoid duplicates
        self._active_loads: dict[str, asyncio.Task] = {}
        self._pending_requests: dict[str, ImageLoadRequest] = {}

        # Control flags
        self._running = False
        self._worker_task: Optional[asyncio.Task] = None

        # Executor for blocking I/O
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)

        logging.debug(_('AsyncImageLoader initialized with ') + str(max_workers) + _(' workers'))

    def start(self):
        """ Start the image loader background worker """
        if self._running:
            return

        self._running = True
        self._worker_task = asyncio.ensure_future(self._worker_loop(), loop=self.loop)
        logging.debug(_('AsyncImageLoader started'))

    def stop(self):
        """ Stop the image loader and cleanup """
        self._running = False

        if self._worker_task:
            self._worker_task.cancel()

        # Cancel all pending tasks
        for task in self._active_loads.values():
            if not task.done():
                task.cancel()

        self._active_loads.clear()
        self._pending_requests.clear()

        if self._executor:
            # wait=False: do not block the caller on in-flight loads
            self._executor.shutdown(wait=False)

        logging.debug(_('AsyncImageLoader stopped'))

    def schedule_load(self,
                      request_id: str,
                      loader_func: Callable,
                      callback: Callable[[Optional[Image.Image]], None],
                      priority: ImageLoadPriority = ImageLoadPriority.NORMAL,
                      bounds: tuple[float, float, float, float] = None) -> None:
        """
        Schedule an image to be loaded asynchronously

        :param request_id: Unique identifier for this request (used for deduplication)
        :param loader_func: Callable that returns the image (PIL Image or None)
        :param callback: Callable to invoke when image is ready: callback(image)
        :param priority: ImageLoadPriority level
        :param bounds: Optional spatial bounds (xmin, xmax, ymin, ymax) for tracking
        """
        request = ImageLoadRequest(
            id=request_id,
            loader_func=loader_func,
            callback=callback,
            priority=priority,
            bounds=bounds
        )

        # If same request already pending, just update the callback.
        # _worker_loop reads _pending_requests when dequeuing, so the
        # replacement actually takes effect.
        if request_id in self._pending_requests:
            logging.debug(_('Updating pending request: ') + request_id)
            self._pending_requests[request_id] = request
            return

        # Store pending request
        self._pending_requests[request_id] = request

        # Enqueue it (priority, insertion_order, request); request_id breaks
        # priority ties so the ImageLoadRequest itself is never compared
        try:
            self.loop.call_soon_threadsafe(
                self._queue.put_nowait,
                (priority.value, request_id, request)
            )
        except RuntimeError:
            logging.warning(_('Failed to schedule image load - event loop may be closed'))

    async def _worker_loop(self):
        """ Main worker loop that processes the image queue """
        while self._running:
            try:
                # Get next request from queue (with timeout to allow graceful shutdown).
                # NOTE: the priority must NOT be unpacked into '_' -- that would
                # shadow the module-level translation function '_' used below.
                try:
                    _priority, request_id, request = await asyncio.wait_for(
                        self._queue.get(),
                        timeout=1.0
                    )
                except asyncio.TimeoutError:
                    # Timeout is normal - allows checking _running flag
                    continue

                # Prefer the freshest request stored by schedule_load():
                # the callback may have been replaced while this entry was
                # waiting in the queue.
                request = self._pending_requests.pop(request_id, request)

                # Check if already loading (skip if so)
                if request_id in self._active_loads:
                    task = self._active_loads[request_id]
                    if not task.done():
                        logging.debug(_('Already loading: ') + request_id)
                        continue

                # Create task for this load
                task = asyncio.ensure_future(
                    self._load_image(request),
                    loop=self.loop
                )
                self._active_loads[request_id] = task

            except asyncio.CancelledError:
                break
            except Exception as e:
                logging.error(_('Error in image loader worker: ') + str(e))

    async def _load_image(self, request: ImageLoadRequest) -> None:
        """
        Load an image asynchronously using executor

        :param request: ImageLoadRequest to process
        """
        try:
            logging.debug(_('Loading image: ') + request.id)

            # Run blocking I/O in executor (thread pool)
            image = await self.loop.run_in_executor(
                self._executor,
                request.loader_func
            )

            # Invoke callback with result
            if callable(request.callback):
                request.callback(image)

            logging.debug(_('Loaded image: ') + request.id)

        except asyncio.CancelledError:
            logging.debug(_('Load cancelled: ') + request.id)
        except Exception as e:
            logging.error(_('Error loading image: ') + request.id + ' - ' + str(e))
            # Still invoke callback with None to notify caller
            if callable(request.callback):
                request.callback(None)
        finally:
            # Remove from active loads
            self._active_loads.pop(request.id, None)

    def cancel_obsolete_for_texture(self, texture_id: str, keep_request_id: str) -> int:
        """
        Cancel all pending/active loads for a texture except the specified request

        Useful when panning/zooming: old requests for the same texture become
        obsolete and can be cleared from the queue to improve responsiveness.

        :param texture_id: The texture identifier (e.g., self.idx from wolf_texture)
        :param keep_request_id: The request ID to keep (typically the most recent one)
        :return: Number of requests cancelled
        """
        cancelled_count = 0

        # Cancel pending requests for this texture (substring match on the id)
        to_remove = [req_id for req_id in self._pending_requests
                     if texture_id in req_id and req_id != keep_request_id]
        for req_id in to_remove:
            del self._pending_requests[req_id]
            cancelled_count += 1
            logging.debug(_('Cancelled obsolete request: ') + req_id)

        # Cancel active loads for this texture (but let them finish gracefully)
        for req_id, task in self._active_loads.items():
            if texture_id in req_id and req_id != keep_request_id:
                if not task.done():
                    task.cancel()
                    cancelled_count += 1

        if cancelled_count > 0:
            logging.debug(_('Cancelled ') + str(cancelled_count) + _(' obsolete texture loads'))

        return cancelled_count

    def get_pending_count(self) -> int:
        """ Get the number of pending image load requests """
        return len(self._pending_requests) + len(self._active_loads)

    def __del__(self):
        """ Cleanup on deletion """
        try:
            self.stop()
        except Exception:
            # __init__ may not have completed, and raising from __del__
            # (e.g. during interpreter shutdown) is never useful.
            pass
# Global singleton instance (optional - can also instantiate per module)
_global_loader: Optional[AsyncImageLoader] = None
def get_global_loader(max_workers: int = 4) -> AsyncImageLoader:
    """
    Get or create the global image loader instance

    :param max_workers: Number of worker threads (only used on first call)
    :return: Global AsyncImageLoader instance
    """
    global _global_loader

    # Lazily create and start the singleton on first use
    if _global_loader is None:
        loader = AsyncImageLoader(max_workers=max_workers)
        loader.start()
        _global_loader = loader

    return _global_loader
def stop_global_loader():
    """ Stop the global image loader """
    global _global_loader

    # Detach the singleton first, then shut it down (no-op if absent)
    loader, _global_loader = _global_loader, None
    if loader is not None:
        loader.stop()