Source code for wolfhece.wolf_array._mb_model

"""
WolfArrayMBModel - Data-only multi-block array class (no GUI/OpenGL dependency).

For GUI support (OpenGL rendering, wx dialogs), use :class:`WolfArrayMB`
from ``_mb.py``.

Extracted from ``_mb.py`` to follow the same Model/GUI separation pattern as
:class:`WolfArrayModel` / :class:`WolfArray`.

Author: HECE - University of Liege, Pierre Archambeau
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

import os
import logging
import math

import numpy as np
import numpy.ma as ma

from typing import Union, Literal

try:
    from ..PyTranslate import _
except ImportError as e:
    print(e)
    raise Exception(_('Error importing modules'))

from ..PyVertexvectors import vector, zone

from ._header_wolf import (
    header_wolf,
    getkeyblock,
    decodekeyblock,
    WOLF_ARRAY_FULL_SINGLE,
    WOLF_ARRAY_FULL_INTEGER,
    WOLF_ARRAY_FULL_INTEGER16,
    WOLF_ARRAY_FULL_INTEGER16_2,
    WOLF_ARRAY_MB_SINGLE,
    WOLF_ARRAY_MB_INTEGER,
    WOLF_ARRAY_FULL_INTEGER8,
)

from ._selection_data import SelectionDataMB
from ._base import WolfArrayModel


[docs] class WolfArrayMBModel(WolfArrayModel): """Data-only multi-block array class (no GUI/OpenGL dependency). A multi-block array is composed of several independent rectangular blocks (each a :class:`WolfArrayModel`) stored in the :attr:`myblocks` dictionary, keyed by block identifiers produced by :func:`getkeyblock`. The global header (:class:`header_wolf`) describes the bounding box that encloses all blocks. Individual block headers are kept in :attr:`head_blocks` (inherited from :class:`header_wolf`). For GUI support (OpenGL rendering, wx dialogs, palette management), use :class:`WolfArrayMB` from ``_mb.py``. :ivar myblocks: Dictionary mapping block keys to :class:`WolfArrayModel` instances. Keys are produced by :func:`getkeyblock`. :ivar _active_blocks: Number of currently active blocks (0 = all). :ivar mngselection: Selection manager (:class:`SelectionDataMB`). """
[docs] myblocks: dict[str, WolfArrayModel]
# ======================================================================== # Constructor & Factory # ======================================================================== def __init__(self, fname=None, mold=None, masknull=True, crop=None, whichtype=WOLF_ARRAY_MB_SINGLE, preload=True, nullvalue=0., srcheader=None, idx:str=''): """Create a multi-block array. :param fname: Path to a WOLF binary file. If provided the header (and optionally data) are read from disk. :param mold: Template :class:`WolfArrayModel` to copy layout from. :param masknull: If ``True``, automatically mask cells equal to *nullvalue* after reading. :param crop: Optional spatial crop ``[[xmin, xmax], [ymin, ymax]]``. :param whichtype: Array element type constant (default :data:`WOLF_ARRAY_MB_SINGLE`). :param preload: If ``True``, load data during construction. :param nullvalue: Value used as *no-data* marker. :param srcheader: A :class:`header_wolf` instance to initialise dimensions from (block sub-headers must already be set on it). :param idx: Identity string, used by the map viewer. """ super().__init__(fname, mold, masknull, crop, whichtype, preload, nullvalue, srcheader=srcheader, idx=idx)
[docs] self.mngselection = SelectionDataMB(self)
[docs] self._active_blocks = 0
if self.myblocks is None: self.myblocks = {} @classmethod
[docs] def make_multiblocks(cls, others: list, abs: bool = True, copy: bool = True) -> "WolfArrayMBModel": """Merge several single-block arrays into a new multi-block array. The global header is recomputed from the bounding box of all blocks. :param others: List of :class:`WolfArrayModel` instances to merge. :param abs: If ``True``, use global (absolute) world coordinates. :param copy: If ``True``, copy data from each source array. Set to ``False`` only if the source headers **can** be modified in place (saves memory). :return: A new :class:`WolfArrayMBModel` containing all blocks. """ newMBArray = cls() for curarray in others: newMBArray.add_block(curarray, force_idx=True, copyarray=copy) newMBArray.set_header_from_added_blocks() return newMBArray
# ======================================================================== # Properties # ======================================================================== @property
[docs] def zmin(self) -> float: """Minimum unmasked value across all blocks.""" return np.min([curblock.zmin for curblock in self.myblocks.values()])
@property
[docs] def zmax(self) -> float: """Maximum unmasked value across all blocks.""" return np.max([curblock.zmax for curblock in self.myblocks.values()])
@property
[docs] def zmin_global(self) -> float: """Minimum value across all blocks (including masked cells).""" return np.min([curblock.zmin_global for curblock in self.myblocks.values()])
@property
[docs] def zmax_global(self) -> float: """Maximum value across all blocks (including masked cells).""" return np.max([curblock.zmax_global for curblock in self.myblocks.values()])
@property
[docs] def nullvalue(self) -> float: """No-data / null value shared by all blocks.""" return self._nullvalue
@nullvalue.setter def nullvalue(self, value: float): """Set the null value and propagate to every block. :param value: New null value to apply. """ self._nullvalue = value if self.myblocks is not None: for curblock in self.myblocks.values(): curblock.nullvalue = value # ======================================================================== # Block Management # ======================================================================== def __getitem__(self, block_key: Union[int, str]) -> WolfArrayModel: """Access a block by integer index or string key. :param block_key: 0-based integer index or string key (e.g. ``getkeyblock(0)``). :return: The corresponding :class:`WolfArrayModel`, or ``None`` if the key does not exist. """ if isinstance(block_key, int): _key = getkeyblock(block_key) else: _key = block_key if _key in self.myblocks.keys(): return self.myblocks[_key] else: return None
[docs] def add_block(self, arr: WolfArrayModel, force_idx: bool = False, copyarray=False): """Add a block to this multi-block array. After adding all blocks, call :meth:`set_header_from_added_blocks` to recompute the global header. :param arr: The block to add. :param force_idx: If ``True``, the block key is set automatically based on the current number of blocks. If ``False``, the key must already be set on *arr*. :param copyarray: If ``True``, a deep copy of *arr* is stored instead of the original reference. :raises AssertionError: If *force_idx* is ``False`` and the key on *arr* is missing, duplicated, or out of sequence. """ if copyarray: arr = WolfArrayModel(mold=arr, nullvalue=arr.nullvalue) force_idx = True if force_idx: arr.idx = getkeyblock(len(self.myblocks)) else: assert arr.idx is not None and type(arr.idx) == str and arr.idx.strip() != '', \ f"The block index/key is wrong {arr.idx}" assert arr.idx not in self.myblocks, \ "You can't have the same block twice" pos = len(self.myblocks) posidx = decodekeyblock(arr.idx, False) assert pos == posidx, \ f"The block index/key is wrong {arr.idx}" self.myblocks[arr.idx] = arr arr.isblock = True arr.blockindex = len(self.myblocks) - 1
[docs] def count(self): """Count the number of unmasked (non-null) cells across all blocks. The result is stored in :attr:`nbnotnull` (total) and in each block's ``nbnotnull`` attribute. """ self.nbnotnull = 0 for i in range(self.nb_blocks): curblock = self.myblocks[getkeyblock(i)] curarray = curblock.array nbnotnull = curarray.count() curblock.nbnotnull = nbnotnull self.nbnotnull += nbnotnull
[docs] def check_consistency(self, other): """Check that *other* has the same block structure as *self*. Two multi-block arrays are consistent if they have the same number of blocks and each pair of corresponding blocks has identical spatial headers. :param other: Another :class:`WolfArrayMBModel` to compare. :return: ``True`` if the structures match. :rtype: bool """ test = isinstance(self, type(other)) if test: for curblock, curblockother in zip(self.myblocks.values(), other.myblocks.values()): curblock: WolfArrayModel curblockother: WolfArrayModel test &= curblock.get_header().is_like( curblockother.get_header()) return test
# ======================================================================== # Memory Allocation, Header & Initialization # ========================================================================
[docs] def allocate_ressources(self): """Allocate memory for every block defined in :attr:`head_blocks`. Creates :class:`WolfArrayModel` instances for each block header if :attr:`myblocks` is empty. Does nothing if blocks are already populated. """ if self.myblocks is None: logging.warning("No blocks to allocate") else: if len(self.myblocks) == 0: for id, (key, curhead) in enumerate(self.head_blocks.items()): if self.wolftype == WOLF_ARRAY_MB_SINGLE: self.myblocks[key] = WolfArrayModel( srcheader=curhead, whichtype=WOLF_ARRAY_FULL_SINGLE) elif self.wolftype == WOLF_ARRAY_MB_INTEGER: self.myblocks[key] = WolfArrayModel( srcheader=curhead, whichtype=WOLF_ARRAY_FULL_INTEGER) self.myblocks[key].isblock = True self.myblocks[key].blockindex = id self.myblocks[key].idx = key
[docs] def set_header_from_added_blocks(self): """Recompute the global header from the bounding box of all blocks. Must be called after the last :meth:`add_block` call. Updates :attr:`origx`, :attr:`origy`, :attr:`dx`, :attr:`dy` and the per-block sub-headers in :attr:`head_blocks`. """ if len(self.myblocks) > 0: origx = min([curblock.origx + curblock.translx for curblock in self.myblocks.values()]) origy = min([curblock.origy + curblock.transly for curblock in self.myblocks.values()]) endx = max([curblock.origx + curblock.translx + curblock.nbx * curblock.dx for curblock in self.myblocks.values()]) endy = max([curblock.origy + curblock.transly + curblock.nby * curblock.dy for curblock in self.myblocks.values()]) self.dx = endx - origx self.dy = endy - origy self.nbx = 1 self.nby = 1 self.origx = origx self.origy = origy self.translx = 0. self.transly = 0. self.head_blocks = {} for curblock in self.myblocks.values(): new_trx = self.origx + self.translx new_try = self.origy + self.transly ox = curblock.origx + curblock.translx oy = curblock.origy + curblock.transly curblock.origin = (ox - new_trx, oy - new_try) curblock.translation = (new_trx, new_try) self.head_blocks[curblock.idx] = curblock.get_header()
[docs] def reset(self): """Reset every block's data to its null value.""" for i in range(self.nb_blocks): self[i].reset()
# ======================================================================== # File I/O # ========================================================================
[docs] def read_data(self): """Read binary block data from :attr:`filename`. One :class:`WolfArrayModel` is created per block header and its binary payload is read sequentially from the file. """ with open(self.filename, 'rb') as f: for i in range(self.nb_blocks): if self.wolftype == WOLF_ARRAY_MB_SINGLE: curblock = WolfArrayModel( whichtype=WOLF_ARRAY_FULL_SINGLE, srcheader=self.head_blocks[getkeyblock(i)]) elif self.wolftype == WOLF_ARRAY_MB_INTEGER: curblock = WolfArrayModel( whichtype=WOLF_ARRAY_FULL_INTEGER) curblock.isblock = True curblock.blockindex = i curblock.idx = getkeyblock(i) curblock._read_binary_data(f) self.myblocks[getkeyblock(i)] = curblock
[docs] def write_array(self): """Write the raw binary data of every block to :attr:`filename`. Blocks are written sequentially in index order. """ with open(self.filename, 'wb') as f: for i in range(self.nb_blocks): curarray = self.myblocks[getkeyblock(i)] f.write(curarray.array.data.transpose().tobytes())
# ======================================================================== # Coordinate, Bounds & Grid Utilities # ========================================================================
[docs] def check_bounds_ij(self, i: int, j: int, which_block: int = 1): """Check whether indices *(i, j)* fall inside the global bounds. Converts indices to world coordinates via :meth:`get_xy_from_ij` and delegates to :meth:`check_bounds_xy`. :param i: Column index (0-based). :param j: Row index (0-based). :param which_block: 1-based block index used for the coordinate conversion. :return: ``True`` if the resulting coordinates are inside the global bounding box. :rtype: bool """ x, y = self.get_xy_from_ij(i, j, which_block=which_block) return self.check_bounds_xy(x, y)
[docs] def check_bounds_xy(self, x: float, y: float): """Check whether world coordinates *(x, y)* are inside the global bounding box. :param x: X world coordinate. :param y: Y world coordinate. :return: ``True`` if inside bounds. :rtype: bool """ bounds = self.get_bounds() return (x >= bounds[0][0] and x <= bounds[0][1] and y >= bounds[1][0] and y <= bounds[1][1])
[docs] def get_ij_from_xy(self, x: float, y: float, z: float = 0., scale: float = 1., aswolf: bool = False, abs: bool = True, which_block: int = 1): """Return cell indices *(i, j)* for the given world coordinates. Delegates to the single-block :meth:`WolfArrayModel.get_ij_from_xy` of block *which_block*. :param x: X world coordinate. :param y: Y world coordinate. :param z: Z coordinate (unused, kept for compatibility). :param scale: Scale factor applied before conversion. :param aswolf: If ``True``, return 1-based (Fortran) indices. :param abs: If ``True``, account for translation. :param which_block: 1-based block index. :return: Tuple *(i, j)*. """ return self.myblocks[getkeyblock(which_block, False)].get_ij_from_xy( x, y, z, scale, aswolf, abs)
[docs] def get_xy_from_ij(self, i: int, j: int, which_block: int = 1, aswolf: bool = False, abs: bool = True): """Return world coordinates *(x, y)* for cell indices *(i, j)*. Delegates to the single-block :meth:`WolfArrayModel.get_xy_from_ij` of block *which_block*. :param i: Column index. :param j: Row index. :param which_block: 1-based block index. :param aswolf: If ``True``, indices are 1-based (Fortran). :param abs: If ``True``, account for translation. :return: Tuple *(x, y)*. """ if which_block == 0: logging.warning( "Block index is probably 0-based. It should be 1-based.") return k = getkeyblock(which_block, False) assert k in self.myblocks, \ f"The block '{k}' you ask for doesn't exist." x, y = self.myblocks[k].get_xy_from_ij(i, j, aswolf=aswolf, abs=abs) return x, y
[docs] def get_value(self, x: float, y: float, abs: bool = True, nullvalue: float = -99999, convert_to_float: bool = True): """Read the value at world coordinate *(x, y)*. Iterates through all blocks and returns the first unmasked value found. If no block covers the coordinate, *nullvalue* is returned. :param x: X world coordinate. :param y: Y world coordinate. :param abs: If ``True``, account for translation. :param nullvalue: Value returned when no block covers *(x, y)*. :param convert_to_float: If ``True``, cast the result to Python ``float`` before returning. :return: Cell value, or *nullvalue* if not found. """ h = nullvalue for curblock in self.myblocks.values(): curblock: WolfArrayModel nbx = curblock.nbx nby = curblock.nby i, j = curblock.get_ij_from_xy(x, y, abs=abs) if (i >= 0 and i < nbx and j >= 0 and j < nby): if curblock.array.mask[i, j]: h = nullvalue else: h = curblock.array[i, j] break if convert_to_float: return float(h) else: return h
[docs] def get_values_as_wolf(self, i: int, j: int, which_block: int = 1): """Return the value at 1-based Fortran indices *(i, j)*. :param i: 1-based column index. :param j: 1-based row index. :param which_block: 1-based block index. :return: Cell value or ``np.nan`` if out of bounds. """ h = np.nan if which_block == 0: logging.warning( "Block index is probably 0-based. It should be 1-based.") return h keyblock = getkeyblock(which_block, False) curblock = self.myblocks[keyblock] nbx = curblock.nbx nby = curblock.nby if (i > 0 and i <= nbx and j > 0 and j <= nby): h = curblock.array[i - 1, j - 1] return h
[docs] def get_blockij_from_xy(self, x: float, y: float, abs: bool = True): """Find which block covers the world coordinate *(x, y)*. :param x: X world coordinate. :param y: Y world coordinate. :param abs: If ``True``, account for translation. :return: Tuple *(i, j, k)* where *i* and *j* are the 0-based cell indices within block *k* (1-based block index). Returns ``(-1, -1, -1)`` if no block covers the coordinate. """ exists = False k = 1 for curblock in self.myblocks.values(): curblock: WolfArrayModel nbx = curblock.nbx nby = curblock.nby i, j = curblock.get_ij_from_xy(x, y, abs=abs) if (i >= 0 and i < nbx and j >= 0 and j < nby): if not curblock.array.mask[i, j]: exists = True break k += 1 if exists: return i, j, k else: return -1, -1, -1
# ======================================================================== # Operator Overloading # ========================================================================
[docs] def _make_result_array(self): """Create an empty result array of the same type. Subclasses override to return their own type (e.g. :class:`WolfArrayMB`). :return: A new :class:`WolfArrayMBModel` with the same header. """ newArray = WolfArrayMBModel() newArray.set_header(self.get_header()) return newArray
[docs] def _make_block_op(self, op, curblock, other_block): """Apply a binary operator between two blocks (or a block and a scalar). :param op: Operator string: ``'+'``, ``'-'``, ``'*'``, ``'/'``, or ``'**'``. :param curblock: Left operand (:class:`WolfArrayModel`). :param other_block: Right operand (:class:`WolfArrayModel` or scalar). :return: A new :class:`WolfArrayModel` with the result. """ if op == '+': return curblock + other_block elif op == '-': return curblock - other_block elif op == '*': return curblock * other_block elif op == '/': return curblock / other_block elif op == '**': return curblock ** other_block
[docs] def _apply_op(self, other, op: str): """Generic element-wise arithmetic on multi-block arrays. Handles both *WolfArrayMBModel + WolfArrayMBModel* and *WolfArrayMBModel + scalar* cases. :param other: Another :class:`WolfArrayMBModel` (same structure) or a ``float`` scalar. :param op: Operator string (``'+'``, ``'-'``, etc.). :return: A new multi-block array with the result, or ``None`` on inconsistency. """ newArray = self._make_result_array() if isinstance(self, type(other)): if self.check_consistency(other): i = 0 for curblock, curblockother in zip( self.myblocks.values(), other.myblocks.values()): newblock = self._make_block_op(op, curblock, curblockother) newblock.isblock = True newblock.blockindex = i newArray.myblocks[getkeyblock(i)] = newblock i += 1 else: return None elif isinstance(other, float): if other != 0.: i = 0 for curblock in self.myblocks.values(): newblock = self._make_block_op(op, curblock, other) newblock.isblock = True newblock.blockindex = i newArray.myblocks[getkeyblock(i)] = newblock i += 1 else: return self return newArray
def __add__(self, other): """Element-wise addition (``self + other``).""" return self._apply_op(other, '+') def __mul__(self, other): """Element-wise multiplication (``self * other``).""" return self._apply_op(other, '*') def __sub__(self, other): """Element-wise subtraction (``self - other``).""" return self._apply_op(other, '-') def __pow__(self, other): """Element-wise power (``self ** other``).""" return self._apply_op(other, '**') def __truediv__(self, other): """Element-wise division (``self / other``).""" return self._apply_op(other, '/') # ======================================================================== # Masking & Null Value Operations # ========================================================================
[docs] def mask_data(self, value): """Mask cells whose data equals *value* in every block. For integer array types the value is cast to ``int`` first. Handles ``NaN`` comparisons correctly. :param value: The value to mask. ``None`` is a no-op. """ if self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2]: value = int(value) if value is not None: for curblock in self.myblocks.values(): curarray = curblock.array if isinstance(curarray.mask, np.bool_): # Mask is a single boolean – replace with a full array if np.isnan(value) or math.isnan(value): curarray.mask = np.isnan(curarray.data) else: curarray.mask = curarray.data == value else: # Copy to prevent unlinking the mask (see `mask_reset`) if np.isnan(value) or math.isnan(value): np.copyto(curarray.mask, np.isnan(curarray.data)) else: np.copyto(curarray.mask, curarray.data == value) self.count()
[docs] def mask_reset(self): """Reset the mask of every block to ``False`` (all cells unmasked).""" for i in range(self.nb_blocks): self[i].mask_reset() self.count()
[docs] def mask_lower(self, value): """Mask cells whose value is strictly less than *value*. :param value: Threshold value. """ for i in range(self.nb_blocks): self[i].mask_lower(value) self.count()
[docs] def mask_lowerequal(self, value): """Mask cells whose value is less than or equal to *value*. :param value: Threshold value. """ for i in range(self.nb_blocks): self[i].mask_lowerequal(value) self.count()
[docs] def mask_greater(self, value): """Mask cells whose value is strictly greater than *value*. :param value: Threshold value. """ for i in range(self.nb_blocks): self[i].mask_greater(value) self.count()
[docs] def mask_greaterequal(self, value): """Mask cells whose value is greater than or equal to *value*. :param value: Threshold value. """ for i in range(self.nb_blocks): self[i].mask_greaterequal(value) self.count()
[docs] def mask_union(self, source: "WolfArrayMBModel"): """Compute the mask union (logical AND) with another multi-block array. The intersection of masked regions is applied block by block. :param source: Another :class:`WolfArrayMBModel` with the same block structure. """ if isinstance(self, type(source)): if self.check_consistency(source): i = 0 for curblock, curblockother in zip( self.myblocks.values(), source.myblocks.values()): curblock.mask_union(curblockother) i += 1 self.reset_plot()
[docs] def copy_mask(self, source: "WolfArrayMBModel", forcenullvalue: bool = False): """Copy the mask from *source* to *self*, block by block. :param source: Source :class:`WolfArrayMBModel` whose mask is copied. :param forcenullvalue: If ``True``, also set masked cells to the null value. """ if isinstance(self, type(source)): if self.check_consistency(source): i = 0 for curblock, curblockother in zip( self.myblocks.values(), source.myblocks.values()): curblock.copy_mask(curblockother, forcenullvalue) i += 1 self.reset_plot() else: logging.warning( _('Copy mask not supported between different types of arrays'))
# ======================================================================== # Interpolation & Rasterization # ========================================================================
[docs] def interpolate_on_polygon(self, working_vector: vector, method: Literal["nearest", "linear", "cubic"] = "linear", keep: Literal['all', 'below', 'above'] = 'all'): """Interpolate elevation values under a polygon for every block. For each block, cells inside the polygon (or the current selection if one exists) are filled with values interpolated from the 3-D vertices of *working_vector* using :func:`scipy.interpolate.griddata`. :param working_vector: Polygon whose 3-D vertices provide the interpolation source. :param method: Interpolation method (``'nearest'``, ``'linear'``, or ``'cubic'``). :param keep: Which interpolated values to keep: ``'all'``, ``'below'`` (only values lower than existing), or ``'above'``. """ for curblock in self.myblocks.values(): curblock.interpolate_on_polygon(working_vector, method, keep)
[docs] def interpolate_on_polygons(self, working_zone: zone, method: Literal["nearest", "linear", "cubic"] = "linear", keep: Literal['all', 'below', 'above'] = 'all'): """Interpolate elevation values under every polygon in a zone. Calls :meth:`interpolate_on_polygon` for each vector in *working_zone*. :param working_zone: Zone containing one or more polygon vectors. :param method: Interpolation method. :param keep: Which interpolated values to keep. """ for curvector in working_zone.myvectors: self.interpolate_on_polygon(curvector, method, keep)
[docs] def interpolate_on_polyline(self, working_vector: vector, usemask=True): """Interpolate elevation values under a polyline for every block. Cells under the polyline are filled with values interpolated along the 3-D vertices of *working_vector* using Shapely's ``line.interpolate()``. :param working_vector: Polyline whose 3-D vertices provide the interpolation source. :param usemask: If ``True``, only consider unmasked cells. """ for curblock in self.myblocks.values(): curblock.interpolate_on_polyline(working_vector, usemask)
[docs] def interpolate_on_polylines(self, working_zone: zone, usemask=True): """Interpolate elevation values under every polyline in a zone. Calls :meth:`interpolate_on_polyline` for each vector in *working_zone*. :param working_zone: Zone containing one or more polyline vectors. :param usemask: If ``True``, only consider unmasked cells. """ for curvec in working_zone.myvectors: self.interpolate_on_polyline(curvec, usemask)
# ======================================================================== # Filtering & Labelling # ========================================================================
[docs] def filter_zone(self, set_null: bool = False): """Filter zones, keeping only those that contain selected cells. Applied independently to every block. :param set_null: If ``True``, set filtered-out cells to the null value instead of just masking them. """ for curblock in self.myblocks.values(): curblock.filter_zone(set_null, reset_plot=False) self.reset_plot()
[docs] def labelling(self): """Label connected components in every block using SciPy. Each block is labelled independently via :func:`scipy.ndimage.label`. """ for curblock in self.myblocks.values(): curblock.labelling(reset_plot=False) self.reset_plot()
# ======================================================================== # Conversion & Export # ========================================================================
[docs] def as_WolfArray(self, abs: bool = True, forced_header: header_wolf = None) -> WolfArrayModel: """Convert to a single-block :class:`WolfArrayModel`. When blocks have different resolutions, the finer blocks are preserved and coarser blocks are rebinned to match the finest resolution. :param abs: If ``True``, use absolute coordinates (translation included). :param forced_header: If provided, override the target header instead of computing it from blocks. :return: A new single-block :class:`WolfArrayModel`. """ newArray = WolfArrayModel() if forced_header is None: myhead = self.get_header(abs=abs) else: myhead = forced_header myhead.wolftype = self.wolftype dx = set([curblock.get_header().dx for curblock in iter(self.myblocks.values())]) dy = set([curblock.get_header().dy for curblock in iter(self.myblocks.values())]) if len(dx) == 1 and len(dy) == 1: # ---- Uniform resolution ---- newArray.dx = list(dx)[0] newArray.dy = list(dy)[0] newArray.origx = myhead.origx newArray.origy = myhead.origy newArray.nbx = int((myhead.nbx * myhead.dx) // newArray.dx) newArray.nby = int((myhead.nby * myhead.dy) // newArray.dy) newArray.translx = myhead.translx newArray.transly = myhead.transly newArray.wolftype = (WOLF_ARRAY_FULL_SINGLE if myhead.wolftype == WOLF_ARRAY_MB_SINGLE else WOLF_ARRAY_FULL_INTEGER) newArray.allocate_ressources() newArray.array[:, :] = 0 for curblock in self.myblocks.values(): ij = np.where(~curblock.array.mask) if len(ij[0]) > 0: i = ij[0] j = ij[1] x0, y0 = curblock.get_xy_from_ij(0, 0, abs=True) i0, j0 = newArray.get_ij_from_xy(x0, y0, abs=True) i_dest = i + i0 j_dest = j + j0 newArray.array[i_dest, j_dest] = curblock.array[i, j] newArray.array.mask[i_dest, j_dest] = False else: logging.debug( f"Block {curblock.idx} is empty or totally masked.") else: # ---- Multiple resolutions – rebin to finest ---- dx = sorted(list(dx)) dy = sorted(list(dy)) newArray.dx = dx[0] newArray.dy = dy[0] newArray.origx = myhead.origx newArray.origy = myhead.origy newArray.nbx = int((myhead.nbx * myhead.dx) // newArray.dx) newArray.nby = int((myhead.nby * myhead.dy) // newArray.dy) newArray.translx = myhead.translx newArray.transly = myhead.transly newArray.wolftype = (WOLF_ARRAY_FULL_SINGLE if myhead.wolftype == WOLF_ARRAY_MB_SINGLE else WOLF_ARRAY_FULL_INTEGER) newArray.allocate_ressources() newArray.array[:, :] = 0 for curblock in self.myblocks.values(): if curblock.dx == dx[0] and curblock.dy == dy[0]: blockArray = curblock else: factor = dx[0] / curblock.dx blockArray = WolfArrayModel(mold=curblock) blockArray.rebin(factor) ij = np.where(~blockArray.array.mask) if len(ij[0]) > 0: i = ij[0] j = ij[1] x0, y0 = blockArray.get_xy_from_ij(0, 0, abs=True) i0, j0 = newArray.get_ij_from_xy(x0, y0, abs=True) i_dest = i + i0 j_dest = j + j0 newArray.array[i_dest, j_dest] = blockArray.array[i, j] newArray.array.mask[i_dest, j_dest] = False else: logging.debug( f"Block {curblock.idx} is empty or totally masked.") return newArray