"""
WolfArrayMB - GUI-enabled multi-block array class with OpenGL rendering
and wxPython integration.
Inherits all data operations from :class:`WolfArrayMBModel` and adds:
- OpenGL rendering, palette/colormap management
- Interactive wx dialogs
- Matplotlib visualization (``imshow``, ``plot_matplotlib``)
Author: HECE - University of Liege, Pierre Archambeau
Date: 2024
Copyright (c) 2024 University of Liege. All rights reserved.
This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""
import os
import logging
import wx
import numpy as np
import numpy.ma as ma
from typing import Union, Literal
from matplotlib.axis import Axis
from matplotlib.figure import Figure
import matplotlib.pyplot as plt
from matplotlib.colors import Colormap
try:
from ..PyTranslate import _
except ImportError as e:
print(e)
raise Exception(_('Error importing modules'))
from ..PyVertexvectors import Zones, vector, zone
from ..PyPalette import wolfpalette
from ._header_wolf import (
header_wolf,
getkeyblock,
decodekeyblock,
WOLF_ARRAY_FULL_SINGLE,
WOLF_ARRAY_FULL_INTEGER,
WOLF_ARRAY_FULL_INTEGER16,
WOLF_ARRAY_FULL_INTEGER16_2,
WOLF_ARRAY_MB_SINGLE,
WOLF_ARRAY_MB_INTEGER,
WOLF_ARRAY_FULL_INTEGER8,
)
from ._selection_data import SelectionData, SelectionDataMB
from ._base import WolfArrayModel, VERSION_RGB
from ._base_gui import WolfArray
from ._mb_model import WolfArrayMBModel
[docs]
class WolfArrayMB(WolfArrayMBModel, WolfArray):
"""GUI-enabled multi-block array class.
Inherits data operations from :class:`WolfArrayMBModel` and
GUI/rendering from :class:`WolfArray`.
Blocks (:class:`WolfArray` instances) are stored in the
:attr:`myblocks` dictionary, keyed by identifiers produced by
:func:`getkeyblock`.
:ivar myblocks: Dictionary mapping block keys to :class:`WolfArray`
instances.
"""
[docs]
myblocks: dict[str, WolfArray]
# ========================================================================
# Constructor & Factory
# ========================================================================
def __init__(self, fname=None, mold=None, masknull=True, crop=None,
whichtype=WOLF_ARRAY_MB_SINGLE, preload=True,
create=False, mapviewer=None, nullvalue=0., srcheader=None,
idx: str = ''):
"""Create a GUI-enabled multi-block array.
:param fname: Path to a WOLF binary file.
:param mold: Template array to copy layout from.
:param masknull: If ``True``, mask cells equal to *nullvalue*.
:param crop: Optional spatial crop ``[[xmin, xmax], [ymin, ymax]]``.
:param whichtype: Array element type constant.
:param preload: If ``True``, load data during construction.
:param create: If ``True``, create a new file on disk.
:param mapviewer: Parent :class:`WolfMapViewer` for GUI integration.
:param nullvalue: No-data marker value.
:param srcheader: A :class:`header_wolf` to initialise from.
:param idx: Identity string, used by the map viewer.
"""
WolfArray.__init__(self, fname, mold, masknull, crop, whichtype,
preload, create, mapviewer, nullvalue,
srcheader=srcheader, idx=idx)
[docs]
self.mngselection = SelectionDataMB(self)
[docs]
self._active_blocks = 0
if self.myblocks is None:
self.myblocks = {}
@classmethod
[docs]
def make_multiblocks(cls, others: list, abs: bool = True,
copy: bool = True) -> "WolfArrayMB":
"""Merge several single-block arrays into a new GUI multi-block array.
:param others: List of :class:`WolfArray` instances to merge.
:param abs: If ``True``, use global (absolute) world coordinates.
:param copy: If ``True``, copy data from each source array.
:return: A new :class:`WolfArrayMB` containing all blocks.
"""
newMBArray = cls()
for curarray in others:
newMBArray.add_block(curarray, force_idx=True, copyarray=copy)
newMBArray.set_header_from_added_blocks()
return newMBArray
# ========================================================================
# Block Management
# ========================================================================
[docs]
def add_block(self, arr: WolfArray, force_idx: bool = False,
copyarray=False):
"""Add a block to this multi-block array.
After adding all blocks, call :meth:`set_header_from_added_blocks`
to recompute the global header.
:param arr: The block to add.
:param force_idx: If ``True``, the block key is assigned
automatically.
:param copyarray: If ``True``, a deep copy of *arr* is stored.
"""
if copyarray:
arr = WolfArray(mold=arr, nullvalue=arr.nullvalue)
force_idx = True
if force_idx:
arr.idx = getkeyblock(len(self.myblocks))
else:
assert arr.idx is not None and type(arr.idx) == str and arr.idx.strip() != '', \
f"The block index/key is wrong {arr.idx}"
assert arr.idx not in self.myblocks, \
"You can't have the same block twice"
pos = len(self.myblocks)
posidx = decodekeyblock(arr.idx, False)
assert pos == posidx, \
f"The block index/key is wrong {arr.idx}"
self.myblocks[arr.idx] = arr
arr.isblock = True
arr.blockindex = len(self.myblocks) - 1
[docs]
def add_ops_sel(self):
"""Add operations and selection manager to all blocks."""
super().add_ops_sel()
if self.myblocks is None:
self.myblocks = {}
for curblock in self.myblocks.values():
curblock.add_ops_sel()
[docs]
def contour(self, levels: Union[int, list[float]] = 10) -> Zones:
"""Compute contour lines from this multi-block array.
The array is first converted to a single-block :class:`WolfArray`
before contouring.
:param levels: Number of levels or explicit list of level values.
:return: A :class:`Zones` object containing the contour polylines.
"""
tmp = self.as_WolfArray()
return tmp.contour(levels)
# ========================================================================
# Memory Allocation & Conversion
# ========================================================================
[docs]
def allocate_ressources(self):
"""Allocate memory for every block defined in :attr:`head_blocks`.
Creates :class:`WolfArray` (GUI-enabled) instances for each block
header.
"""
if self.myblocks is None:
logging.warning("No blocks to allocate")
else:
if len(self.myblocks) == 0:
for id, (key, curhead) in enumerate(self.head_blocks.items()):
if self.wolftype == WOLF_ARRAY_MB_SINGLE:
self.myblocks[key] = WolfArray(
srcheader=curhead,
whichtype=WOLF_ARRAY_FULL_SINGLE)
elif self.wolftype == WOLF_ARRAY_MB_INTEGER:
self.myblocks[key] = WolfArray(
srcheader=curhead,
whichtype=WOLF_ARRAY_FULL_INTEGER)
self.myblocks[key].isblock = True
self.myblocks[key].blockindex = id
self.myblocks[key].idx = key
[docs]
def _make_result_array(self):
"""Create an empty result array of the same type (GUI-enabled).
:return: A new :class:`WolfArrayMB` with the same header.
"""
newArray = WolfArrayMB()
newArray.set_header(self.get_header())
return newArray
[docs]
def as_WolfArray(self, abs: bool = True,
forced_header: header_wolf = None) -> WolfArray:
"""Convert to a single-block GUI-enabled :class:`WolfArray`.
When blocks have different resolutions, coarser blocks are rebinned
to match the finest resolution.
:param abs: If ``True``, use absolute coordinates.
:param forced_header: If provided, override the target header.
:return: A new single-block :class:`WolfArray`.
"""
newArray = WolfArray()
if forced_header is None:
myhead = self.get_header(abs=abs)
else:
myhead = forced_header
myhead.wolftype = self.wolftype
dx = set([curblock.get_header().dx
for curblock in iter(self.myblocks.values())])
dy = set([curblock.get_header().dy
for curblock in iter(self.myblocks.values())])
if len(dx) == 1 and len(dy) == 1:
# ---- Uniform resolution ----
newArray.dx = list(dx)[0]
newArray.dy = list(dy)[0]
newArray.origx = myhead.origx
newArray.origy = myhead.origy
newArray.nbx = int((myhead.nbx * myhead.dx) // newArray.dx)
newArray.nby = int((myhead.nby * myhead.dy) // newArray.dy)
newArray.translx = myhead.translx
newArray.transly = myhead.transly
newArray.wolftype = (WOLF_ARRAY_FULL_SINGLE
if myhead.wolftype == WOLF_ARRAY_MB_SINGLE
else WOLF_ARRAY_FULL_INTEGER)
newArray.allocate_ressources()
newArray.array[:, :] = 0
for curblock in self.myblocks.values():
ij = np.where(~curblock.array.mask)
if len(ij[0]) > 0:
i = ij[0]
j = ij[1]
x0, y0 = curblock.get_xy_from_ij(0, 0, abs=True)
i0, j0 = newArray.get_ij_from_xy(x0, y0, abs=True)
i_dest = i + i0
j_dest = j + j0
newArray.array[i_dest, j_dest] = curblock.array[i, j]
newArray.array.mask[i_dest, j_dest] = False
else:
logging.debug(
f"Block {curblock.idx} is empty or totally masked.")
else:
# ---- Multiple resolutions – rebin to finest ----
dx = sorted(list(dx))
dy = sorted(list(dy))
newArray.dx = dx[0]
newArray.dy = dy[0]
newArray.origx = myhead.origx
newArray.origy = myhead.origy
newArray.nbx = int((myhead.nbx * myhead.dx) // newArray.dx)
newArray.nby = int((myhead.nby * myhead.dy) // newArray.dy)
newArray.translx = myhead.translx
newArray.transly = myhead.transly
newArray.wolftype = (WOLF_ARRAY_FULL_SINGLE
if myhead.wolftype == WOLF_ARRAY_MB_SINGLE
else WOLF_ARRAY_FULL_INTEGER)
newArray.allocate_ressources()
newArray.array[:, :] = 0
for curblock in self.myblocks.values():
if curblock.dx == dx[0] and curblock.dy == dy[0]:
blockArray = curblock
else:
factor = dx[0] / curblock.dx
blockArray = WolfArray(mold=curblock)
blockArray.rebin(factor)
ij = np.where(~blockArray.array.mask)
if len(ij[0]) > 0:
i = ij[0]
j = ij[1]
x0, y0 = blockArray.get_xy_from_ij(0, 0, abs=True)
i0, j0 = newArray.get_ij_from_xy(x0, y0, abs=True)
i_dest = i + i0
j_dest = j + j0
newArray.array[i_dest, j_dest] = blockArray.array[i, j]
newArray.array.mask[i_dest, j_dest] = False
else:
logging.debug(
f"Block {curblock.idx} is empty or totally masked.")
return newArray
# ========================================================================
# File I/O
# ========================================================================
[docs]
def read_data(self):
"""Read binary block data from :attr:`filename`.
Creates :class:`WolfArray` (GUI-enabled) instances for each block.
"""
with open(self.filename, 'rb') as f:
for i in range(self.nb_blocks):
if self.wolftype == WOLF_ARRAY_MB_SINGLE:
curblock = WolfArray(
whichtype=WOLF_ARRAY_FULL_SINGLE,
srcheader=self.head_blocks[getkeyblock(i)])
elif self.wolftype == WOLF_ARRAY_MB_INTEGER:
curblock = WolfArray(whichtype=WOLF_ARRAY_FULL_INTEGER)
curblock.isblock = True
curblock.blockindex = i
curblock.idx = getkeyblock(i)
curblock._read_binary_data(f)
self.myblocks[getkeyblock(i)] = curblock
# ========================================================================
# Mapviewer & GUI Hooks
# ========================================================================
[docs]
def change_gui(self, newparentgui):
"""Re-parent this array to a different :class:`WolfMapViewer`.
If no viewer was previously attached, creates a new
``Ops_Array`` panel. Otherwise, migrates the existing one.
:param newparentgui: Target :class:`WolfMapViewer` instance.
:raises AssertionError: If *newparentgui* is not a
:class:`WolfMapViewer`.
"""
from ..PyDraw import WolfMapViewer
assert isinstance(newparentgui, WolfMapViewer), \
_('newparentgui must be a WolfMapViewer instance')
super().change_gui(newparentgui)
for curblock in self.myblocks.values():
# curblock.change_gui(newparentgui)
curblock.mapviewer = newparentgui
[docs]
def check_plot(self):
"""Mark the array for plotting and load data if needed.
Reads binary data from disk, applies the null-value mask,
initialises the RGB buffer and updates the palette.
"""
self.plotted = True
self.mimic_plotdata()
if not self.loaded and self.filename != '':
if os.path.exists(self.filename):
self.read_data()
if self.masknull:
self.mask_data(self.nullvalue)
if VERSION_RGB==1 :
if self.rgb is None:
self.rgb = np.ones((self.nbx, self.nby, 4), order='F', dtype=np.integer)
self.updatepalette(0)
self.loaded = True
else:
raise Exception(_(f"Trying to load an array that doesn't exist ({self.filename})"))
else:
logging.info(_('Array already loaded'))
[docs]
def uncheck_plot(self, unload: bool = True, forceresetOGL: bool = False,
askquestion: bool = True):
"""Uncheck (hide) the array and optionally unload data.
Optionally resets OpenGL display lists and frees block data.
:param unload: If ``True``, unload block data from memory.
:param forceresetOGL: If ``True``, force-reset OpenGL lists
without asking.
:param askquestion: If ``True``, prompt the user before resetting
OpenGL lists.
"""
self.plotted = False
self.mimic_plotdata()
if unload and self.filename != '':
if askquestion and not forceresetOGL:
if self.wx_exists:
dlg = wx.MessageDialog(None, _('Do you want to reset OpenGL lists?'), style=wx.YES_NO)
ret = dlg.ShowModal()
if ret == wx.ID_YES:
forceresetOGL = True
else:
forceresetOGL = True
for curblock in self.myblocks.values():
curblock.uncheck_plot(unload, forceresetOGL, askquestion=False)
if VERSION_RGB==1 : self.rgb = None
self.myblocks = {}
self.loaded = False
else:
logging.info(_('Array not unloaded'))
[docs]
def mimic_plotdata(self):
"""Propagate plot/plotting flags to every child block."""
for curblock in self.myblocks.values():
curblock: WolfArray
curblock.plotted = self.plotted
curblock.plotting = self.plotting
# ========================================================================
# Palette & Colormap
# ========================================================================
[docs]
def share_palette(self):
"""Share this array's palette with all linked arrays."""
for cur in self.linkedarrays:
if id(cur.mypal) != id(self.mypal):
cur.mypal = self.mypal
cur.link_palette()
[docs]
def link_palette(self):
"""Assign this array's palette to every child block."""
for curblock in self.myblocks.values():
curblock.mypal = self.mypal
[docs]
def updatepalette(self, which: int = 0, onzoom: list[float] = []):
"""Update the palette/colormap across all blocks.
When :attr:`mypal.automatic` is ``True``, the palette range is
recomputed from the data (optionally restricted to *onzoom*).
:param which: Colormap index to use.
:param onzoom: If not empty, ``[xmin, xmax, ymin, ymax]`` bounds
restricting the data used for palette computation.
"""
if len(self.myblocks) == 0:
return
if self.mypal.automatic:
if onzoom != []:
allarrays = []
for curblock in self.myblocks.values():
istart, jstart = curblock.get_ij_from_xy(onzoom[0], onzoom[2])
iend, jend = curblock.get_ij_from_xy(onzoom[1], onzoom[3])
istart = 0 if istart < 0 else istart
jstart = 0 if jstart < 0 else jstart
iend = curblock.nbx if iend > curblock.nbx else iend
jend = curblock.nby if jend > curblock.nby else jend
partarray = curblock.array[istart:iend, jstart:jend]
partarray = partarray[partarray.mask == False]
if len(partarray) > 0:
allarrays.append(partarray.flatten())
allarrays = np.concatenate(allarrays)
self.mypal.isopop(allarrays, allarrays.count())
else:
allarrays = np.concatenate(
[curblock.array[curblock.array.mask == False].flatten()
for curblock in self.myblocks.values()])
self.mypal.isopop(allarrays, self.nbnotnull)
self.link_palette()
if VERSION_RGB == 1:
for curblock in self.myblocks.values():
curblock.rgb = self.mypal.get_rgba(curblock.array)
if self.myops is not None:
self.myops.update_palette()
# ========================================================================
# OpenGL Rendering
# ========================================================================
[docs]
def delete_lists(self):
"""Delete OpenGL display lists for every block."""
for curblock in self.myblocks.values():
curblock.delete_lists()
[docs]
def plot(self, sx=None, sy=None, xmin=None, ymin=None, xmax=None,
ymax=None, size=None):
"""Render all blocks via OpenGL.
Also draws the current selection and any attached zones.
:param sx: X scale factor.
:param sy: Y scale factor.
:param xmin: Viewport X minimum.
:param ymin: Viewport Y minimum.
:param xmax: Viewport X maximum.
:param ymax: Viewport Y maximum.
:param size: Unused, kept for API compatibility.
"""
self.plotting = True
self.mimic_plotdata()
for curblock in self.myblocks.values():
curblock.plot(sx, sy, xmin, ymin, xmax, ymax)
self.plotting = False
self.mimic_plotdata()
# Plot selected nodes
if self.SelectionData is not None:
self.SelectionData.plot_selection()
# Plot zones attached to array
if self.myops is not None:
self.myops.myzones.plot()
[docs]
def _fill_ogllist_for_one_grid_cell(self, curscale, loci, locj,
force=False):
"""Fill OpenGL display list for a single grid cell in every block."""
for curblock in self.myblocks.values():
curblock._fill_ogllist_for_one_grid_cell(curscale, loci, locj,
force)
# ========================================================================
# Visualization (Matplotlib)
# ========================================================================
[docs]
def imshow(self, figax: tuple[Figure, Axis] = None,
cmap: Colormap = None,
step_ticks=100.) -> tuple[Figure, Axis]:
"""Create a Matplotlib image from this multi-block array.
Assembles all blocks into a single raster at the finest resolution,
applies the palette (or a Matplotlib colormap), and renders with
``imshow``.
:param figax: ``(fig, ax)`` to plot into. If ``None``, a new
figure is created.
:param cmap: Matplotlib :class:`~matplotlib.colors.Colormap`.
If ``None``, the WOLF palette's RGBA output is used.
:param step_ticks: Spacing between axis tick labels in world
coordinate units.
:return: ``(fig, ax)`` tuple.
"""
# Use figax if passed as argument
if figax is None:
fig, ax = plt.subplots(1, 1)
else:
fig, ax = figax
# Find minimal spatial resolution
dx = sorted([self[i].dx for i in range(self.nb_blocks)])
dx_min = dx[0]
bounds = self.get_bounds()
# Set local WolfHeader
_header = self.get_header()
_header.dx = dx_min
_header.dy = dx_min
_header.nbx = int((bounds[0][1] - bounds[0][0]) / dx_min)
_header.nby = int((bounds[1][1] - bounds[1][0]) / dx_min)
if cmap is None:
# Full scale image color
colors = np.zeros((_header.nbx, _header.nby, 4))
# Iterate on blocks
for i in range(self.nb_blocks):
# update local colors if not already done
if self[i].rgb is None:
self[i].updatepalette(0)
# Pointing RGB
_colors = self[i].rgb
if self[i].dx > dx_min:
n = int(self[i].dx / dx_min)
tmpcolors = np.zeros((_colors.shape[0]*n, _colors.shape[1]*n, _colors.shape[2]))
for i in range(n):
for j in range(n):
tmpcolors[i::n, j::n, :] = _colors
_colors = tmpcolors
# Searching relative position
x, y = self[i].get_xy_from_ij(0, 0)
loci, locj = _header.get_ij_from_xy(x, y)
# Alias subarray in colors
sub = colors[loci:loci+_colors.shape[0], locj:locj+_colors.shape[1], :]
# Copy color if not masked values
sub[~self[i].array.mask, :] = _colors[~self[i].array.mask, :]
# Plot
colors = colors.swapaxes(0, 1)
ax.imshow(colors, origin='lower')
else:
# Full scale image values
vals = np.zeros((_header.nbx, _header.nby))
alpha = np.zeros(vals.shape)
# Iterate on blocks
for i in range(self.nb_blocks):
# Pointing values
_vals = self[i].array.data
if self[i].dx > dx_min:
n = int(self[i].dx / dx_min)
tmpvals = np.zeros((_vals.shape[0]*n, _vals.shape[1]*n))
for i in range(n):
for j in range(n):
tmpvals[i::n, j::n, :] = _vals
_vals = tmpvals
# Searching relative position
x, y = self[i].get_xy_from_ij(0, 0)
loci, locj = _header.get_ij_from_xy(x, y)
# Alias subarray in values
sub = vals[loci:loci+_vals.shape[0], locj:locj+_vals.shape[1]]
# Copy values if not masked
sub[~self[i].array.mask] = _vals[~self[i].array.mask]
sub = alpha[loci:loci+_vals.shape[0], locj:locj+_vals.shape[1]]
sub[~self[i].array.mask] = 1.
# Plot
vals = vals.swapaxes(0, 1)
alpha = alpha.swapaxes(0, 1)
ax.imshow(vals, origin='lower', cmap=cmap, alpha=alpha)
ax.set_aspect('equal')
x_start = (bounds[0][0] // step_ticks) * step_ticks
x_end = (bounds[0][1] // step_ticks) * step_ticks
y_start = (bounds[1][0] // step_ticks) * step_ticks
y_end = (bounds[1][1] // step_ticks) * step_ticks
x_pos = np.arange(x_start, x_end + .0001, step_ticks)
y_pos = np.arange(y_start, y_end + .0001, step_ticks)
i = [self.get_ij_from_xy(x, 0.)[0] for x in x_pos]
j = [self.get_ij_from_xy(0., y)[1] for y in y_pos]
ax.set_xticks(i)
ax.set_xticklabels(x_pos, rotation=30)
ax.set_yticks(j)
ax.set_yticklabels(y_pos)
return fig, ax
[docs]
def plot_matplotlib(self,
figax: tuple = None,
getdata_im: bool = False,
update_palette: bool = True,
vmin: float = None, vmax: float = None,
figsize: tuple = None,
Walonmap: bool = False,
cat: str = 'IMAGERIE/ORTHO_2022_ETE',
first_mask_data: bool = True,
with_legend: bool = False):
"""Plot the multi-block array using Matplotlib.
Converts to a single-block :class:`WolfArray` and delegates to
its :meth:`~WolfArray.plot_matplotlib` method.
Notes
-----
- The null-value mask is applied before plotting.
- If *Walonmap* is ``True``, a background map from the WalOnMap
service is overlaid.
- The aspect ratio is set to ``'equal'``.
:param figax: ``(fig, ax)`` tuple. If ``None``, a new figure is
created.
:param getdata_im: If ``True``, also return the image object.
:param update_palette: If ``True``, update the palette before
plotting.
:param vmin: Minimum value for colour scaling.
:param vmax: Maximum value for colour scaling.
:param figsize: Figure size in inches ``(width, height)``.
:param Walonmap: If ``True``, overlay an orthophoto from WalOnMap.
:param cat: WalOnMap orthophoto category (e.g.
``'IMAGERIE/ORTHO_2022_ETE'``).
:param first_mask_data: If ``True``, apply the mask before plotting.
:param with_legend: If ``True``, add a colour legend.
:return: ``(fig, ax)`` or ``(fig, ax, im)`` if *getdata_im* is
``True``.
:rtype: tuple
"""
# Convert to single block
single_block = self.as_WolfArray()
return single_block.plot_matplotlib(figax=figax,
getdata_im=getdata_im,
update_palette=update_palette,
vmin=vmin, vmax=vmax,
figsize=figsize,
Walonmap=Walonmap,
cat=cat,
first_mask_data=first_mask_data,
with_legend=with_legend)