"""
WolfArrayMBModel - Data-only multi-block array class (no GUI/OpenGL dependency).
For GUI support (OpenGL rendering, wx dialogs), use :class:`WolfArrayMB`
from ``_mb.py``.
Extracted from ``_mb.py`` to follow the same Model/GUI separation pattern as
:class:`WolfArrayModel` / :class:`WolfArray`.
Author: HECE - University of Liege, Pierre Archambeau
Date: 2024
Copyright (c) 2024 University of Liege. All rights reserved.
This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""
import os
import logging
import math
import numpy as np
import numpy.ma as ma
from typing import Union, Literal
try:
from ..PyTranslate import _
except ImportError as e:
print(e)
raise Exception(_('Error importing modules'))
from ..PyVertexvectors import vector, zone
from ._header_wolf import (
header_wolf,
getkeyblock,
decodekeyblock,
WOLF_ARRAY_FULL_SINGLE,
WOLF_ARRAY_FULL_INTEGER,
WOLF_ARRAY_FULL_INTEGER16,
WOLF_ARRAY_FULL_INTEGER16_2,
WOLF_ARRAY_MB_SINGLE,
WOLF_ARRAY_MB_INTEGER,
WOLF_ARRAY_FULL_INTEGER8,
)
from ._selection_data import SelectionDataMB
from ._base import WolfArrayModel
[docs]
class WolfArrayMBModel(WolfArrayModel):
"""Data-only multi-block array class (no GUI/OpenGL dependency).
A multi-block array is composed of several independent rectangular
blocks (each a :class:`WolfArrayModel`) stored in the :attr:`myblocks`
dictionary, keyed by block identifiers produced by :func:`getkeyblock`.
The global header (:class:`header_wolf`) describes the bounding box
that encloses all blocks. Individual block headers are kept in
:attr:`head_blocks` (inherited from :class:`header_wolf`).
For GUI support (OpenGL rendering, wx dialogs, palette management),
use :class:`WolfArrayMB` from ``_mb.py``.
:ivar myblocks: Dictionary mapping block keys to :class:`WolfArrayModel`
instances. Keys are produced by :func:`getkeyblock`.
:ivar _active_blocks: Number of currently active blocks (0 = all).
:ivar mngselection: Selection manager (:class:`SelectionDataMB`).
"""
[docs]
myblocks: dict[str, WolfArrayModel]
# ========================================================================
# Constructor & Factory
# ========================================================================
def __init__(self,
fname=None,
mold=None,
masknull=True,
crop=None,
whichtype=WOLF_ARRAY_MB_SINGLE,
preload=True,
nullvalue=0.,
srcheader=None,
idx:str=''):
"""Create a multi-block array.
:param fname: Path to a WOLF binary file. If provided the header
(and optionally data) are read from disk.
:param mold: Template :class:`WolfArrayModel` to copy layout from.
:param masknull: If ``True``, automatically mask cells equal to
*nullvalue* after reading.
:param crop: Optional spatial crop ``[[xmin, xmax], [ymin, ymax]]``.
:param whichtype: Array element type constant
(default :data:`WOLF_ARRAY_MB_SINGLE`).
:param preload: If ``True``, load data during construction.
:param nullvalue: Value used as *no-data* marker.
:param srcheader: A :class:`header_wolf` instance to initialise
dimensions from (block sub-headers must already be set on it).
:param idx: Identity string, used by the map viewer.
"""
super().__init__(fname, mold, masknull, crop, whichtype, preload,
nullvalue, srcheader=srcheader, idx=idx)
[docs]
self.mngselection = SelectionDataMB(self)
[docs]
self._active_blocks = 0
if self.myblocks is None:
self.myblocks = {}
@classmethod
[docs]
def make_multiblocks(cls, others: list, abs: bool = True,
copy: bool = True) -> "WolfArrayMBModel":
"""Merge several single-block arrays into a new multi-block array.
The global header is recomputed from the bounding box of all blocks.
:param others: List of :class:`WolfArrayModel` instances to merge.
:param abs: If ``True``, use global (absolute) world coordinates.
:param copy: If ``True``, copy data from each source array.
Set to ``False`` only if the source headers **can** be modified
in place (saves memory).
:return: A new :class:`WolfArrayMBModel` containing all blocks.
"""
newMBArray = cls()
for curarray in others:
newMBArray.add_block(curarray, force_idx=True, copyarray=copy)
newMBArray.set_header_from_added_blocks()
return newMBArray
# ========================================================================
# Properties
# ========================================================================
@property
[docs]
def zmin(self) -> float:
"""Minimum unmasked value across all blocks."""
return np.min([curblock.zmin for curblock in self.myblocks.values()])
@property
[docs]
def zmax(self) -> float:
"""Maximum unmasked value across all blocks."""
return np.max([curblock.zmax for curblock in self.myblocks.values()])
@property
[docs]
def zmin_global(self) -> float:
"""Minimum value across all blocks (including masked cells)."""
return np.min([curblock.zmin_global
for curblock in self.myblocks.values()])
@property
[docs]
def zmax_global(self) -> float:
"""Maximum value across all blocks (including masked cells)."""
return np.max([curblock.zmax_global
for curblock in self.myblocks.values()])
@property
[docs]
def nullvalue(self) -> float:
"""No-data / null value shared by all blocks."""
return self._nullvalue
@nullvalue.setter
def nullvalue(self, value: float):
"""Set the null value and propagate to every block.
:param value: New null value to apply.
"""
self._nullvalue = value
if self.myblocks is not None:
for curblock in self.myblocks.values():
curblock.nullvalue = value
# ========================================================================
# Block Management
# ========================================================================
def __getitem__(self, block_key: Union[int, str]) -> WolfArrayModel:
"""Access a block by integer index or string key.
:param block_key: 0-based integer index or string key
(e.g. ``getkeyblock(0)``).
:return: The corresponding :class:`WolfArrayModel`, or ``None``
if the key does not exist.
"""
if isinstance(block_key, int):
_key = getkeyblock(block_key)
else:
_key = block_key
if _key in self.myblocks.keys():
return self.myblocks[_key]
else:
return None
[docs]
def add_block(self, arr: WolfArrayModel, force_idx: bool = False,
copyarray=False):
"""Add a block to this multi-block array.
After adding all blocks, call :meth:`set_header_from_added_blocks`
to recompute the global header.
:param arr: The block to add.
:param force_idx: If ``True``, the block key is set automatically
based on the current number of blocks. If ``False``, the key
must already be set on *arr*.
:param copyarray: If ``True``, a deep copy of *arr* is stored
instead of the original reference.
:raises AssertionError: If *force_idx* is ``False`` and the key on
*arr* is missing, duplicated, or out of sequence.
"""
if copyarray:
arr = WolfArrayModel(mold=arr, nullvalue=arr.nullvalue)
force_idx = True
if force_idx:
arr.idx = getkeyblock(len(self.myblocks))
else:
assert arr.idx is not None and type(arr.idx) == str and arr.idx.strip() != '', \
f"The block index/key is wrong {arr.idx}"
assert arr.idx not in self.myblocks, \
"You can't have the same block twice"
pos = len(self.myblocks)
posidx = decodekeyblock(arr.idx, False)
assert pos == posidx, \
f"The block index/key is wrong {arr.idx}"
self.myblocks[arr.idx] = arr
arr.isblock = True
arr.blockindex = len(self.myblocks) - 1
[docs]
def count(self):
"""Count the number of unmasked (non-null) cells across all blocks.
The result is stored in :attr:`nbnotnull` (total) and in each
block's ``nbnotnull`` attribute.
"""
self.nbnotnull = 0
for i in range(self.nb_blocks):
curblock = self.myblocks[getkeyblock(i)]
curarray = curblock.array
nbnotnull = curarray.count()
curblock.nbnotnull = nbnotnull
self.nbnotnull += nbnotnull
[docs]
def check_consistency(self, other):
"""Check that *other* has the same block structure as *self*.
Two multi-block arrays are consistent if they have the same
number of blocks and each pair of corresponding blocks has
identical spatial headers.
:param other: Another :class:`WolfArrayMBModel` to compare.
:return: ``True`` if the structures match.
:rtype: bool
"""
test = isinstance(self, type(other))
if test:
for curblock, curblockother in zip(self.myblocks.values(),
other.myblocks.values()):
curblock: WolfArrayModel
curblockother: WolfArrayModel
test &= curblock.get_header().is_like(
curblockother.get_header())
return test
# ========================================================================
# Memory Allocation, Header & Initialization
# ========================================================================
[docs]
def allocate_ressources(self):
"""Allocate memory for every block defined in :attr:`head_blocks`.
Creates :class:`WolfArrayModel` instances for each block header
if :attr:`myblocks` is empty. Does nothing if blocks are already
populated.
"""
if self.myblocks is None:
logging.warning("No blocks to allocate")
else:
if len(self.myblocks) == 0:
for id, (key, curhead) in enumerate(self.head_blocks.items()):
if self.wolftype == WOLF_ARRAY_MB_SINGLE:
self.myblocks[key] = WolfArrayModel(
srcheader=curhead,
whichtype=WOLF_ARRAY_FULL_SINGLE)
elif self.wolftype == WOLF_ARRAY_MB_INTEGER:
self.myblocks[key] = WolfArrayModel(
srcheader=curhead,
whichtype=WOLF_ARRAY_FULL_INTEGER)
self.myblocks[key].isblock = True
self.myblocks[key].blockindex = id
self.myblocks[key].idx = key
[docs]
def reset(self):
"""Reset every block's data to its null value."""
for i in range(self.nb_blocks):
self[i].reset()
# ========================================================================
# File I/O
# ========================================================================
[docs]
def read_data(self):
"""Read binary block data from :attr:`filename`.
One :class:`WolfArrayModel` is created per block header and its
binary payload is read sequentially from the file.
"""
with open(self.filename, 'rb') as f:
for i in range(self.nb_blocks):
if self.wolftype == WOLF_ARRAY_MB_SINGLE:
curblock = WolfArrayModel(
whichtype=WOLF_ARRAY_FULL_SINGLE,
srcheader=self.head_blocks[getkeyblock(i)])
elif self.wolftype == WOLF_ARRAY_MB_INTEGER:
curblock = WolfArrayModel(
whichtype=WOLF_ARRAY_FULL_INTEGER)
curblock.isblock = True
curblock.blockindex = i
curblock.idx = getkeyblock(i)
curblock._read_binary_data(f)
self.myblocks[getkeyblock(i)] = curblock
[docs]
def write_array(self):
"""Write the raw binary data of every block to :attr:`filename`.
Blocks are written sequentially in index order.
"""
with open(self.filename, 'wb') as f:
for i in range(self.nb_blocks):
curarray = self.myblocks[getkeyblock(i)]
f.write(curarray.array.data.transpose().tobytes())
# ========================================================================
# Coordinate, Bounds & Grid Utilities
# ========================================================================
[docs]
def check_bounds_ij(self, i: int, j: int, which_block: int = 1):
"""Check whether indices *(i, j)* fall inside the global bounds.
Converts indices to world coordinates via :meth:`get_xy_from_ij`
and delegates to :meth:`check_bounds_xy`.
:param i: Column index (0-based).
:param j: Row index (0-based).
:param which_block: 1-based block index used for the coordinate
conversion.
:return: ``True`` if the resulting coordinates are inside the
global bounding box.
:rtype: bool
"""
x, y = self.get_xy_from_ij(i, j, which_block=which_block)
return self.check_bounds_xy(x, y)
[docs]
def check_bounds_xy(self, x: float, y: float):
"""Check whether world coordinates *(x, y)* are inside the global
bounding box.
:param x: X world coordinate.
:param y: Y world coordinate.
:return: ``True`` if inside bounds.
:rtype: bool
"""
bounds = self.get_bounds()
return (x >= bounds[0][0] and x <= bounds[0][1]
and y >= bounds[1][0] and y <= bounds[1][1])
[docs]
def get_ij_from_xy(self, x: float, y: float, z: float = 0.,
scale: float = 1., aswolf: bool = False,
abs: bool = True, which_block: int = 1):
"""Return cell indices *(i, j)* for the given world coordinates.
Delegates to the single-block :meth:`WolfArrayModel.get_ij_from_xy`
of block *which_block*.
:param x: X world coordinate.
:param y: Y world coordinate.
:param z: Z coordinate (unused, kept for compatibility).
:param scale: Scale factor applied before conversion.
:param aswolf: If ``True``, return 1-based (Fortran) indices.
:param abs: If ``True``, account for translation.
:param which_block: 1-based block index.
:return: Tuple *(i, j)*.
"""
return self.myblocks[getkeyblock(which_block, False)].get_ij_from_xy(
x, y, z, scale, aswolf, abs)
[docs]
def get_xy_from_ij(self, i: int, j: int, which_block: int = 1,
aswolf: bool = False, abs: bool = True):
"""Return world coordinates *(x, y)* for cell indices *(i, j)*.
Delegates to the single-block :meth:`WolfArrayModel.get_xy_from_ij`
of block *which_block*.
:param i: Column index.
:param j: Row index.
:param which_block: 1-based block index.
:param aswolf: If ``True``, indices are 1-based (Fortran).
:param abs: If ``True``, account for translation.
:return: Tuple *(x, y)*.
"""
if which_block == 0:
logging.warning(
"Block index is probably 0-based. It should be 1-based.")
return
k = getkeyblock(which_block, False)
assert k in self.myblocks, \
f"The block '{k}' you ask for doesn't exist."
x, y = self.myblocks[k].get_xy_from_ij(i, j, aswolf=aswolf, abs=abs)
return x, y
[docs]
def get_value(self, x: float, y: float, abs: bool = True,
nullvalue: float = -99999,
convert_to_float: bool = True):
"""Read the value at world coordinate *(x, y)*.
Iterates through all blocks and returns the first unmasked value
found. If no block covers the coordinate, *nullvalue* is returned.
:param x: X world coordinate.
:param y: Y world coordinate.
:param abs: If ``True``, account for translation.
:param nullvalue: Value returned when no block covers *(x, y)*.
:param convert_to_float: If ``True``, cast the result to Python
``float`` before returning.
:return: Cell value, or *nullvalue* if not found.
"""
h = nullvalue
for curblock in self.myblocks.values():
curblock: WolfArrayModel
nbx = curblock.nbx
nby = curblock.nby
i, j = curblock.get_ij_from_xy(x, y, abs=abs)
if (i >= 0 and i < nbx and j >= 0 and j < nby):
if curblock.array.mask[i, j]:
h = nullvalue
else:
h = curblock.array[i, j]
break
if convert_to_float:
return float(h)
else:
return h
[docs]
def get_values_as_wolf(self, i: int, j: int, which_block: int = 1):
"""Return the value at 1-based Fortran indices *(i, j)*.
:param i: 1-based column index.
:param j: 1-based row index.
:param which_block: 1-based block index.
:return: Cell value or ``np.nan`` if out of bounds.
"""
h = np.nan
if which_block == 0:
logging.warning(
"Block index is probably 0-based. It should be 1-based.")
return h
keyblock = getkeyblock(which_block, False)
curblock = self.myblocks[keyblock]
nbx = curblock.nbx
nby = curblock.nby
if (i > 0 and i <= nbx and j > 0 and j <= nby):
h = curblock.array[i - 1, j - 1]
return h
[docs]
def get_blockij_from_xy(self, x: float, y: float, abs: bool = True):
"""Find which block covers the world coordinate *(x, y)*.
:param x: X world coordinate.
:param y: Y world coordinate.
:param abs: If ``True``, account for translation.
:return: Tuple *(i, j, k)* where *i* and *j* are the 0-based cell
indices within block *k* (1-based block index).
Returns ``(-1, -1, -1)`` if no block covers the coordinate.
"""
exists = False
k = 1
for curblock in self.myblocks.values():
curblock: WolfArrayModel
nbx = curblock.nbx
nby = curblock.nby
i, j = curblock.get_ij_from_xy(x, y, abs=abs)
if (i >= 0 and i < nbx and j >= 0 and j < nby):
if not curblock.array.mask[i, j]:
exists = True
break
k += 1
if exists:
return i, j, k
else:
return -1, -1, -1
# ========================================================================
# Operator Overloading
# ========================================================================
[docs]
def _make_result_array(self):
"""Create an empty result array of the same type.
Subclasses override to return their own type (e.g.
:class:`WolfArrayMB`).
:return: A new :class:`WolfArrayMBModel` with the same header.
"""
newArray = WolfArrayMBModel()
newArray.set_header(self.get_header())
return newArray
[docs]
def _make_block_op(self, op, curblock, other_block):
"""Apply a binary operator between two blocks (or a block and a scalar).
:param op: Operator string: ``'+'``, ``'-'``, ``'*'``, ``'/'``,
or ``'**'``.
:param curblock: Left operand (:class:`WolfArrayModel`).
:param other_block: Right operand (:class:`WolfArrayModel` or scalar).
:return: A new :class:`WolfArrayModel` with the result.
"""
if op == '+':
return curblock + other_block
elif op == '-':
return curblock - other_block
elif op == '*':
return curblock * other_block
elif op == '/':
return curblock / other_block
elif op == '**':
return curblock ** other_block
[docs]
def _apply_op(self, other, op: str):
"""Generic element-wise arithmetic on multi-block arrays.
Handles both *WolfArrayMBModel + WolfArrayMBModel* and
*WolfArrayMBModel + scalar* cases.
:param other: Another :class:`WolfArrayMBModel` (same structure) or
a ``float`` scalar.
:param op: Operator string (``'+'``, ``'-'``, etc.).
:return: A new multi-block array with the result, or ``None`` on
inconsistency.
"""
newArray = self._make_result_array()
if isinstance(self, type(other)):
if self.check_consistency(other):
i = 0
for curblock, curblockother in zip(
self.myblocks.values(), other.myblocks.values()):
newblock = self._make_block_op(op, curblock, curblockother)
newblock.isblock = True
newblock.blockindex = i
newArray.myblocks[getkeyblock(i)] = newblock
i += 1
else:
return None
elif isinstance(other, float):
if other != 0.:
i = 0
for curblock in self.myblocks.values():
newblock = self._make_block_op(op, curblock, other)
newblock.isblock = True
newblock.blockindex = i
newArray.myblocks[getkeyblock(i)] = newblock
i += 1
else:
return self
return newArray
def __add__(self, other):
"""Element-wise addition (``self + other``)."""
return self._apply_op(other, '+')
def __mul__(self, other):
"""Element-wise multiplication (``self * other``)."""
return self._apply_op(other, '*')
def __sub__(self, other):
"""Element-wise subtraction (``self - other``)."""
return self._apply_op(other, '-')
def __pow__(self, other):
"""Element-wise power (``self ** other``)."""
return self._apply_op(other, '**')
def __truediv__(self, other):
"""Element-wise division (``self / other``)."""
return self._apply_op(other, '/')
# ========================================================================
# Masking & Null Value Operations
# ========================================================================
[docs]
def mask_data(self, value):
"""Mask cells whose data equals *value* in every block.
For integer array types the value is cast to ``int`` first.
Handles ``NaN`` comparisons correctly.
:param value: The value to mask. ``None`` is a no-op.
"""
if self.wolftype in [WOLF_ARRAY_FULL_INTEGER,
WOLF_ARRAY_FULL_INTEGER16,
WOLF_ARRAY_FULL_INTEGER16_2]:
value = int(value)
if value is not None:
for curblock in self.myblocks.values():
curarray = curblock.array
if isinstance(curarray.mask, np.bool_):
# Mask is a single boolean – replace with a full array
if np.isnan(value) or math.isnan(value):
curarray.mask = np.isnan(curarray.data)
else:
curarray.mask = curarray.data == value
else:
# Copy to prevent unlinking the mask (see `mask_reset`)
if np.isnan(value) or math.isnan(value):
np.copyto(curarray.mask, np.isnan(curarray.data))
else:
np.copyto(curarray.mask, curarray.data == value)
self.count()
[docs]
def mask_reset(self):
"""Reset the mask of every block to ``False`` (all cells unmasked)."""
for i in range(self.nb_blocks):
self[i].mask_reset()
self.count()
[docs]
def mask_lower(self, value):
"""Mask cells whose value is strictly less than *value*.
:param value: Threshold value.
"""
for i in range(self.nb_blocks):
self[i].mask_lower(value)
self.count()
[docs]
def mask_lowerequal(self, value):
"""Mask cells whose value is less than or equal to *value*.
:param value: Threshold value.
"""
for i in range(self.nb_blocks):
self[i].mask_lowerequal(value)
self.count()
[docs]
def mask_greater(self, value):
"""Mask cells whose value is strictly greater than *value*.
:param value: Threshold value.
"""
for i in range(self.nb_blocks):
self[i].mask_greater(value)
self.count()
[docs]
def mask_greaterequal(self, value):
"""Mask cells whose value is greater than or equal to *value*.
:param value: Threshold value.
"""
for i in range(self.nb_blocks):
self[i].mask_greaterequal(value)
self.count()
[docs]
def mask_union(self, source: "WolfArrayMBModel"):
"""Compute the mask union (logical AND) with another multi-block array.
The intersection of masked regions is applied block by block.
:param source: Another :class:`WolfArrayMBModel` with the same
block structure.
"""
if isinstance(self, type(source)):
if self.check_consistency(source):
i = 0
for curblock, curblockother in zip(
self.myblocks.values(), source.myblocks.values()):
curblock.mask_union(curblockother)
i += 1
self.reset_plot()
[docs]
def copy_mask(self, source: "WolfArrayMBModel",
forcenullvalue: bool = False):
"""Copy the mask from *source* to *self*, block by block.
:param source: Source :class:`WolfArrayMBModel` whose mask is copied.
:param forcenullvalue: If ``True``, also set masked cells to the
null value.
"""
if isinstance(self, type(source)):
if self.check_consistency(source):
i = 0
for curblock, curblockother in zip(
self.myblocks.values(), source.myblocks.values()):
curblock.copy_mask(curblockother, forcenullvalue)
i += 1
self.reset_plot()
else:
logging.warning(
_('Copy mask not supported between different types of arrays'))
# ========================================================================
# Interpolation & Rasterization
# ========================================================================
[docs]
def interpolate_on_polygon(self, working_vector: vector,
method: Literal["nearest", "linear",
"cubic"] = "linear",
keep: Literal['all', 'below',
'above'] = 'all'):
"""Interpolate elevation values under a polygon for every block.
For each block, cells inside the polygon (or the current selection
if one exists) are filled with values interpolated from the 3-D
vertices of *working_vector* using :func:`scipy.interpolate.griddata`.
:param working_vector: Polygon whose 3-D vertices provide the
interpolation source.
:param method: Interpolation method (``'nearest'``, ``'linear'``,
or ``'cubic'``).
:param keep: Which interpolated values to keep: ``'all'``,
``'below'`` (only values lower than existing), or ``'above'``.
"""
for curblock in self.myblocks.values():
curblock.interpolate_on_polygon(working_vector, method, keep)
[docs]
def interpolate_on_polygons(self, working_zone: zone,
method: Literal["nearest", "linear",
"cubic"] = "linear",
keep: Literal['all', 'below',
'above'] = 'all'):
"""Interpolate elevation values under every polygon in a zone.
Calls :meth:`interpolate_on_polygon` for each vector in
*working_zone*.
:param working_zone: Zone containing one or more polygon vectors.
:param method: Interpolation method.
:param keep: Which interpolated values to keep.
"""
for curvector in working_zone.myvectors:
self.interpolate_on_polygon(curvector, method, keep)
[docs]
def interpolate_on_polyline(self, working_vector: vector,
usemask=True):
"""Interpolate elevation values under a polyline for every block.
Cells under the polyline are filled with values interpolated along
the 3-D vertices of *working_vector* using Shapely's
``line.interpolate()``.
:param working_vector: Polyline whose 3-D vertices provide the
interpolation source.
:param usemask: If ``True``, only consider unmasked cells.
"""
for curblock in self.myblocks.values():
curblock.interpolate_on_polyline(working_vector, usemask)
[docs]
def interpolate_on_polylines(self, working_zone: zone, usemask=True):
"""Interpolate elevation values under every polyline in a zone.
Calls :meth:`interpolate_on_polyline` for each vector in
*working_zone*.
:param working_zone: Zone containing one or more polyline vectors.
:param usemask: If ``True``, only consider unmasked cells.
"""
for curvec in working_zone.myvectors:
self.interpolate_on_polyline(curvec, usemask)
# ========================================================================
# Filtering & Labelling
# ========================================================================
[docs]
def filter_zone(self, set_null: bool = False):
"""Filter zones, keeping only those that contain selected cells.
Applied independently to every block.
:param set_null: If ``True``, set filtered-out cells to the null
value instead of just masking them.
"""
for curblock in self.myblocks.values():
curblock.filter_zone(set_null, reset_plot=False)
self.reset_plot()
[docs]
def labelling(self):
"""Label connected components in every block using SciPy.
Each block is labelled independently via
:func:`scipy.ndimage.label`.
"""
for curblock in self.myblocks.values():
curblock.labelling(reset_plot=False)
self.reset_plot()
# ========================================================================
# Conversion & Export
# ========================================================================
[docs]
def as_WolfArray(self, abs: bool = True,
forced_header: header_wolf = None) -> WolfArrayModel:
"""Convert to a single-block :class:`WolfArrayModel`.
When blocks have different resolutions, the finer blocks are
preserved and coarser blocks are rebinned to match the finest
resolution.
:param abs: If ``True``, use absolute coordinates (translation
included).
:param forced_header: If provided, override the target header
instead of computing it from blocks.
:return: A new single-block :class:`WolfArrayModel`.
"""
newArray = WolfArrayModel()
if forced_header is None:
myhead = self.get_header(abs=abs)
else:
myhead = forced_header
myhead.wolftype = self.wolftype
dx = set([curblock.get_header().dx
for curblock in iter(self.myblocks.values())])
dy = set([curblock.get_header().dy
for curblock in iter(self.myblocks.values())])
if len(dx) == 1 and len(dy) == 1:
# ---- Uniform resolution ----
newArray.dx = list(dx)[0]
newArray.dy = list(dy)[0]
newArray.origx = myhead.origx
newArray.origy = myhead.origy
newArray.nbx = int((myhead.nbx * myhead.dx) // newArray.dx)
newArray.nby = int((myhead.nby * myhead.dy) // newArray.dy)
newArray.translx = myhead.translx
newArray.transly = myhead.transly
newArray.wolftype = (WOLF_ARRAY_FULL_SINGLE
if myhead.wolftype == WOLF_ARRAY_MB_SINGLE
else WOLF_ARRAY_FULL_INTEGER)
newArray.allocate_ressources()
newArray.array[:, :] = 0
for curblock in self.myblocks.values():
ij = np.where(~curblock.array.mask)
if len(ij[0]) > 0:
i = ij[0]
j = ij[1]
x0, y0 = curblock.get_xy_from_ij(0, 0, abs=True)
i0, j0 = newArray.get_ij_from_xy(x0, y0, abs=True)
i_dest = i + i0
j_dest = j + j0
newArray.array[i_dest, j_dest] = curblock.array[i, j]
newArray.array.mask[i_dest, j_dest] = False
else:
logging.debug(
f"Block {curblock.idx} is empty or totally masked.")
else:
# ---- Multiple resolutions – rebin to finest ----
dx = sorted(list(dx))
dy = sorted(list(dy))
newArray.dx = dx[0]
newArray.dy = dy[0]
newArray.origx = myhead.origx
newArray.origy = myhead.origy
newArray.nbx = int((myhead.nbx * myhead.dx) // newArray.dx)
newArray.nby = int((myhead.nby * myhead.dy) // newArray.dy)
newArray.translx = myhead.translx
newArray.transly = myhead.transly
newArray.wolftype = (WOLF_ARRAY_FULL_SINGLE
if myhead.wolftype == WOLF_ARRAY_MB_SINGLE
else WOLF_ARRAY_FULL_INTEGER)
newArray.allocate_ressources()
newArray.array[:, :] = 0
for curblock in self.myblocks.values():
if curblock.dx == dx[0] and curblock.dy == dy[0]:
blockArray = curblock
else:
factor = dx[0] / curblock.dx
blockArray = WolfArrayModel(mold=curblock)
blockArray.rebin(factor)
ij = np.where(~blockArray.array.mask)
if len(ij[0]) > 0:
i = ij[0]
j = ij[1]
x0, y0 = blockArray.get_xy_from_ij(0, 0, abs=True)
i0, j0 = newArray.get_ij_from_xy(x0, y0, abs=True)
i_dest = i + i0
j_dest = j + j0
newArray.array[i_dest, j_dest] = blockArray.array[i, j]
newArray.array.mask[i_dest, j_dest] = False
else:
logging.debug(
f"Block {curblock.idx} is empty or totally masked.")
return newArray