Source code for wolfhece.wolf_array._mb

"""
WolfArrayMB - GUI-enabled multi-block array class with OpenGL rendering
and wxPython integration.

Inherits all data operations from :class:`WolfArrayMBModel` and adds:

- OpenGL rendering, palette/colormap management
- Interactive wx dialogs
- Matplotlib visualization (``imshow``, ``plot_matplotlib``)

Author: HECE - University of Liege, Pierre Archambeau
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

import os
import logging
import wx

import numpy as np
import numpy.ma as ma

from typing import Union, Literal
from matplotlib.axis import Axis
from matplotlib.figure import Figure
import matplotlib.pyplot as plt
from matplotlib.colors import Colormap

try:
    from ..PyTranslate import _
except ImportError as e:
    print(e)
    raise Exception(_('Error importing modules'))

from ..PyVertexvectors import Zones, vector, zone
from ..PyPalette import wolfpalette

from ._header_wolf import (
    header_wolf,
    getkeyblock,
    decodekeyblock,
    WOLF_ARRAY_FULL_SINGLE,
    WOLF_ARRAY_FULL_INTEGER,
    WOLF_ARRAY_FULL_INTEGER16,
    WOLF_ARRAY_FULL_INTEGER16_2,
    WOLF_ARRAY_MB_SINGLE,
    WOLF_ARRAY_MB_INTEGER,
    WOLF_ARRAY_FULL_INTEGER8,
)

from ._selection_data import SelectionData, SelectionDataMB
from ._base import WolfArrayModel, VERSION_RGB
from ._base_gui import WolfArray
from ._mb_model import WolfArrayMBModel


[docs] class WolfArrayMB(WolfArrayMBModel, WolfArray): """GUI-enabled multi-block array class. Inherits data operations from :class:`WolfArrayMBModel` and GUI/rendering from :class:`WolfArray`. Blocks (:class:`WolfArray` instances) are stored in the :attr:`myblocks` dictionary, keyed by identifiers produced by :func:`getkeyblock`. :ivar myblocks: Dictionary mapping block keys to :class:`WolfArray` instances. """
[docs] myblocks: dict[str, WolfArray]
# ======================================================================== # Constructor & Factory # ======================================================================== def __init__(self, fname=None, mold=None, masknull=True, crop=None, whichtype=WOLF_ARRAY_MB_SINGLE, preload=True, create=False, mapviewer=None, nullvalue=0., srcheader=None, idx: str = ''): """Create a GUI-enabled multi-block array. :param fname: Path to a WOLF binary file. :param mold: Template array to copy layout from. :param masknull: If ``True``, mask cells equal to *nullvalue*. :param crop: Optional spatial crop ``[[xmin, xmax], [ymin, ymax]]``. :param whichtype: Array element type constant. :param preload: If ``True``, load data during construction. :param create: If ``True``, create a new file on disk. :param mapviewer: Parent :class:`WolfMapViewer` for GUI integration. :param nullvalue: No-data marker value. :param srcheader: A :class:`header_wolf` to initialise from. :param idx: Identity string, used by the map viewer. """ WolfArray.__init__(self, fname, mold, masknull, crop, whichtype, preload, create, mapviewer, nullvalue, srcheader=srcheader, idx=idx)
[docs] self.mngselection = SelectionDataMB(self)
[docs] self._active_blocks = 0
if self.myblocks is None: self.myblocks = {} @classmethod
[docs] def make_multiblocks(cls, others: list, abs: bool = True, copy: bool = True) -> "WolfArrayMB": """Merge several single-block arrays into a new GUI multi-block array. :param others: List of :class:`WolfArray` instances to merge. :param abs: If ``True``, use global (absolute) world coordinates. :param copy: If ``True``, copy data from each source array. :return: A new :class:`WolfArrayMB` containing all blocks. """ newMBArray = cls() for curarray in others: newMBArray.add_block(curarray, force_idx=True, copyarray=copy) newMBArray.set_header_from_added_blocks() return newMBArray
# ======================================================================== # Block Management # ========================================================================
[docs] def add_block(self, arr: WolfArray, force_idx: bool = False, copyarray=False): """Add a block to this multi-block array. After adding all blocks, call :meth:`set_header_from_added_blocks` to recompute the global header. :param arr: The block to add. :param force_idx: If ``True``, the block key is assigned automatically. :param copyarray: If ``True``, a deep copy of *arr* is stored. """ if copyarray: arr = WolfArray(mold=arr, nullvalue=arr.nullvalue) force_idx = True if force_idx: arr.idx = getkeyblock(len(self.myblocks)) else: assert arr.idx is not None and type(arr.idx) == str and arr.idx.strip() != '', \ f"The block index/key is wrong {arr.idx}" assert arr.idx not in self.myblocks, \ "You can't have the same block twice" pos = len(self.myblocks) posidx = decodekeyblock(arr.idx, False) assert pos == posidx, \ f"The block index/key is wrong {arr.idx}" self.myblocks[arr.idx] = arr arr.isblock = True arr.blockindex = len(self.myblocks) - 1
[docs] def add_ops_sel(self): """Add operations and selection manager to all blocks.""" super().add_ops_sel() if self.myblocks is None: self.myblocks = {} for curblock in self.myblocks.values(): curblock.add_ops_sel()
[docs] def extract_selection(self): """Extract the current selection into a new multi-block array. Each block's :class:`SelectionData` produces a sub-array; the results are assembled into a new :class:`WolfArrayMB`. If a map viewer is attached, the extracted array is also registered in it. """ newarrays = [] for curblock in self.myblocks.values(): newblock = curblock.SelectionData.get_newarray() if newblock is not None: newarrays.append(newblock) if len(newarrays) == 0: logging.warning(_('No selection to extract')) return None newMBarray = WolfArrayMB() for newarray in newarrays: newMBarray.add_block(newarray, force_idx=True) mapviewer = self.get_mapviewer() if mapviewer is not None: mapviewer.add_object('array', newobj=newarray, ToCheck=True, id=self.idx + '_extracted')
[docs] def contour(self, levels: Union[int, list[float]] = 10) -> Zones: """Compute contour lines from this multi-block array. The array is first converted to a single-block :class:`WolfArray` before contouring. :param levels: Number of levels or explicit list of level values. :return: A :class:`Zones` object containing the contour polylines. """ tmp = self.as_WolfArray() return tmp.contour(levels)
# ======================================================================== # Memory Allocation & Conversion # ========================================================================
[docs] def allocate_ressources(self): """Allocate memory for every block defined in :attr:`head_blocks`. Creates :class:`WolfArray` (GUI-enabled) instances for each block header. """ if self.myblocks is None: logging.warning("No blocks to allocate") else: if len(self.myblocks) == 0: for id, (key, curhead) in enumerate(self.head_blocks.items()): if self.wolftype == WOLF_ARRAY_MB_SINGLE: self.myblocks[key] = WolfArray( srcheader=curhead, whichtype=WOLF_ARRAY_FULL_SINGLE) elif self.wolftype == WOLF_ARRAY_MB_INTEGER: self.myblocks[key] = WolfArray( srcheader=curhead, whichtype=WOLF_ARRAY_FULL_INTEGER) self.myblocks[key].isblock = True self.myblocks[key].blockindex = id self.myblocks[key].idx = key
[docs] def _make_result_array(self): """Create an empty result array of the same type (GUI-enabled). :return: A new :class:`WolfArrayMB` with the same header. """ newArray = WolfArrayMB() newArray.set_header(self.get_header()) return newArray
[docs] def as_WolfArray(self, abs: bool = True, forced_header: header_wolf = None) -> WolfArray: """Convert to a single-block GUI-enabled :class:`WolfArray`. When blocks have different resolutions, coarser blocks are rebinned to match the finest resolution. :param abs: If ``True``, use absolute coordinates. :param forced_header: If provided, override the target header. :return: A new single-block :class:`WolfArray`. """ newArray = WolfArray() if forced_header is None: myhead = self.get_header(abs=abs) else: myhead = forced_header myhead.wolftype = self.wolftype dx = set([curblock.get_header().dx for curblock in iter(self.myblocks.values())]) dy = set([curblock.get_header().dy for curblock in iter(self.myblocks.values())]) if len(dx) == 1 and len(dy) == 1: # ---- Uniform resolution ---- newArray.dx = list(dx)[0] newArray.dy = list(dy)[0] newArray.origx = myhead.origx newArray.origy = myhead.origy newArray.nbx = int((myhead.nbx * myhead.dx) // newArray.dx) newArray.nby = int((myhead.nby * myhead.dy) // newArray.dy) newArray.translx = myhead.translx newArray.transly = myhead.transly newArray.wolftype = (WOLF_ARRAY_FULL_SINGLE if myhead.wolftype == WOLF_ARRAY_MB_SINGLE else WOLF_ARRAY_FULL_INTEGER) newArray.allocate_ressources() newArray.array[:, :] = 0 for curblock in self.myblocks.values(): ij = np.where(~curblock.array.mask) if len(ij[0]) > 0: i = ij[0] j = ij[1] x0, y0 = curblock.get_xy_from_ij(0, 0, abs=True) i0, j0 = newArray.get_ij_from_xy(x0, y0, abs=True) i_dest = i + i0 j_dest = j + j0 newArray.array[i_dest, j_dest] = curblock.array[i, j] newArray.array.mask[i_dest, j_dest] = False else: logging.debug( f"Block {curblock.idx} is empty or totally masked.") else: # ---- Multiple resolutions – rebin to finest ---- dx = sorted(list(dx)) dy = sorted(list(dy)) newArray.dx = dx[0] newArray.dy = dy[0] newArray.origx = myhead.origx newArray.origy = myhead.origy newArray.nbx = int((myhead.nbx * myhead.dx) // newArray.dx) newArray.nby = int((myhead.nby * myhead.dy) // newArray.dy) newArray.translx = myhead.translx newArray.transly = myhead.transly newArray.wolftype = (WOLF_ARRAY_FULL_SINGLE if myhead.wolftype == WOLF_ARRAY_MB_SINGLE else WOLF_ARRAY_FULL_INTEGER) newArray.allocate_ressources() newArray.array[:, :] = 0 for curblock in self.myblocks.values(): if curblock.dx == dx[0] and curblock.dy == dy[0]: blockArray = curblock else: factor = dx[0] / curblock.dx blockArray = WolfArray(mold=curblock) blockArray.rebin(factor) ij = np.where(~blockArray.array.mask) if len(ij[0]) > 0: i = ij[0] j = ij[1] x0, y0 = blockArray.get_xy_from_ij(0, 0, abs=True) i0, j0 = newArray.get_ij_from_xy(x0, y0, abs=True) i_dest = i + i0 j_dest = j + j0 newArray.array[i_dest, j_dest] = blockArray.array[i, j] newArray.array.mask[i_dest, j_dest] = False else: logging.debug( f"Block {curblock.idx} is empty or totally masked.") return newArray
# ======================================================================== # File I/O # ========================================================================
[docs] def read_data(self): """Read binary block data from :attr:`filename`. Creates :class:`WolfArray` (GUI-enabled) instances for each block. """ with open(self.filename, 'rb') as f: for i in range(self.nb_blocks): if self.wolftype == WOLF_ARRAY_MB_SINGLE: curblock = WolfArray( whichtype=WOLF_ARRAY_FULL_SINGLE, srcheader=self.head_blocks[getkeyblock(i)]) elif self.wolftype == WOLF_ARRAY_MB_INTEGER: curblock = WolfArray(whichtype=WOLF_ARRAY_FULL_INTEGER) curblock.isblock = True curblock.blockindex = i curblock.idx = getkeyblock(i) curblock._read_binary_data(f) self.myblocks[getkeyblock(i)] = curblock
# ======================================================================== # Mapviewer & GUI Hooks # ========================================================================
[docs] def change_gui(self, newparentgui): """Re-parent this array to a different :class:`WolfMapViewer`. If no viewer was previously attached, creates a new ``Ops_Array`` panel. Otherwise, migrates the existing one. :param newparentgui: Target :class:`WolfMapViewer` instance. :raises AssertionError: If *newparentgui* is not a :class:`WolfMapViewer`. """ from ..PyDraw import WolfMapViewer assert isinstance(newparentgui, WolfMapViewer), \ _('newparentgui must be a WolfMapViewer instance') super().change_gui(newparentgui) for curblock in self.myblocks.values(): # curblock.change_gui(newparentgui) curblock.mapviewer = newparentgui
[docs] def check_plot(self): """Mark the array for plotting and load data if needed. Reads binary data from disk, applies the null-value mask, initialises the RGB buffer and updates the palette. """ self.plotted = True self.mimic_plotdata() if not self.loaded and self.filename != '': if os.path.exists(self.filename): self.read_data() if self.masknull: self.mask_data(self.nullvalue) if VERSION_RGB==1 : if self.rgb is None: self.rgb = np.ones((self.nbx, self.nby, 4), order='F', dtype=np.integer) self.updatepalette(0) self.loaded = True else: raise Exception(_(f"Trying to load an array that doesn't exist ({self.filename})")) else: logging.info(_('Array already loaded'))
[docs] def uncheck_plot(self, unload: bool = True, forceresetOGL: bool = False, askquestion: bool = True): """Uncheck (hide) the array and optionally unload data. Optionally resets OpenGL display lists and frees block data. :param unload: If ``True``, unload block data from memory. :param forceresetOGL: If ``True``, force-reset OpenGL lists without asking. :param askquestion: If ``True``, prompt the user before resetting OpenGL lists. """ self.plotted = False self.mimic_plotdata() if unload and self.filename != '': if askquestion and not forceresetOGL: if self.wx_exists: dlg = wx.MessageDialog(None, _('Do you want to reset OpenGL lists?'), style=wx.YES_NO) ret = dlg.ShowModal() if ret == wx.ID_YES: forceresetOGL = True else: forceresetOGL = True for curblock in self.myblocks.values(): curblock.uncheck_plot(unload, forceresetOGL, askquestion=False) if VERSION_RGB==1 : self.rgb = None self.myblocks = {} self.loaded = False else: logging.info(_('Array not unloaded'))
[docs] def mimic_plotdata(self): """Propagate plot/plotting flags to every child block.""" for curblock in self.myblocks.values(): curblock: WolfArray curblock.plotted = self.plotted curblock.plotting = self.plotting
# ======================================================================== # Palette & Colormap # ========================================================================
[docs] def share_palette(self): """Share this array's palette with all linked arrays.""" for cur in self.linkedarrays: if id(cur.mypal) != id(self.mypal): cur.mypal = self.mypal cur.link_palette()
[docs] def updatepalette(self, which: int = 0, onzoom: list[float] = []): """Update the palette/colormap across all blocks. When :attr:`mypal.automatic` is ``True``, the palette range is recomputed from the data (optionally restricted to *onzoom*). :param which: Colormap index to use. :param onzoom: If not empty, ``[xmin, xmax, ymin, ymax]`` bounds restricting the data used for palette computation. """ if len(self.myblocks) == 0: return if self.mypal.automatic: if onzoom != []: allarrays = [] for curblock in self.myblocks.values(): istart, jstart = curblock.get_ij_from_xy(onzoom[0], onzoom[2]) iend, jend = curblock.get_ij_from_xy(onzoom[1], onzoom[3]) istart = 0 if istart < 0 else istart jstart = 0 if jstart < 0 else jstart iend = curblock.nbx if iend > curblock.nbx else iend jend = curblock.nby if jend > curblock.nby else jend partarray = curblock.array[istart:iend, jstart:jend] partarray = partarray[partarray.mask == False] if len(partarray) > 0: allarrays.append(partarray.flatten()) allarrays = np.concatenate(allarrays) self.mypal.isopop(allarrays, allarrays.count()) else: allarrays = np.concatenate( [curblock.array[curblock.array.mask == False].flatten() for curblock in self.myblocks.values()]) self.mypal.isopop(allarrays, self.nbnotnull) self.link_palette() if VERSION_RGB == 1: for curblock in self.myblocks.values(): curblock.rgb = self.mypal.get_rgba(curblock.array) if self.myops is not None: self.myops.update_palette()
# ======================================================================== # OpenGL Rendering # ========================================================================
[docs] def delete_lists(self): """Delete OpenGL display lists for every block.""" for curblock in self.myblocks.values(): curblock.delete_lists()
[docs] def plot(self, sx=None, sy=None, xmin=None, ymin=None, xmax=None, ymax=None, size=None): """Render all blocks via OpenGL. Also draws the current selection and any attached zones. :param sx: X scale factor. :param sy: Y scale factor. :param xmin: Viewport X minimum. :param ymin: Viewport Y minimum. :param xmax: Viewport X maximum. :param ymax: Viewport Y maximum. :param size: Unused, kept for API compatibility. """ self.plotting = True self.mimic_plotdata() for curblock in self.myblocks.values(): curblock.plot(sx, sy, xmin, ymin, xmax, ymax) self.plotting = False self.mimic_plotdata() # Plot selected nodes if self.SelectionData is not None: self.SelectionData.plot_selection() # Plot zones attached to array if self.myops is not None: self.myops.myzones.plot()
[docs] def _fill_ogllist_for_one_grid_cell(self, curscale, loci, locj, force=False): """Fill OpenGL display list for a single grid cell in every block.""" for curblock in self.myblocks.values(): curblock._fill_ogllist_for_one_grid_cell(curscale, loci, locj, force)
# ======================================================================== # Visualization (Matplotlib) # ========================================================================
[docs] def imshow(self, figax: tuple[Figure, Axis] = None, cmap: Colormap = None, step_ticks=100.) -> tuple[Figure, Axis]: """Create a Matplotlib image from this multi-block array. Assembles all blocks into a single raster at the finest resolution, applies the palette (or a Matplotlib colormap), and renders with ``imshow``. :param figax: ``(fig, ax)`` to plot into. If ``None``, a new figure is created. :param cmap: Matplotlib :class:`~matplotlib.colors.Colormap`. If ``None``, the WOLF palette's RGBA output is used. :param step_ticks: Spacing between axis tick labels in world coordinate units. :return: ``(fig, ax)`` tuple. """ # Use figax if passed as argument if figax is None: fig, ax = plt.subplots(1, 1) else: fig, ax = figax # Find minimal spatial resolution dx = sorted([self[i].dx for i in range(self.nb_blocks)]) dx_min = dx[0] bounds = self.get_bounds() # Set local WolfHeader _header = self.get_header() _header.dx = dx_min _header.dy = dx_min _header.nbx = int((bounds[0][1] - bounds[0][0]) / dx_min) _header.nby = int((bounds[1][1] - bounds[1][0]) / dx_min) if cmap is None: # Full scale image color colors = np.zeros((_header.nbx, _header.nby, 4)) # Iterate on blocks for i in range(self.nb_blocks): # update local colors if not already done if self[i].rgb is None: self[i].updatepalette(0) # Pointing RGB _colors = self[i].rgb if self[i].dx > dx_min: n = int(self[i].dx / dx_min) tmpcolors = np.zeros((_colors.shape[0]*n, _colors.shape[1]*n, _colors.shape[2])) for i in range(n): for j in range(n): tmpcolors[i::n, j::n, :] = _colors _colors = tmpcolors # Searching relative position x, y = self[i].get_xy_from_ij(0, 0) loci, locj = _header.get_ij_from_xy(x, y) # Alias subarray in colors sub = colors[loci:loci+_colors.shape[0], locj:locj+_colors.shape[1], :] # Copy color if not masked values sub[~self[i].array.mask, :] = _colors[~self[i].array.mask, :] # Plot colors = colors.swapaxes(0, 1) ax.imshow(colors, origin='lower') else: # Full scale image values vals = np.zeros((_header.nbx, _header.nby)) alpha = np.zeros(vals.shape) # Iterate on blocks for i in range(self.nb_blocks): # Pointing values _vals = self[i].array.data if self[i].dx > dx_min: n = int(self[i].dx / dx_min) tmpvals = np.zeros((_vals.shape[0]*n, _vals.shape[1]*n)) for i in range(n): for j in range(n): tmpvals[i::n, j::n, :] = _vals _vals = tmpvals # Searching relative position x, y = self[i].get_xy_from_ij(0, 0) loci, locj = _header.get_ij_from_xy(x, y) # Alias subarray in values sub = vals[loci:loci+_vals.shape[0], locj:locj+_vals.shape[1]] # Copy values if not masked sub[~self[i].array.mask] = _vals[~self[i].array.mask] sub = alpha[loci:loci+_vals.shape[0], locj:locj+_vals.shape[1]] sub[~self[i].array.mask] = 1. # Plot vals = vals.swapaxes(0, 1) alpha = alpha.swapaxes(0, 1) ax.imshow(vals, origin='lower', cmap=cmap, alpha=alpha) ax.set_aspect('equal') x_start = (bounds[0][0] // step_ticks) * step_ticks x_end = (bounds[0][1] // step_ticks) * step_ticks y_start = (bounds[1][0] // step_ticks) * step_ticks y_end = (bounds[1][1] // step_ticks) * step_ticks x_pos = np.arange(x_start, x_end + .0001, step_ticks) y_pos = np.arange(y_start, y_end + .0001, step_ticks) i = [self.get_ij_from_xy(x, 0.)[0] for x in x_pos] j = [self.get_ij_from_xy(0., y)[1] for y in y_pos] ax.set_xticks(i) ax.set_xticklabels(x_pos, rotation=30) ax.set_yticks(j) ax.set_yticklabels(y_pos) return fig, ax
[docs] def plot_matplotlib(self, figax: tuple = None, getdata_im: bool = False, update_palette: bool = True, vmin: float = None, vmax: float = None, figsize: tuple = None, Walonmap: bool = False, cat: str = 'IMAGERIE/ORTHO_2022_ETE', first_mask_data: bool = True, with_legend: bool = False): """Plot the multi-block array using Matplotlib. Converts to a single-block :class:`WolfArray` and delegates to its :meth:`~WolfArray.plot_matplotlib` method. Notes ----- - The null-value mask is applied before plotting. - If *Walonmap* is ``True``, a background map from the WalOnMap service is overlaid. - The aspect ratio is set to ``'equal'``. :param figax: ``(fig, ax)`` tuple. If ``None``, a new figure is created. :param getdata_im: If ``True``, also return the image object. :param update_palette: If ``True``, update the palette before plotting. :param vmin: Minimum value for colour scaling. :param vmax: Maximum value for colour scaling. :param figsize: Figure size in inches ``(width, height)``. :param Walonmap: If ``True``, overlay an orthophoto from WalOnMap. :param cat: WalOnMap orthophoto category (e.g. ``'IMAGERIE/ORTHO_2022_ETE'``). :param first_mask_data: If ``True``, apply the mask before plotting. :param with_legend: If ``True``, add a colour legend. :return: ``(fig, ax)`` or ``(fig, ax, im)`` if *getdata_im* is ``True``. :rtype: tuple """ # Convert to single block single_block = self.as_WolfArray() return single_block.plot_matplotlib(figax=figax, getdata_im=getdata_im, update_palette=update_palette, vmin=vmin, vmax=vmax, figsize=figsize, Walonmap=Walonmap, cat=cat, first_mask_data=first_mask_data, with_legend=with_legend)