"""
Author: HECE - University of Liege, Pierre Archambeau
Date: 2024-2026
Copyright (c) 2024 University of Liege. All rights reserved.
This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""
try:
    from osgeo import gdal, osr
    gdal.UseExceptions()
except ImportError as e:
    print(e)
    # BUGFIX: the gettext alias "_" is imported only later (from ..PyTranslate),
    # so calling _() here raised a NameError that masked the real import error.
    # Use a plain string and chain the original exception instead.
    raise Exception('Error importing GDAL library') from e
import os
import sys
import warnings
import numpy as np
import numpy.ma as ma
from typing import Union, Literal, TYPE_CHECKING
from matplotlib.axis import Axis
from matplotlib.figure import Figure
import matplotlib.pyplot as plt
from matplotlib.colors import Colormap
import logging
import json
import tempfile
from pathlib import Path
from tqdm import tqdm
import matplotlib.pyplot as plt
import matplotlib.path as mpltPath
from scipy.interpolate import interp2d, griddata
from scipy.ndimage import laplace, label, sum_labels
from shapely.geometry import Point, LineString, MultiLineString, Polygon, MultiPolygon, MultiPoint
from shapely.ops import linemerge, substring, polygonize_full
from shapely import contains, contains_properly, contains_xy, touches, prepare, destroy_prepared, is_prepared
from os.path import dirname,basename,join
import logging
from typing import Literal
from enum import Enum
try:
    from ..PyTranslate import _
except ImportError as e:
    print(e)
    # BUGFIX: if this very import failed, "_" is undefined, so the original
    # raise Exception(_('...')) raised a NameError instead of the intended
    # message. Use a plain string and chain the original exception.
    raise Exception('Error importing modules') from e
try:
from ..Coordinates_operations import reproject_and_resample_raster
except ImportError as e:
print(e)
raise Exception(_('Error importing modules'))
try:
from ..xyz_file import XYZFile
from ..PyPalette import wolfpalette
from ..PyVertexvectors import Zones, vector, wolfvertex, zone, Triangulation
from ..PyVertex import cloud_vertices
except ImportError as e:
print(e)
raise Exception(_('Error importing modules'))
try:
from ..pydownloader import download_file, toys_dataset
except ImportError as e:
raise Exception(_('Error importing pydownloader module'))
# try:
# from wolfhece_tools.wolf_array import header_wolf
# except ImportError as e:
# logging.error(e)
# raise Exception(_('Error importing modules wolfhece_tools - please install it via pip install wolfhece_tools'))
import math
from fractions import Fraction
[docs]
def pgcd_decimal(a, b):
    """Return the greatest common divisor of two decimal numbers.

    Both values are converted to exact fractions, brought to a common
    denominator, and the integer GCD of the scaled numerators is then
    rescaled back to the decimal domain.

    :param a: first decimal value
    :param b: second decimal value
    :return: GCD of ``a`` and ``b`` as a float
    """
    # Exact rational representation of each decimal input
    frac_a = Fraction(a).limit_denominator()
    frac_b = Fraction(b).limit_denominator()

    # Bring both fractions to the same (smallest common) denominator
    common_den = math.lcm(frac_a.denominator, frac_b.denominator)
    int_a = frac_a.numerator * (common_den // frac_a.denominator)
    int_b = frac_b.numerator * (common_den // frac_b.denominator)

    # Integer GCD, rescaled back to the decimal domain
    return math.gcd(int_a, int_b) / common_den
# Re-exported from wolfhece.header_wolf for backward compatibility
from ._header_wolf import (
header_wolf,
getkeyblock,
decodekeyblock,
WOLF_ARRAY_HILLSHAPE,
WOLF_ARRAY_FULL_SINGLE,
WOLF_ARRAY_FULL_DOUBLE,
WOLF_ARRAY_SYM_DOUBLE,
WOLF_ARRAY_FULL_LOGICAL,
WOLF_ARRAY_CSR_DOUBLE,
WOLF_ARRAY_FULL_INTEGER,
WOLF_ARRAY_FULL_SINGLE_3D,
WOLF_ARRAY_FULL_INTEGER8,
WOLF_ARRAY_FULL_UINTEGER8,
WOLF_ARRAY_MB_SINGLE,
WOLF_ARRAY_MB_INTEGER,
WOLF_ARRAY_FULL_INTEGER16_2,
WOLF_ARRAY_FULL_INTEGER16,
WOLF_ARRAY_MNAP_INTEGER,
WOLF_ARRAY_MB,
WOLF_ARRAY_FULL_UINTEGER16,
WOLF_ARRAY_FULL_UINTEGER32
)
[docs]
class OGLRenderer(Enum):
    # NOTE(review): the enum members (e.g. SHADER, referenced below by
    # DEFAULT_OGLRENDERER) are not visible in this extract -- they appear to
    # have been lost during documentation extraction; confirm against the
    # original source before relying on this class.

    @classmethod
    def get(cls, value: int):
        """ Return the OGLRenderer value corresponding to an integer value.

        Returns None if no member matches (loop falls through).
        """
        for e in cls:
            if e.value == value:
                return e


# Default rendering backend used by WolfArrayModel instances
DEFAULT_OGLRENDERER = OGLRenderer.SHADER
from numba import jit
@jit(nopython=True)
def custom_gradient(array: np.ndarray):
    """ Calculate the gradient manually.

    Central differences on interior cells; border cells keep a zero gradient.

    :param array: 2D array
    :return: (grad_x, grad_y) arrays with the same shape/dtype as ``array``
    """
    nrows = array.shape[0]
    ncols = array.shape[1]

    grad_x = np.zeros_like(array)
    grad_y = np.zeros_like(array)

    # Explicit loops are kept on purpose: they compile efficiently under numba
    for row in range(1, nrows - 1):
        for col in range(1, ncols - 1):
            grad_x[row, col] = (array[row + 1, col] - array[row - 1, col]) * 0.5
            grad_y[row, col] = (array[row, col + 1] - array[row, col - 1]) * 0.5

    return grad_x, grad_y
@jit(nopython=True)
def hillshade(array: np.ndarray, azimuth: float, angle_altitude: float) -> np.ndarray:
    """ Create a hillshade array.

    :param array: 2D elevation array
    :param azimuth: sun azimuth in degrees
    :param angle_altitude: sun altitude in degrees
    :return: shaded relief in [0, 1], as float32
    """
    azimuth = 360.0 - azimuth

    grad_x, grad_y = custom_gradient(array)

    # Slope and aspect derived from the manual gradient
    slope = np.pi / 2. - np.arctan(np.sqrt(grad_x * grad_x + grad_y * grad_y))
    aspect = np.arctan2(-grad_x, grad_y)

    # Sun position in radians
    az_rad = azimuth * np.pi / 180.
    alt_rad = angle_altitude * np.pi / 180.

    shaded = np.sin(alt_rad) * np.sin(slope) + \
             np.cos(alt_rad) * np.cos(slope) * np.cos((az_rad - np.pi / 2.) - aspect)

    # Rescale from [-1, 1] to [0, 1]
    shaded = (shaded + 1.) * .5

    return shaded.astype(np.float32)
# Re-exported from wolfhece.ui.wolf_array_ui for backward compatibility
# Lazy-loaded in methods that need them to avoid wx dependency at module level
# Direct imports available from wolfhece.wolf_array.__init__
[docs]
class Rebin_Ops(Enum):
    """Enum for rebin/downsampling operations (no wx dependency)."""
    # NOTE(review): the enum members themselves are not visible in this
    # extract (get_numpy_ops is indexed by member value) -- presumably
    # min/mean/max/sum/median with values 0..4; confirm against the source.

    @classmethod
    def get_numpy_ops(cls):
        """ Return a list of numpy functions corresponding to the enum values """
        # CAUTION : Order is important and must match the enum values
        return [np.ma.min, np.ma.mean, np.ma.max, np.ma.sum, np.ma.median]

    @classmethod
    def get_ops(cls, name: str):
        """ Return the numpy function corresponding to a string.

        Also accepts a Rebin_Ops member directly (resolved via its value).
        Returns None for unknown names or unsupported argument types.
        """
        if isinstance(name, Rebin_Ops):
            return cls.get_numpy_ops()[name.value]
        elif isinstance(name, str):
            if name == 'min':
                return np.ma.min
            elif name == 'mean':
                return np.ma.mean
            elif name == 'max':
                return np.ma.max
            elif name == 'sum':
                return np.ma.sum
            elif name == 'median':
                return np.ma.median
            else:
                return None
        else:
            return None
# Re-exported from wolfhece.selection_data for backward compatibility
from ._selection_data import (
ALL_SELECTED,
StorageMode,
SelectionData,
SelectionDataMB,
)
if TYPE_CHECKING:
from ..ui.wolf_array_ops import Ops_Array
from ..opengl.py3d import WolfArrayPlotShader
from ..opengl.wolf_array_shader2d import WolfArrayShader2D
[docs]
class WolfArrayModel(header_wolf):
"""
Data-only WolfArrayModel class (no GUI/OpenGL dependency at class level).
For GUI support (OpenGL rendering, wx dialogs), use WolfArray from _base_gui.py.
simple précision, double précision, entier...
"""
[docs]
_cache_grid: dict # For OpenGL
[docs]
linkedvec: vector # used in some operations
[docs]
linkedarrays: list["WolfArrayModel"] # used in some operations
# Origin and translation of the coordinate-system
# in which the array-data coordinates are expressed.
# ========================================================================
# Constructor, Destructor & Serialization
# ========================================================================
    def __init__(self,
                 fname:str = None,
                 mold:"WolfArrayModel" = None,
                 masknull:bool = True,
                 crop:list[list[float],list[float]]=None,
                 whichtype = WOLF_ARRAY_FULL_SINGLE,
                 preload:bool = True,
                 nullvalue:float = 0.,
                 srcheader:header_wolf = None,
                 idx:str = '',
                 mask_source:np.ndarray = None,
                 np_source:np.ndarray = None,
                 ) -> None:
        """
        Constructor of the WolfArrayModel class

        :param fname: filename/filepath - if provided, the file will be read on disk
        :param mold: initialize from a copy of the mold object --> must be a WolfArrayModel if not None - Does not impose the type of the array. The source array will be converted if necessary. Change "whichtype" to impose a type.
        :param masknull: mask data based on the nullvalue
        :param crop: crop data based on the spatial extent [[xmin, xmax], [ymin,ymax]]
        :param whichtype: type of the numpy array (float32 as default)
        :param preload: True = load data during initialization ; False = waits for the display to be required
        :param nullvalue: null value used to mask data
        :param srcheader: initialize dimension from header_wolf instance
        :param idx: identity --> required by the mapviewer
        :param mask_source: mask to link to the data
        :param np_source: numpy array to link to the data
        """

        header_wolf.__init__(self)

        # NOTE(review): the 'crop' and 'preload' parameters are not assigned
        # here, while 'self.cropini' and 'self.preload' are read below --
        # presumably handled by header_wolf.__init__; confirm against header_wolf.

        # Defaults for attributes typically set by Element_To_Draw.
        # In WolfArray (GUI subclass), Element_To_Draw.__init__ is called first,
        # so these attributes already exist and the hasattr checks skip them.
        if not hasattr(self, 'mapviewer'):
            self.mapviewer = None
        if not hasattr(self, 'wx_exists'):
            self.wx_exists = False
        if not hasattr(self, 'idx'):
            self.idx = idx
        if not hasattr(self, 'plotted'):
            self.plotted = False
        if not hasattr(self, 'plotting'):
            self.plotting = False

        self.myops = None  # GUI operations panel; stays None in data-only mode

        self._rendering_machine = DEFAULT_OGLRENDERER  # OpenGL rendering backend selector

        self.mngselection = None   # selection manager -- instantiated at the end of __init__
        self._active_blocks = None # current block selection for multi-block arrays

        self.array:ma.masked_array = None # numpy masked array to stored numerical data

        self.linkedvec = None   # vector used in some operations
        self.linkedarrays = []  # other arrays used in some operations

        self.wolftype = whichtype  # WOLF_ARRAY_* storage type
        self.masknull = masknull

        # NOTE(review): VERSION_RGB is not defined in this extract --
        # presumably a module-level constant selecting the colorization
        # pipeline; confirm.
        if VERSION_RGB==1 : self.rgb = None # numpy array with colorize values

        self.alpha = 1. # transparency alpha value
        self.shading = False # if True, rgb will be shaded - Not available in SHADER mode
        self.azimuthhill = 315. # sun position - azimuth
        self.altitudehill = 0. # sun position - altitude

        self._nullvalue = nullvalue
        self.nbnotnull = 99999 # number of non-null values in the entire array
        self.nbnotnullzoom = 99999 # number of non-null values in the current visible part in mapviewer

        self._gridsize = 256 # virtual grid for plotting operations
        self._lod_max_whole_array = -1 # maximum scale used

        # colormap
        self.mypal = wolfpalette(None, "Palette of colors")
        self.mypal.default16()
        self.mypal.automatic = True

        # OpenGL display cache and its refresh throttle
        self._cache_grid = {}
        self._cache_grid_last_call = None
        self._cache_grid_frequency_useconds = 1000

        self._shader_2d: "WolfArrayShader2D | None" = None
        self.viewers3d:list["WolfArrayPlotShader"] = []

        # --- Initialization from an existing header -------------------------
        if isinstance(srcheader, header_wolf):
            header=srcheader
            self.origx = header.origx
            self.origy = header.origy
            self.origz = header.origz

            self.translx = header.translx
            self.transly = header.transly
            self.translz = header.translz

            self.dx = header.dx
            self.dy = header.dy
            self.dz = header.dz

            self.nbx = int(header.nbx)
            self.nby = int(header.nby)
            self.nbz = int(header.nbz)

            self.head_blocks = header.head_blocks.copy()

            if self.nb_blocks>0:
                self.myblocks = {}

            if np_source is None:
                # Allocate a fresh array matching the header dimensions
                self.allocate_ressources()
            else:
                # Wrap the provided numpy array (no copy), converting dtype if needed
                assert np_source.shape == (self.nbx, self.nby), _('Shape of np_source is not compatible with header')

                if self.dtype != np_source.dtype:
                    logging.warning(_(f"dtype of np_source is not compatible with header -- Conversion will be done to match wolf header's type {self.dtype}"))
                    np_source = np_source.astype(self.dtype)

                self.array = ma.MaskedArray(np_source, mask= np_source[:,:] == self.nullvalue if self.masknull else np.zeros(np_source.shape, dtype=bool), copy=False, order='C')

            # # FIXME Why not initialize with nullvalue ?
            # self.array = ma.MaskedArray(np.ones((self.nbx, self.nby), order='F', dtype=self.dtype))

        # --- Initialization from a file (local path or URL) -----------------
        if fname is not None:
            # check if fname is an url
            if str(fname).startswith('http:') or str(fname).startswith('https:'):
                try:
                    fname = download_file(fname)
                except Exception as e:
                    logging.error(_('Error while downloading file: %s') % e)
                    return

            self.filename = str(fname)
            logging.debug(_('Loading file : %s') % self.filename)
            self.read_all()

            if mask_source is not None:
                logging.debug(_('Applying mask from source'))
                self.copy_mask_log(mask_source)
                logging.debug(_('Data masked'))
            elif masknull and (self.preload or self.loaded):
                logging.debug(_('Masking data with nullvalue'))
                self.mask_data(self.nullvalue)
                logging.debug(_('Data masked'))

        # --- Initialization by copying a "mold" array -----------------------
        elif mold is not None:
            if self.cropini is None:
                # Full copy of the mold geometry and data
                self.nbx = mold.nbx
                self.nby = mold.nby
                self.nbz = mold.nbz
                self.dx = mold.dx
                self.dy = mold.dy
                self.dz = mold.dz
                self.origx = mold.origx
                self.origy = mold.origy
                self.origz = mold.origz
                self.translx = mold.translx
                self.transly = mold.transly
                self.translz = mold.translz
                self.array = ma.copy(mold.array.astype(self.dtype))
                if idx=='':
                    self.idx = mold.idx
            else:
                # Copy only the cropped window [[xmin,xmax],[ymin,ymax]] of the mold
                imin, jmin = mold.get_ij_from_xy(self.cropini[0][0], self.cropini[1][0])
                imax, jmax = mold.get_ij_from_xy(self.cropini[0][1], self.cropini[1][1])

                imin = int(imin)
                jmin = int(jmin)
                imax = int(imax)
                jmax = int(jmax)

                self.nbx = imax - imin
                self.nby = jmax - jmin
                self.dx = mold.dx
                self.dy = mold.dy
                # Origin is the lower-left corner of cell (imin, jmin)
                self.origx, self.origy = mold.get_xy_from_ij(imin, jmin)
                self.origx -= self.dx / 2.
                self.origy -= self.dy / 2.
                self.translx = mold.translx
                self.transly = mold.transly
                if idx=='':
                    self.idx = mold.idx
                self.array = ma.copy(mold.array[imin:imax, jmin:jmax].astype(self.dtype))

        # Selection manager (without GUI)
        if self.mngselection is None:
            from ._selection_data import SelectionData, SelectionDataMB
            if self.nb_blocks > 0:
                self.mngselection = SelectionDataMB(self)
            else:
                self.mngselection = SelectionData(self)
    def __del__(self):
        """ Class destructor -- release OpenGL lists, array data and colormap.

        Everything is wrapped in a broad try/except because destructors may
        run during interpreter shutdown, when modules and globals can
        already be partially torn down.
        """
        try:
            # Perform cleanup tasks safely
            self.delete_lists()
            if hasattr(self, 'array'):
                del self.array
            # NOTE(review): VERSION_RGB is not defined in this extract --
            # presumably a module-level constant; confirm.
            if VERSION_RGB == 1 and hasattr(self, 'rgb'):
                del self.rgb
            if hasattr(self, '_array3d'):
                del self._array3d
            if hasattr(self, '_array2d'):
                del self._array2d
            if hasattr(self, 'mypal'):
                del self.mypal
            if hasattr(self, 'shaded'):
                del self.shaded

            try:
                # sys.meta_path is set to None during interpreter shutdown;
                # only trigger a GC pass while the import machinery is alive.
                if sys.meta_path is not None:
                    # Perform garbage collection if gc is available
                    import gc
                    gc.collect()
            except Exception:
                # Try/except to avoid issues during interpreter shutdown
                pass
        except Exception as e:
            print(f"Exception in WolfArrayModel destructor: {e} -- Please report this issue")
def __setstate__(self, state):
"""Restore the object state during deserialization (unpickling).
:param state: dictionary of object attributes for deserialization
"""
self.__dict__.update(state)
# ========================================================================
# Properties
# ========================================================================
@property
[docs]
def memory_usage(self):
"""
Return the memory usage of the header
"""
if self.nbz == 0:
size = self.nbx * self.nby
else:
size = self.nbx * self.nby * self.nbz
if self.wolftype == WOLF_ARRAY_FULL_SINGLE:
return size * 4
elif self.wolftype == WOLF_ARRAY_FULL_DOUBLE:
return size * 8
elif self.wolftype == WOLF_ARRAY_FULL_INTEGER:
return size * 4
elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2]:
return size * 2
elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_UINTEGER8]:
return size
else:
return size * 4
@property
[docs]
def memory_usage_mask(self):
"""
Return the memory usage of the mask
"""
if self.nbz == 0:
size = self.nbx * self.nby
else:
size = self.nbx * self.nby * self.nbz
return size * 1
@property
[docs]
def nullvalue(self) -> float:
""" Return the null value """
return self._nullvalue
@nullvalue.setter
def nullvalue(self, value:float):
"""Set the null value
:param value: new null (no-data) value
"""
self._nullvalue = value
@property
[docs]
def nodata(self) -> float:
""" alias for nullvalue """
return self._nullvalue
@nodata.setter
def nodata(self, value:float):
"""alias for nullvalue
:param value: new null (no-data) value (alias for :attr:`nullvalue`)
"""
self._nullvalue = value
@property
[docs]
def SelectionData(self) -> SelectionData:
""" Return the data of the selection """
return self.mngselection
@property
[docs]
def active_blocks(self) -> list["WolfArrayModel"]:
""" Return the active blocks """
if self.nb_blocks>0 and self._active_blocks is not None:
if isinstance(self._active_blocks, list):
return [self.myblocks[cur] for cur in self._active_blocks]
elif self._active_blocks == 0:
return [k for k in self.myblocks.values()]
elif self._active_blocks in self.myblocks:
return [self.myblocks[self._active_blocks]]
else:
return None
else:
return [self]
@active_blocks.setter
def active_blocks(self, value:Union[str, int, list[int]]):
"""
Set the active blocks
:param value: name of the block or index 1-based or list of index 1-based
"""
if isinstance(value, str):
if value in self.myblocks:
self._active_blocks = value
logging.info(_(f'Block found - {value}'))
else:
self._active_blocks = None
logging.info(_('Block not found'))
elif isinstance(value, int):
if value == 0:
self._active_blocks = 0
logging.info(_('All blocks selected'))
else:
value = getkeyblock(value, addone=False)
if value in self.myblocks:
self._active_blocks = value
logging.info(_(f'Block found - {value}'))
else:
self._active_blocks = None
logging.info(_('Block not found'))
elif isinstance(value, list):
if 0 in value:
self._active_blocks = 0
logging.info(_('All blocks selected'))
else:
value = [getkeyblock(cur, addone=False) for cur in value]
value = [cur for cur in value if cur in self.myblocks]
if len(value)>0:
self._active_blocks = value
logging.info(_('List of blocks selected'))
else:
self._active_blocks = None
logging.info(_('No block found'))
else:
logging.error(_('Unknown type for active_blocks'))
@property
[docs]
def dtype(self):
"""
Return the numpy dtype corresponding to the WOLF type
Pay ettention to the difference between :
- LOGICAL : Fortran and VB6
- Bool : Python
In VB6, logical is stored as int16
In Fortran, there are Logical*1, Logical*2, Logical*4, Logical*8
In Python, bool is one byte
In Numpy, np.bool_ is one byte
"""
if self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_SYM_DOUBLE, WOLF_ARRAY_CSR_DOUBLE]:
dtype = np.float64
elif self.wolftype in [WOLF_ARRAY_FULL_SINGLE, WOLF_ARRAY_FULL_SINGLE_3D, WOLF_ARRAY_MB_SINGLE]:
dtype = np.float32
elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_MB_INTEGER, WOLF_ARRAY_MNAP_INTEGER]:
dtype = np.int32
elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2]:
dtype = np.int16
elif self.wolftype == WOLF_ARRAY_FULL_INTEGER8:
dtype = np.int8
elif self.wolftype == WOLF_ARRAY_FULL_UINTEGER8:
dtype = np.uint8
elif self.wolftype == WOLF_ARRAY_FULL_LOGICAL:
dtype = np.int16
elif self.wolftype == WOLF_ARRAY_FULL_UINTEGER16:
dtype = np.uint16
elif self.wolftype == WOLF_ARRAY_FULL_UINTEGER32:
dtype = np.uint32
return dtype
@property
[docs]
def dtype_gdal(self):
""" Return the GDAL dtype corresponding to the WOLF type """
if self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_SYM_DOUBLE, WOLF_ARRAY_CSR_DOUBLE]:
dtype = gdal.GDT_Float64
elif self.wolftype in [WOLF_ARRAY_FULL_SINGLE, WOLF_ARRAY_FULL_SINGLE_3D, WOLF_ARRAY_MB_SINGLE]:
dtype = gdal.GDT_Float32
elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_MB_INTEGER, WOLF_ARRAY_MNAP_INTEGER]:
dtype = gdal.GDT_Int32
elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2]:
dtype = gdal.GDT_Int16
elif self.wolftype == WOLF_ARRAY_FULL_INTEGER8:
dtype = gdal.GDT_Byte
elif self.wolftype == WOLF_ARRAY_FULL_UINTEGER8:
dtype = gdal.GDT_Byte
elif self.wolftype == WOLF_ARRAY_FULL_LOGICAL:
dtype = gdal.GDT_Int16
elif self.wolftype == WOLF_ARRAY_FULL_UINTEGER16:
dtype = gdal.GDT_UInt16
elif self.wolftype == WOLF_ARRAY_FULL_UINTEGER32:
dtype = gdal.GDT_UInt32
return dtype
@property
[docs]
def dtype_str(self):
"""
Return the numpy dtype corresponding to the WOLF type, as a string
Pay ettention to the difference between :
- LOGICAL : Fortran and VB6
- Bool : Python
In VB6, logical is stored as int16
In Fortran, there are Logical*1, Logical*2, Logical*4, Logical*8
In Python, bool is one byte
In Numpy, np.bool_ is one byte
"""
if self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_SYM_DOUBLE, WOLF_ARRAY_CSR_DOUBLE]:
dtype = _('float64 - 8 bytes per values')
elif self.wolftype in [WOLF_ARRAY_FULL_SINGLE, WOLF_ARRAY_FULL_SINGLE_3D, WOLF_ARRAY_MB_SINGLE]:
dtype = _('float32 - 4 bytes per values')
elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_MB_INTEGER, WOLF_ARRAY_MNAP_INTEGER]:
dtype = _('int32 - 4 bytes per values')
elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2]:
dtype = _('int16 - 2 bytes per values')
elif self.wolftype == WOLF_ARRAY_FULL_INTEGER8:
dtype = _('int8 - 1 byte per values')
elif self.wolftype == WOLF_ARRAY_FULL_UINTEGER8:
dtype = _('uint8 - 1 byte per values')
elif self.wolftype == WOLF_ARRAY_FULL_UINTEGER16:
dtype = _('uint16 - 2 bytes per values')
elif self.wolftype == WOLF_ARRAY_FULL_UINTEGER32:
dtype = _('uint32 - 4 bytes per values')
elif self.wolftype == WOLF_ARRAY_FULL_LOGICAL:
dtype = _('int16 - 2 bytes per values')
return dtype
@property
[docs]
def zmin(self):
""" Return the minimum value of the masked array """
return np.ma.min(self.array)
@property
[docs]
def zmax(self):
""" Return the maximum value of the masked array """
return np.ma.max(self.array)
@property
[docs]
def zmin_global(self):
""" Return the minimum value of the array -- all data (masked or not) """
return np.min(self.array.data)
@property
[docs]
def zmax_global(self):
""" Return the maximum value of the array -- all data (masked or not) """
return np.max(self.array.data)
# ========================================================================
# Mapviewer & GUI Hooks
# ========================================================================
[docs]
def get_mapviewer(self):
"""Return the mapviewer"""
return self.mapviewer
[docs]
def set_mapviewer(self, newmapviewer=None):
"""Attach a (new) mapviewer to the object
:param newmapviewer: new mapviewer instance to attach (``None`` to detach)
"""
self.mapviewer = newmapviewer
# --- Hook methods for GUI interaction ---
# These are no-ops in data-only mode; overridden in WolfArray (GUI).
[docs]
def _prompt_crop(self, dx:float, dy:float) -> list | None:
"""Prompt user for crop bounds. Returns [[xmin,xmax],[ymin,ymax]] or None.
In data-only mode, returns None (no interaction possible).
Overridden in WolfArray to show CropDialog.
:param dx: cell size in x direction (used for pre-filling the dialog)
:param dy: cell size in y direction (used for pre-filling the dialog)
"""
logging.error(_('Crop bounds required but no GUI available'))
return None
[docs]
def _prompt_band_selection(self, band_names:list[str]) -> int | None:
"""Prompt user for band selection. Returns 1-based band index or None.
In data-only mode, returns 1 (first band).
Overridden in WolfArray to show SingleChoiceDialog.
:param band_names: list of band name strings to choose from
"""
return 1
[docs]
def _prompt_save_file(self, message:str, wildcard:str) -> str | None:
"""Prompt user for a file path to save. Returns path or None.
In data-only mode, returns None.
Overridden in WolfArray to show FileDialog.
:param message: dialog title / prompt message
:param wildcard: file filter pattern (e.g. ``"TIFF files (*.tif)|*.tif"``)
"""
return None
[docs]
def _show_error_dialog(self, message:str):
"""Show an error dialog to the user.
In data-only mode, just logs.
Overridden in WolfArray to show wx.MessageDialog.
:param message: error message to display or log
"""
logging.error(message)
[docs]
def reset_plot(self, whichpal=0, mimic=True):
"""Reset plot - no-op in data-only mode; overridden in WolfArray (GUI)
:param whichpal: palette index (default: 0)
:param mimic: if ``True``, propagate to linked arrays
"""
self.count()
[docs]
def updatepalette(self, which:int=0, onzoom=[]):
"""Update palette - minimal version without OGL; overridden in WolfArray (GUI)
:param which: palette index (default: 0)
:param onzoom: zoom window ``[xmin, xmax, ymin, ymax]``; empty list for whole array
"""
if self.array is None:
return
if self.mypal.automatic:
if onzoom != []:
self.mypal.isopop(self.get_working_array(onzoom), self.nbnotnullzoom)
else:
self.mypal.isopop(self.get_working_array(), self.nbnotnull)
[docs]
def delete_lists(self):
"""Delete OpenGL lists - no-op in data-only mode; overridden in WolfArray (GUI)"""
pass
[docs]
def find_minmax(self, update=False):
"""Find min/max spatial extent
:param update: if ``True``, recalculate spatial bounds from array dimensions
"""
if update:
[self.xmin, self.xmax], [self.ymin, self.ymax] = self.get_bounds()
# ========================================================================
# Memory Allocation, Header & Initialization
# ========================================================================
[docs]
def _new_like(self) -> "WolfArrayModel":
"""Create a new empty instance of the same concrete type, copying grid geometry but not array data.
Uses ``type(self)(...)`` so that subclasses (e.g. ``WolfArray``)
return an instance of the subclass without needing to override
this method. Override in a subclass only if extra initialisation
is required (e.g. attaching a *mapviewer*).
:return: new instance with the same spatial properties and wolftype
:rtype: same type as ``self``
"""
obj = type(self)(whichtype=self.wolftype)
obj.nbx = self.nbx
obj.nby = self.nby
obj.dx = self.dx
obj.dy = self.dy
obj.origx = self.origx
obj.origy = self.origy
obj.origz = self.origz
obj.translx = self.translx
obj.transly = self.transly
obj.translz = self.translz
if self.nbdims == 3:
obj.nbz = self.nbz
obj.dz = self.dz
return obj
[docs]
def allocate_ressources(self):
""" Allocate the numpy masked array according to the current dtype and dimensions. """
if self.nbdims == 2:
self.array = ma.ones([self.nbx, self.nby], order='F', dtype=self.dtype)
elif self.nbdims == 3:
self.array = ma.ones([self.nbx, self.nby, self.nbz], order='F', dtype=self.dtype)
self.mask_reset()
# *************************************************************************************************************************
# END POSITION and VALUES associated to a vector/polygon/polyline
# *************************************************************************************************************************
[docs]
def reset(self):
""" Reset the array to nullvalue """
if self.nbdims == 2:
self.array[:, :] = self.nullvalue
elif self.nbdims == 3:
self.array[:, :, :] = self.nullvalue
[docs]
def set_array_from_numpy(self, array:np.ndarray, nullvalue:float = None):
"""
Set array from numpy array
:param array: numpy array with shape ``(nbx, nby)``
:param nullvalue: null value to apply and mask; if ``None``, use current :attr:`nullvalue`
"""
if array.shape != (self.nbx, self.nby):
logging.warning(f"Array shape {array.shape} is not compatible with WolfArrayModel shape {self.nbx, self.nby}")
return
def wolftype_from_npz(curarray:np.ndarray):
if curarray.dtype == np.float64:
return WOLF_ARRAY_FULL_DOUBLE
elif curarray.dtype == np.float32:
return WOLF_ARRAY_FULL_SINGLE
elif curarray.dtype == np.int32:
return WOLF_ARRAY_FULL_INTEGER
elif curarray.dtype == np.int8:
return WOLF_ARRAY_FULL_INTEGER8
elif curarray.dtype == np.uint8:
return WOLF_ARRAY_FULL_UINTEGER8
elif curarray.dtype == np.int16:
return WOLF_ARRAY_FULL_INTEGER16
elif curarray.dtype == np.uint16:
return WOLF_ARRAY_FULL_UINTEGER16
elif curarray.dtype == np.uint32:
return WOLF_ARRAY_FULL_UINTEGER32
self.array = np.ma.array(array.copy())
self.wolftype = wolftype_from_npz(array)
if nullvalue is not None:
self.nullvalue = nullvalue
self.mask_data(self.nullvalue)
self.reset_plot()
# ========================================================================
# File I/O
# ========================================================================
[docs]
    def read_all(self, which_band = None):
        """Read the array from disk, auto-detecting the file format (WOLF binary, GeoTIFF, VRT, NumPy).

        :param which_band: band index (1-based) for multi-band rasters (GeoTIFF, VRT)
        """
        # Above this cell count, preloading and automatic palette updates
        # become too expensive
        THRESHOLD = 100_000_000

        if not os.path.exists(self.filename):
            self._show_error_dialog(_('No data file : ')+self.filename)
            return

        def check_threshold(nbx, nby, THRESHOLD) -> bool:
            """ Warn the user and disable the automatic colormap for very
            large arrays; return True if the threshold is exceeded. """
            if nbx * nby > THRESHOLD:
                logging.info(_('The array is very large > 100M pixels'))
                logging.info(_('Preloading is not recommended for efficiency reasons'))
                logging.info(_('Maybe could you crop the array to a smaller size'))
                logging.info(_('Disabling automatic colormap update'))
                self.mypal.automatic = False
                return True
            else:
                return False

        if self.filename.endswith('.tif') or self.filename.endswith('.tiff'):
            # GeoTIFF raster
            self.read_txt_header()
            if self.preload:
                update_min_max = check_threshold(self.nbx, self.nby, THRESHOLD)
                self.import_geotif(which= which_band, crop = self.cropini)
                self.loaded = True
                if update_min_max:
                    # Large array: spread the palette over min/max once
                    # instead of computing iso-populated classes
                    self.mypal.distribute_values(self.array.min(), self.array.max())
        elif self.filename.endswith('.npy'):
            # NumPy binary file
            self.read_txt_header()
            if self.preload:
                update_min_max = check_threshold(self.nbx, self.nby, THRESHOLD)
                self._import_npy(crop = self.cropini)
                self.loaded = True
                if update_min_max:
                    self.mypal.distribute_values(self.array.min(), self.array.max())
        elif self.filename.endswith('.vrt'):
            # GDAL virtual raster
            self.read_txt_header()
            if self.preload:
                update_min_max = check_threshold(self.nbx, self.nby, THRESHOLD)
                self.import_vrt(which= which_band, crop = self.cropini)
                self.loaded = True
                if update_min_max:
                    self.mypal.distribute_values(self.array.min(), self.array.max())
        else:
            # WOLF binary format (text header + raw data)
            self.read_txt_header()

            if self.nb_blocks > 0:
                # At this point, we have the header, we know the number of blocks, if exists
                self.myblocks = {}

            if self.preload:
                update_min_max = check_threshold(self.nbx, self.nby, THRESHOLD)
                self.read_data()
                self.loaded = True
                if update_min_max:
                    self.mypal.distribute_values(self.array.min(), self.array.max())
[docs]
    def read_data(self):
        """ Read binary array data from the current file, with optional cropping.

        Without a crop (``self.cropini is None``) the whole file is read in
        one pass via :meth:`_read_binary_data`. With a crop, only float32
        arrays are supported and the requested window is read row by row.
        """
        if not os.path.exists(self.filename):
            logging.warning(_('No data file : ')+self.filename)
            return

        if self.cropini is None:
            # No crop: read the whole array in one pass
            with open(self.filename, 'rb') as f:
                self._read_binary_data(f)
        else:
            # Remember the requested cell size -- the crop dialog may have
            # asked for a coarser resolution (see rebin at the end)
            tmpdx = self.dx

            if type(self.cropini) is np.ndarray:
                pass
            elif type(self.cropini) is list:
                pass
            else:
                # cropini is not a valid list/array - ask user via hook
                crop_result = self._prompt_crop(self.dx, self.dy)
                if crop_result is None:
                    logging.warning(_('No crop defined'))
                    logging.info(_('Abort reading data !'))
                    return
                self.cropini = crop_result

            # Shift the crop bounds by half a cell so the index conversion
            # below lands on cell centers
            self.cropini = [[self.cropini[0][0] + self.dx/2., self.cropini[0][1] + self.dx/2.],
                            [self.cropini[1][0] + self.dy/2., self.cropini[1][1] + self.dy/2.]]

            with open(self.filename, 'rb') as f:
                if self.wolftype == WOLF_ARRAY_FULL_SINGLE or self.wolftype == WOLF_ARRAY_FULL_SINGLE_3D:
                    # Convert world coordinates to cell indices
                    imin, jmin = self.get_ij_from_xy(self.cropini[0][0], self.cropini[1][0])
                    imax, jmax = self.get_ij_from_xy(self.cropini[0][1], self.cropini[1][1])

                    imin = int(imin)
                    jmin = int(jmin)
                    imax = int(imax)
                    jmax = int(jmax)

                    oldnbx = self.nbx
                    oldnby = self.nby

                    # Update the header to the cropped extent
                    self.nbx = imax - imin
                    self.nby = jmax - jmin
                    self.origx, self.origy = self.get_xy_from_ij(imin, jmin)
                    self.origx -= self.dx / 2.
                    self.origy -= self.dy / 2.

                    locarray = np.zeros([self.nbx, self.nby])
                    # loop over the 'j' rows, reading 'nbi' values each time
                    nbi = imax - imin
                    # Seek to the first requested value (float32 -> 4 bytes/value)
                    if self.filename.endswith('.flt'):
                        # .flt files are stored with a "horizontal" mirror
                        f.seek(((oldnby - jmax) * oldnbx + imin) * 4)
                    else:
                        f.seek((imin + jmin * oldnbx) * 4)

                    for j in range(jmin, jmax):
                        locarray[0:imax - imin, j - jmin] = np.frombuffer(f.read(4 * nbi), dtype=np.float32)
                        # Skip the columns outside the crop in the next row
                        f.seek((oldnbx - nbi) * 4, 1)

                    self.array = ma.masked_array(locarray, dtype=np.float32)
                else:
                    logging.warning(_('Unsupported wolftype for cropping'))
                    return

            if self.filename.endswith('.flt'):
                # .flt file --> "horizontal" mirror
                self.array = np.fliplr(self.array)

            if self.dx != tmpdx:
                # Resample to the resolution requested in the crop dialog
                self.rebin(tmpdx / self.dx)

        self.loaded = True
[docs]
def _read_binary_data(self, f, seek=0):
"""Read binary data from file
:param f: open file handle in binary read mode
:param seek: byte offset to seek to before reading (0 = read from current position)
"""
if seek > 0:
f.seek(0)
if self.wolftype == WOLF_ARRAY_FULL_SINGLE or self.wolftype == WOLF_ARRAY_FULL_SINGLE_3D:
locarray = np.frombuffer(f.read(self.nbx * self.nby * 4), dtype=np.float32)
self.array = ma.masked_array(locarray.copy(), dtype=np.float32)
elif self.wolftype == WOLF_ARRAY_FULL_LOGICAL:
locarray = np.frombuffer(f.read(self.nbx * self.nby * 2), dtype=np.int16)
self.array = ma.masked_array(locarray.copy(), dtype=np.int16)
elif self.wolftype == WOLF_ARRAY_FULL_DOUBLE:
locarray = np.frombuffer(f.read(self.nbx * self.nby * 8), dtype=np.float64)
self.array = ma.masked_array(locarray.copy(), dtype=np.float64)
elif self.wolftype == WOLF_ARRAY_FULL_INTEGER:
locarray = np.frombuffer(f.read(self.nbx * self.nby * 4), dtype=np.int32)
self.array = ma.masked_array(locarray.copy(), dtype=np.int32)
elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2]:
locarray = np.frombuffer(f.read(self.nbx * self.nby * 2), dtype=np.int16)
self.array = ma.masked_array(locarray.copy(), dtype=np.int16)
elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER8]:
locarray = np.frombuffer(f.read(self.nbx * self.nby * 2), dtype=np.int8)
self.array = ma.masked_array(locarray.copy(), dtype=np.int8)
elif self.wolftype in [WOLF_ARRAY_FULL_UINTEGER8]:
locarray = np.frombuffer(f.read(self.nbx * self.nby * 2), dtype=np.uint8)
self.array = ma.masked_array(locarray.copy(), dtype=np.uint8)
elif self.wolftype in [WOLF_ARRAY_FULL_UINTEGER16]:
locarray = np.frombuffer(f.read(self.nbx * self.nby * 2), dtype=np.uint16)
self.array = ma.masked_array(locarray.copy(), dtype=np.uint16)
elif self.wolftype in [WOLF_ARRAY_FULL_UINTEGER32]:
locarray = np.frombuffer(f.read(self.nbx * self.nby * 4), dtype=np.uint32)
self.array = ma.masked_array(locarray.copy(), dtype=np.uint32)
if self.nbdims == 2:
self.array = self.array.reshape(self.nbx, self.nby, order='F')
if self.flipupd:
self.array=np.fliplr(self.array)
elif self.nbdims == 3:
self.array = self.array.reshape(self.nbx, self.nby, self.nbz, order='F')
[docs]
def _reload(self):
    """ Reload the data from the file.

    Re-reads the array content from disk via ``read_all`` and then calls
    ``mask_force_null`` (presumably masking the null-valued cells --
    confirm in mask_force_null).
    """
    self.read_all()
    self.mask_force_null()
[docs]
def write_all(self, newpath:str | Path = None, EPSG:int = None):
"""
Ecriture de tous les fichiers d'un Wolf array
:param newpath: new path and filename with extension -- if None, use the current filename
:param EPSG: EPSG code for geotiff
"""
if isinstance(newpath, Path):
newpath = str(newpath)
if newpath is not None:
self.filename = newpath
self.filename = str(self.filename)
if self.filename.endswith('.tif') or self.filename.endswith('.tiff'):
self.export_geotif(EPSG=EPSG)
elif self.filename.endswith('.npy'):
writing_header = True
if self.dtype != self.array.data.dtype:
logging.warning(_('Data type changed -- Force conversion to internal numpy array'))
locarray = self.array.data
if locarray.dtype == np.float32:
self.wolftype = WOLF_ARRAY_FULL_SINGLE
logging.warning(_('Data type changed to float32'))
elif locarray.dtype == np.float64:
self.wolftype = WOLF_ARRAY_FULL_DOUBLE
logging.warning(_('Data type changed to float64'))
elif locarray.dtype == np.int32:
self.wolftype = WOLF_ARRAY_FULL_INTEGER
logging.warning(_('Data type changed to int32'))
elif locarray.dtype == np.int16:
self.wolftype = WOLF_ARRAY_FULL_INTEGER16
logging.warning(_('Data type changed to int16'))
elif locarray.dtype == np.int8:
self.wolftype = WOLF_ARRAY_FULL_INTEGER8
logging.warning(_('Data type changed to int8'))
elif locarray.dtype == np.uint8:
self.wolftype = WOLF_ARRAY_FULL_UINTEGER8
logging.warning(_('Data type changed to uint8'))
elif locarray.dtype == np.uint16:
self.wolftype = WOLF_ARRAY_FULL_UINTEGER16
logging.warning(_('Data type changed to uint16'))
elif locarray.dtype == np.uint32:
self.wolftype = WOLF_ARRAY_FULL_UINTEGER32
logging.warning(_('Data type changed to uint32'))
else:
logging.error(_('Unsupported type in numpy file -- Abort wrting header file'))
writing_header = False
np.save(self.filename, self.array.data)
if writing_header:
self.write_txt_header()
else:
self.write_txt_header()
self.write_array()
[docs]
def write_array(self):
    """ Dump the raw (unmasked) binary content of the array to :attr:`filename`.

    The data are transposed before writing, so the file is laid out
    column-major with respect to the in-memory (nbx, nby) array.
    """
    flat = self.array.data.T
    flat.tofile(self.filename, sep="")
[docs]
def write_xyz(self, fname:str):
    """Export every unmasked cell of the array to an XYZ text file.

    :param fname: output file path for the XYZ file
    """
    xyz_out = XYZFile(fname)
    xyz_out.fill_from_wolf_array(self)
    xyz_out.write_to_file()
[docs]
def get_xyz(self, which='all') -> np.ndarray:
    """Return an array of xyz coordinates and values

    :param which: which values to export: ``'all'`` includes all unmasked cells
    :return: Nx3 array of ``[x, y, z]`` coordinates for all unmasked cells
    :rtype: numpy.ndarray
    """
    # World coordinates of the first and last cell centres
    x1, y1 = self.get_xy_from_ij(0, 0)
    x2, y2 = self.get_xy_from_ij(self.nbx, self.nby, aswolf=True)
    xloc = np.linspace(x1, x2, self.nbx)
    yloc = np.linspace(y1, y2, self.nby)
    # NOTE(review): meshgrid(indexing='xy') flattens y-major while
    # self.array.flatten() on a (nbx, nby) array is x-major -- verify the
    # x/y columns stay aligned with the z column for non-square grids.
    xy = np.meshgrid(xloc, yloc, indexing='xy')
    xyz = np.column_stack([xy[0].flatten(), xy[1].flatten(), self.array.flatten()])
    # keep only the rows corresponding to unmasked cells
    filter = np.invert(ma.getmaskarray(self.array).flatten())
    return xyz[filter]
# ========================================================================
# Import (External Formats)
# ========================================================================
[docs]
def _import_npy(self, fn:str='', crop:list[float]=None):
    """
    Import a numpy file.

    Must be called after the header is initialized, e.g. read_txt_header.

    :param fn: filename (defaults to :attr:`filename` when empty)
    :param crop: crop the data - [[xmin, xmax], [ymin, ymax]]
    """
    # Resolve the file to read: explicit argument first, stored filename second.
    if fn != '':
        pass
    elif self.filename != '':
        fn = self.filename
    else:
        return

    # Numpy format -- FIX: load the resolved 'fn'; the previous code always
    # read self.filename, silently ignoring the 'fn' argument.
    locarray = np.load(fn)

    assert locarray.shape[0] == self.nbx, _('Incompatible dimensions')
    assert locarray.shape[1] == self.nby, _('Incompatible dimensions')

    if crop is not None:
        # Convert the world-coordinate window into integer cell indices
        imin, jmin = self.get_ij_from_xy(crop[0][0], crop[1][0])
        imax, jmax = self.get_ij_from_xy(crop[0][1], crop[1][1])

        imin = int(imin)
        jmin = int(jmin)
        imax = int(imax)
        jmax = int(jmax)

        # Update the header for the cropped window (dx/dy/transl unchanged;
        # the previous self-assignments of dx/dy/translx/transly were no-ops
        # and have been removed).
        self.nbx = imax - imin
        self.nby = jmax - jmin
        self.origx, self.origy = self.get_xy_from_ij(imin, jmin)
        self.origx -= self.dx / 2.
        self.origy -= self.dy / 2.

        locarray = locarray[imin:imax, jmin:jmax]

    self.array = np.ma.asarray(locarray)
[docs]
def import_vrt(self, fn:str='', which:int = None, crop:list[float]=None):
    """ Import a VRT file.

    A Virtual Raster Tile (VRT) is GDAL's XML-based raster description
    format: it references other raster files (mosaics, transformations,
    custom structures) without duplicating their data. The VRT is first
    materialized as a temporary GeoTIFF, then imported through
    :meth:`import_geotif`.

    :param fn: filename
    :param which: band to import
    :param crop: crop the data - [xmin, xmax, ymin, ymax]
    """
    if fn == '':
        if self.filename == '':
            return
        fn = self.filename

    if not os.path.exists(fn):
        logging.error(_('File not found'))
        return

    # Convert the VRT to a temporary GeoTIFF and import that file.
    with tempfile.TemporaryDirectory() as workdir:
        tif_path = join(workdir, 'tmp.tif')

        if crop is None:
            gdal.Translate(tif_path, fn)
        else:
            tmpdx = self.dx
            fn_crop = fn + '_crop.tif'

            # Normalize 'crop' to [[xmin, xmax], [ymin, ymax]]
            if type(crop) is np.ndarray:
                if crop.shape == (4,):
                    logging.error(_('Crop must be a list or a numpy array with 4 values - [[xmin, xmax], [ymin, ymax]]'))
                    crop = [[crop[0], crop[1]], [crop[2], crop[3]]]
            elif isinstance(crop, list | tuple):
                if len(crop) == 4:
                    logging.error(_('Crop must be a list or a numpy array with 4 values - [[xmin, xmax], [ymin, ymax]]'))
                    crop = [[crop[0], crop[1]], [crop[2], crop[3]]]
            else:
                # Invalid crop container: read the resolution from the file
                # and ask the user for a window via the hook.
                src:gdal.Dataset
                src = gdal.Open(fn)
                transform = src.GetGeoTransform()
                self.dx = transform[1]
                self.dy = abs(transform[5])
                crop = self._prompt_crop(self.dx, self.dy)
                if crop is None:
                    return

            [xmin, xmax], [ymin, ymax] = crop
            gdal.Translate(tif_path, fn, projWin=[xmin, ymax, xmax, ymin])

        self.import_geotif(tif_path, which)
[docs]
def import_geotif(self, fn:str='', which:int = None, crop:list[float]=None):
    """
    Import a GeoTIFF raster file.

    A Geotiff (Geographic Tagged Image File Format) embeds georeferencing
    metadata (coordinate system, projection, spatial extent) inside a TIFF
    image, which makes it suitable for GIS and remote-sensing applications.

    Supported data types: Int8, Int16, Int32, UInt8, UInt16, UInt32,
    Float32, Float64.

    :param fn: filename (defaults to :attr:`filename` when empty)
    :param which: band to import (1-based); if None and the file holds several bands, the user is prompted
    :param crop: crop the data - [xmin, xmax, ymin, ymax] or [[xmin, xmax], [ymin, ymax]]
    """
    if fn !='':
        pass
    elif self.filename !='':
        fn = self.filename
    else:
        logging.error(_('No filename provided in import_geotif'))
        return

    need_wx = False
    fn_tmp = None   # temporary cropped file to remove at the end, if created
    raster:gdal.Dataset

    if crop is not None :
        if not os.path.exists(fn):
            logging.error(_('File not found'))
            return

        tmpdx = self.dx
        fn_crop = fn + '_crop.tif'

        raster_in:gdal.Dataset
        raster_in = gdal.Open(fn)
        geotr = raster_in.GetGeoTransform()
        self.dx = geotr[1]
        self.dy = abs(geotr[5])
        # Corners of the source raster; uly may be above or below lly
        # depending on the sign of the y pixel size (geotr[5]).
        ulx = geotr[0]
        uly = geotr[3]
        llx = ulx + geotr[1] * raster_in.RasterXSize
        lly = uly + geotr[5] * raster_in.RasterYSize

        # Normalize 'crop' to a flat [xmin, xmax, ymin, ymax] sequence
        if type(crop) is np.ndarray:
            if crop.shape == (4,):
                pass
            elif crop.shape == (2,2):
                crop = [crop[0][0], crop[0][1], crop[1][0], crop[1][1]]
            else:
                logging.error(_('Crop must be a list or a numpy array with 4 values - xmin, xmax, ymin, ymax'))
        elif type(crop) in [list, tuple]:
            if len(crop) == 2:
                if len(crop[0]) == 2 and len(crop[1]) == 2:
                    crop = [crop[0][0], crop[0][1], crop[1][0], crop[1][1]]
                else:
                    logging.error(_('Crop must be a list or a numpy array with 4 values - xmin, xmax, ymin, ymax'))
                    return
            elif len(crop) ==4:
                pass
        else:
            # crop is not a valid list/array - ask user via hook
            crop = self._prompt_crop(self.dx, self.dy)
            if crop is None:
                return
            # _prompt_crop returns [[xmin,xmax],[ymin,ymax]], flatten to [xmin,xmax,ymin,ymax]
            if isinstance(crop, list) and len(crop) == 2:
                crop = [crop[0][0], crop[0][1], crop[1][0], crop[1][1]]
            need_wx = True

        xmin, xmax, ymin, ymax = crop

        if need_wx:
            # Interactive session: offer to keep the cropped file on disk
            fn_crop = self._prompt_save_file(_('Save the cropped file for later'), "Tiff files (*.tif)|*.tif")
            if fn_crop is None:
                from tempfile import NamedTemporaryFile
                fn_crop = str(NamedTemporaryFile(suffix='.tif').name)
            # projWin expects [ulx, uly, lrx, lry]; swap y's for bottom-up rasters
            if uly > lly:
                raster = gdal.Translate(fn_crop, fn, projWin=[xmin, ymax, xmax, ymin])
            else:
                raster = gdal.Translate(fn_crop, fn, projWin=[xmin, ymin, xmax, ymax])
            fn = fn_crop
        else:
            fn_tmp = '__tmp_wolf.tif'
            if uly > lly:
                raster = gdal.Translate(fn_tmp, fn, projWin=[xmin, ymax, xmax, ymin])
            else:
                raster = gdal.Translate(fn_tmp, fn, projWin=[xmin, ymin, xmax, ymax])
            fn = fn_tmp
    else:
        raster = gdal.Open(fn)

    if raster is None:
        # FIX: the filename was never interpolated (plain string with '{fn}')
        logging.error(_('Could not open the file {}').format(fn))
        return

    # Dimensions
    self.nbx = raster.RasterXSize
    self.nby = raster.RasterYSize

    # Number of bands -- select one (prompt only when ambiguous)
    nb = raster.RasterCount
    if which is None:
        names = [raster.GetRasterBand(which+1).GetDescription() for which in range(nb)]
        if nb>1:
            which = self._prompt_band_selection(names)
            if which is None:
                return
        else:
            which=1
    else:
        # clamp the requested band into [1, nb]
        if which > nb:
            which = nb
        if which < 1:
            which = 1

    # Read the raster band as separate variable
    band = raster.GetRasterBand(which)

    # Null value of the band (0. when the file does not define one)
    self.nullvalue = band.GetNoDataValue()
    if self.nullvalue is None:
        self.nullvalue = 0.

    geotr = raster.GetGeoTransform()
    self.origx = geotr[0]
    self.dx = geotr[1]
    self.dy = abs(geotr[5])

    try:
        # WOLF stores arrays origin-at-lower-left; a negative y pixel size
        # means the tif is stored top-down, so flip before transposing.
        if geotr[5] <0.:
            self.origy = geotr[3]+geotr[5]*self.nby
            tmp_array = np.transpose(np.flipud(band.ReadAsArray()))
        else:
            self.origy = geotr[3]
            tmp_array = np.transpose(band.ReadAsArray())

        self.array = np.ma.asarray(tmp_array.copy())
        assert self.array.iscontiguous(), _('Array is not contiguous')
        del tmp_array

        # Map the numpy dtype onto the WOLF array type
        if self.array.dtype == np.float64:
            self.wolftype = WOLF_ARRAY_FULL_DOUBLE
        elif self.array.dtype == np.float32:
            self.wolftype = WOLF_ARRAY_FULL_SINGLE
        elif self.array.dtype == np.int32:
            self.wolftype = WOLF_ARRAY_FULL_INTEGER
        elif self.array.dtype == np.uint32:
            self.wolftype = WOLF_ARRAY_FULL_UINTEGER32
        elif self.array.dtype == np.uint8:
            self.wolftype = WOLF_ARRAY_FULL_UINTEGER8
        elif self.array.dtype == np.uint16:
            self.wolftype = WOLF_ARRAY_FULL_UINTEGER16
        elif self.array.dtype == np.int8:
            self.wolftype = WOLF_ARRAY_FULL_INTEGER8
        elif self.array.dtype == np.int16:
            self.wolftype = WOLF_ARRAY_FULL_INTEGER16
    except Exception:
        logging.warning(_('Error during importing tif file'))

    # Find the EPSG code
    proj = raster.GetProjection()
    srs = osr.SpatialReference()
    srs.ImportFromWkt(proj)
    # FIX: IsProjected is a method -- the bound attribute itself is always
    # truthy, so the else branch below was unreachable before.
    if srs.IsProjected():
        self.epsg = int(srs.GetAttrValue('AUTHORITY',1))

        # Cross-check against an explicit "EPSG:" token in the WKT, if any
        if "EPSG:" in proj:
            try:
                idx1 = proj.index("EPSG:")
                idx2 = proj.index('"', idx1)
                epsg2 = int(proj[idx1+5:idx2])
                if epsg2 != self.epsg:
                    logging.warning(_('Mismatch in EPSG code detection: {} and {}').format(self.epsg, epsg2))
                    logging.warning(_("Check your file's projection -- You can use gdalinfo command line tool"))
                    self.epsg = epsg2
                    logging.warning(_('Using EPSG: {}').format(self.epsg))
            except Exception:
                logging.warning(_('Could not check EPSG code in the projection string'))
                logging.warning(_("Check your file's projection -- You can use gdalinfo command line tool"))
    else:
        self.epsg = self.epsg_parent

    # Close the raster
    raster.FlushCache()
    raster = None

    # Remove the temporary cropped file, if we created one
    try:
        if fn_tmp == '__tmp_wolf.tif':
            os.remove(fn_tmp)
    except OSError:
        pass
[docs]
def import_from_gltf(self, fn:str='', fnpos:str='', interp_method:Literal['matplotlib','scipy'] = 'matplotlib'):
    """
    Import from GLTF/GLB format

    The companion ``fnpos`` file holds one value per line, in the order
    written by ``export_to_gltf``: origin, cell counts, index window,
    world bounds and an optional null elevation.

    :param fn: filename
    :param fnpos: filename for the position's information
    :param interp_method: method for the interpolation -- 'matplotlib' or 'scipy'
    """
    #FIXME : add the possibility to use PyVista
    if fn == '' or fnpos == '':
        logging.info(_('Retry !! -- Bad files'))
        return

    # Remember the chosen files in the viewer's linked parameters (if any)
    if self.mapviewer is not None:
        if self.mapviewer.link_params is None:
            self.mapviewer.link_params = {}
        self.mapviewer.link_params['gltf file'] = fn
        self.mapviewer.link_params['gltf pos'] = fnpos

    mytri = Triangulation()
    mytri.import_from_gltf(fn)

    with open(fnpos, 'r') as f:
        mylines = f.read().splitlines()

    ox = float(mylines[0])     # origin x
    oy = float(mylines[1])     # origin y
    nbx = int(mylines[2])      # number of cells along x
    nby = int(mylines[3])      # number of cells along y
    i1 = int(mylines[4])       # index window into self.array
    j1 = int(mylines[5])
    i2 = int(mylines[6])
    j2 = int(mylines[7])
    xmin = float(mylines[8])   # world bounds of the window
    xmax = float(mylines[9])
    ymin = float(mylines[10])
    ymax = float(mylines[11])
    try:
        znull = float(mylines[12])   # optional null elevation
    except:
        znull=-99999.

    # Cell-centre coordinates of the target grid
    x = np.arange(self.dx / 2. + ox, float(nbx) * self.dx + self.dx / 2 + ox, self.dx)
    y = np.arange(self.dy / 2. + oy, float(nby) * self.dy + self.dy / 2 + oy, self.dy)

    if interp_method =='matplotlib':
        grid_x, grid_y = np.meshgrid(x, y, indexing='ij')
        self.interpolate_on_triangulation(np.asarray(mytri.pts),mytri.tri, grid_x, grid_y, mytri.get_mask())
    else:
        # scipy.griddata path: only overwrite cells that are unmasked AND
        # received a finite interpolated value (NaN outside the hull)
        grid_x, grid_y = np.meshgrid(x, y, sparse=True, indexing='xy')
        newvalues = griddata(np.asarray(mytri.pts)[:,0:2], np.asarray(mytri.pts)[:,2], (grid_x, grid_y), method='linear')

        locmask = np.logical_and(np.logical_not(self.array.mask[i1:i2, j1:j2]),
                                 np.logical_not(np.isnan(newvalues.transpose())))
        self.array.data[i1:i2, j1:j2][locmask] = newvalues.transpose()[locmask]

    self.reset_plot()
# ========================================================================
# Export (External Formats)
# ========================================================================
[docs]
def export_geotif(self, outdir:str= '', extent:str= '', EPSG:int = None):
    """
    Export the array as a GeoTIFF file.

    To save the object in GeoTIFF format, prefer calling :meth:`write_all`
    rather than this routine directly.

    Supported data types:
      - Int8 (written as Byte) / Int16 / Int32
      - UInt8 (Byte) / UInt16 / UInt32
      - Float32 / Float64

    :param outdir: directory - If provided, the file will be saved as "outdir/idx+extent.tif"
                   If not provided, we use the filename attribute
    :param extent: suffix to add to the filename before the extension '.tif' (only if outdir is provided)
    :param EPSG: EPSG code, by default 31370 (Lambert 72)
    """
    if EPSG is None and self.epsg is None:
        EPSG = 31370 # Default EPSG code is Belgian Lambert 72
    elif EPSG is None:
        EPSG = self.epsg

    outdir = str(outdir)
    extent = str(extent)

    srs = osr.SpatialReference()
    srs.ImportFromEPSG(EPSG)

    if outdir=='' and extent=='':
        filename = self.filename
    else:
        filename = join(outdir,self.idx)+extent+'.tif'

    # Map the numpy dtype onto a GDAL data type + a coherent nodata value
    arr=self.array
    if arr.dtype == np.float32:
        arr_type = gdal.GDT_Float32
        nullvalue = self.nullvalue
    elif arr.dtype == np.float64:
        arr_type = gdal.GDT_Float64
        nullvalue = self.nullvalue
    elif arr.dtype == np.int8:
        # NOTE: GDT_Byte is unsigned -- negative int8 values will wrap;
        # kept for backward compatibility with previously written files
        arr_type = gdal.GDT_Byte
        nullvalue = int(self.nullvalue)
    elif arr.dtype == np.int16:
        arr_type = gdal.GDT_Int16
        nullvalue = int(self.nullvalue)
    elif arr.dtype == np.uint8:
        arr_type = gdal.GDT_Byte
        nullvalue = int(self.nullvalue)
    elif arr.dtype == np.uint16:
        # FIX: uint16/uint32 previously fell through to Int32 -- use the
        # native unsigned GDAL types so large values survive a round-trip
        arr_type = gdal.GDT_UInt16
        nullvalue = int(self.nullvalue)
    elif arr.dtype == np.uint32:
        arr_type = gdal.GDT_UInt32
        nullvalue = int(self.nullvalue)
    else:
        arr_type = gdal.GDT_Int32
        nullvalue = int(self.nullvalue)

    driver: gdal.Driver
    out_ds: gdal.Dataset
    band: gdal.Band
    driver = gdal.GetDriverByName("GTiff")

    # Switch to BigTIFF when the estimated size exceeds the 4GB TIFF limit
    estimated_file_size = self.memory_usage
    if (estimated_file_size > 4 * 1024**3): # 4GB in bytes
        options = ['COMPRESS=LZW', 'BIGTIFF=YES']
        logging.info('BigTIFF format will be used!')
    else:
        options = ['COMPRESS=LZW']

    out_ds = driver.Create(str(filename), arr.shape[0], arr.shape[1], 1, arr_type, options=options)
    if out_ds is None:
        logging.error(_('Could not create the file (unknown)'))
        return

    out_ds.SetProjection(srs.ExportToWkt())
    # Use the upper-left corner of the matrix with a negative y pixel size:
    # gdalBuildVRT only supports this convention.
    out_ds.SetGeoTransform([self.origx+self.translx,
                            self.dx,
                            0.,
                            self.origy+self.transly+float(self.nby)*self.dy,
                            0.,
                            -self.dy])

    band = out_ds.GetRasterBand(1)
    band.SetNoDataValue(nullvalue)
    # Flip back to the top-down raster layout expected by GDAL
    band.WriteArray(np.flipud(arr.data.transpose()))
    band.FlushCache()
    try:
        band.ComputeStatistics(True)
    except RuntimeError as e:
        # If the array is mainly null values, the approximate statistics
        # (subset sampling) may only see nodata and fail -- retry with an
        # exact computation before giving up.
        try:
            band.ComputeStatistics(0) # Using 0 for exact stats instead of True (which implies approximate)
        except RuntimeError as e:
            print("Warning: ComputeStatistics failed:", e)
[docs]
def export_cityjson(self, outdir:str= '', extent:str= '', EPSG:int = None):
    """Export the array as a CityJSON file with one polygon per grid cell.

    :param outdir: output directory; empty string uses :attr:`filename`
    :param extent: suffix appended to filename before ``.json`` extension
    :param EPSG: EPSG code for the coordinate reference system (default: 31370)
    """
    if EPSG is None:
        # fall back to the array's EPSG, or Belgian Lambert 72 by default
        EPSG = 31370 if self.epsg is None else self.epsg

    outdir = str(outdir)
    extent = str(extent)

    if outdir == '' and extent == '':
        target = self.filename
    else:
        target = join(outdir, self.idx) + extent + '.json'

    # Skeleton of the CityJSON document
    cityjson = {
        "type": "CityJSON",
        "version": "1.0",
        "metadata": {
            "referenceSystem": f"urn:ogc:def:crs:EPSG::{EPSG}"
        },
        "CityObjects": {},
        "vertices": []
    }

    half_dx = self.dx / 2.
    half_dy = self.dy / 2.
    objs = cityjson["CityObjects"]

    # One closed square ring per cell, centred on the cell centre
    for i in tqdm(range(self.nbx)):
        for j in range(self.nby):
            xc, yc = self.get_xy_from_ij(i, j)
            ring = [
                [xc - half_dx, yc - half_dy],
                [xc + half_dx, yc - half_dy],
                [xc + half_dx, yc + half_dy],
                [xc - half_dx, yc + half_dy],
                [xc - half_dx, yc - half_dy]
            ]
            objs[f"cell_{i}_{j}"] = {
                "type": "CityObject",
                "geometry": {
                    "type": "Polygon",
                    "coordinates": [ring]
                }
            }

    with open(target, 'w') as f:
        json.dump(cityjson, f, indent=2)
[docs]
def export_to_gltf(self, bounds:list[float]=None, fn:str=''):
    """
    Export to GLTF/GLB format.

    Writes ``fn`` (gltf/glb mesh), ``fn + '.tri'`` (triangulation) and
    ``fn + '.pos'`` (position metadata consumed by ``import_from_gltf``).

    :param bounds: [[xmin,xmax],[ymin,ymax]]
    :param fn: filename
    """
    tri, znull = self.get_triangulation(bounds)
    tri.export_to_gltf(fn)
    tri.saveas(fn+'.tri')

    if bounds is None:
        # full grid: window covers every cell
        ox = self.origx + self.translx
        oy = self.origy + self.transly
        nbx = self.nbx
        nby = self.nby
        i1, j1 = 0, 0
        i2, j2 = self.nbx, self.nby
        bounds = [[ox, ox + float(nbx) * self.dx], [oy, oy + float(nby) * self.dy]]
    else:
        # clip the requested window to the grid extent
        ox = max(self.origx, bounds[0][0])
        oy = max(self.origy, bounds[1][0])
        i1, j1 = self.get_ij_from_xy(ox, oy)
        i2, j2 = self.get_ij_from_xy(bounds[0][1], bounds[1][1])
        i1 = max(i1, 0)
        j1 = max(j1, 0)
        i2 = min(i2 + 1, self.nbx)
        j2 = min(j2 + 1, self.nby)
        nbx = i2 - i1
        nby = j2 - j1

    # One value per line, in the order expected by import_from_gltf;
    # note: no trailing newline after the last value (znull)
    values = [ox, oy, nbx, nby, i1, j1, i2, j2,
              bounds[0][0], bounds[0][1], bounds[1][0], bounds[1][1], znull]
    with open(fn + '.pos', 'w') as posfile:
        posfile.write('\n'.join(str(v) for v in values))
# ========================================================================
# Coordinate, Bounds & Grid Utilities
# ========================================================================
[docs]
def check_bounds_ij(self, i:int, j:int):
    """Return ``True`` when the (i, j) indices fall inside the array.

    :param i: column index (0-based)
    :param j: row index (0-based)
    :rtype: bool
    """
    return (0 <= i < self.nbx) and (0 <= j < self.nby)
[docs]
def check_bounds_xy(self, x:float, y:float):
    """Return ``True`` when the (x, y) world coordinates fall inside the array extent.

    :param x: x-coordinate in world space
    :param y: y-coordinate in world space
    :rtype: bool
    """
    # convert to cell indices, then reuse the index-based bound check
    return self.check_bounds_ij(*self.get_ij_from_xy(x, y))
[docs]
def get_centers(self, usenap:bool = True):
    """Return the cell-centre coordinates as a flat float32 array.

    :param usenap: if ``True``, only unmasked cells are returned; otherwise every cell
    :return: flattened coordinates ``[x1, y1, x2, y2, ...]``
    :rtype: numpy.ndarray
    """
    if usenap:
        # (i, j) indices of the unmasked cells only
        indices = np.argwhere(self.array.mask == False,)
    else:
        gi, gj = np.meshgrid(np.arange(self.nbx), np.arange(self.nby))
        indices = np.asarray([gi.flatten(), gj.flatten()]).T

    coords = self.get_xy_from_ij_array(indices).copy().flatten()
    return coords.astype(np.float32)
[docs]
def get_dxdy_min(self):
    """
    Return the smaller of the two cell sizes (dx, dy).

    Mainly useful in PyVertexVectors to get the minimal size of the cells
    and ensure compatibility with the 2D results (GPU and Multiblocks)
    """
    return self.dx if self.dx < self.dy else self.dy
[docs]
def get_dxdy_max(self):
    """
    Return the larger of the two cell sizes (dx, dy).

    Mainly useful in PyVertexVectors to get the maximal size of the cells
    and ensure compatibility with the 2D results (GPU and Multiblocks)
    """
    return self.dx if self.dx > self.dy else self.dy
[docs]
def meshgrid(self, mode:Literal['gc', 'borders']='gc'):
    """
    Build a 2D mesh of the grid.

    :param mode: 'gc' for cell centres, 'borders' for cell corners
    :return: (x, y) coordinate arrays
    """
    ox = self.translx + self.origx
    oy = self.transly + self.origy

    if mode == 'gc':
        # cell centres: half a cell inside each border
        xs = np.linspace(ox + self.dx / 2, ox + self.nbx * self.dx - self.dx / 2, self.nbx)
        ys = np.linspace(oy + self.dy / 2, oy + self.nby * self.dy - self.dy / 2, self.nby)
    elif mode == 'borders':
        xs = np.linspace(ox, ox + self.nbx * self.dx, self.nbx + 1)
        ys = np.linspace(oy, oy + self.nby * self.dy, self.nby + 1)

    y, x = np.meshgrid(ys, xs)
    return x, y
[docs]
def get_value(self, x:float, y:float, z:float=0., nullvalue:float=-99999, convert_to_float:bool=True):
    """
    Return the stored value at the given world coordinates.

    :param x: x coordinate
    :param y: y coordinate
    :param z: z coordinate (only used for 3D arrays)
    :param nullvalue: value returned for masked cells or out-of-bounds points
    :param convert_to_float: if ``True``, cast the result to ``float``
    """
    if isinstance(self.array.mask, np.bool_):
        logging.error(_('Mask is not an array - Please check your data'))
        return nullvalue

    if self.nbdims == 2:
        i, j = self.get_ij_from_xy(x, y)
        inside = 0 <= i < self.nbx and 0 <= j < self.nby
        if inside and not self.array.mask[i, j]:
            value = self.array[i, j]
        else:
            value = nullvalue
    elif self.nbdims == 3:
        i, j, k = self.get_ij_from_xy(x, y, z)
        inside = 0 <= i < self.nbx and 0 <= j < self.nby and 0 <= k < self.nbz
        if inside and not self.array.mask[i, j, k]:
            value = self.array[i, j, k]
        else:
            value = nullvalue

    return float(value) if convert_to_float else value
[docs]
def get_xlim(self, window_x:float, window_y:float):
    """
    Return the x-axis limits that keep the array's aspect ratio inside a window.

    :param window_x: window size in x
    :param window_y: window size in y
    """
    scale_x = window_x / (float(self.nbx) * self.dx)
    scale_y = window_y / (float(self.nby) * self.dy)

    if scale_x < scale_y:
        # x-scaling is the binding constraint: span the full width
        return (self.origx + self.translx, self.origx + self.translx + self.nbx * self.dx)

    # y-scaling binds: widen x symmetrically around the centre
    width = (self.nby * self.dy) / window_y * window_x
    return (self.origx + self.translx + self.nbx * self.dx * 0.5 - width * 0.5,
            self.origx + self.translx + self.nbx * self.dx * 0.5 + width * 0.5)
[docs]
def get_ylim(self, window_x:float, window_y:float):
    """
    Return the y-axis limits that keep the array's aspect ratio inside a window.

    :param window_x: window size in x
    :param window_y: window size in y
    """
    scale_x = window_x / (float(self.nbx) * self.dx)
    scale_y = window_y / (float(self.nby) * self.dy)

    if scale_x < scale_y:
        # x-scaling binds: widen y symmetrically around the centre
        height = (self.nbx * self.dx) / window_x * window_y
        return (self.origy + self.transly + self.nby * self.dy * 0.5 - height * 0.5,
                self.origy + self.transly + self.nby * self.dy * 0.5 + height * 0.5)

    # y-scaling is the binding constraint: span the full height
    return (self.origy + self.transly, self.origy + self.transly + self.nby * self.dy)
[docs]
def get_working_array(self, onzoom:list[float]=None):
    """
    Return the unmasked values, optionally restricted to a zoom window.

    Side effect: when a window is given, :attr:`nbnotnullzoom` is updated
    with the number of unmasked cells inside the window.

    :param onzoom: zoom window -- [xmin, xmax, ymin, ymax]; ``None`` or ``[]`` selects the whole array
    :return: 1D array of the unmasked values
    """
    # FIX: the default was a shared mutable list ([]); use None and rely on
    # truthiness so both None and [] mean "whole array" (backward compatible).
    if onzoom:
        istart, jstart = self.get_ij_from_xy(onzoom[0], onzoom[2])
        iend, jend = self.get_ij_from_xy(onzoom[1], onzoom[3])

        # clip the window to the array bounds
        istart = 0 if istart < 0 else istart
        jstart = 0 if jstart < 0 else jstart
        iend = self.nbx if iend > self.nbx else iend
        jend = self.nby if jend > self.nby else jend

        partarray = self.array[istart:iend, jstart:jend]
        self.nbnotnullzoom = partarray.count()
        return partarray[partarray.mask == False]
    else:
        return self.array[self.array.mask == False]
# ========================================================================
# Operator Overloading
# ========================================================================
def __add__(self, other):
    """Addition operator (``+``). Return a new instance of the same type.

    :param other: right-hand operand -- either a scalar (``float`` / ``int``) or another :class:`WolfArrayModel`
    :return: new array with element-wise sum
    :rtype: same type as ``self``
    """
    newArray = self._new_like()
    if type(other) in [float, int]:
        # Coerce the scalar to the array's numeric family so the dtype is preserved
        if type(other) == int and self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_FULL_SINGLE]:
            other = float(other)
        elif type(other) == float and self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2, WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_UINTEGER8]:
            other = int(other)
        if other != 0.:
            newArray.array = np.ma.masked_array(self.array + other, self.array.mask, dtype=self.array.dtype)
        # NOTE(review): when other == 0 the addition is skipped -- assumes
        # _new_like() already holds a copy of self.array; confirm in _new_like
    else:
        # array operand: element-wise sum, keeping self's mask and dtype
        newArray.array = np.ma.masked_array(self.array + other.array, self.array.mask, dtype=self.array.dtype)
    newArray.count()
    assert newArray.array.dtype == self.array.dtype, _('Bad dtype')
    return newArray
def __sub__(self, other):
    """Subtraction operator (``-``). Return a new instance of the same type.

    :param other: right-hand operand -- either a scalar (``float`` / ``int``) or another :class:`WolfArrayModel`
    :return: new array with element-wise difference
    :rtype: same type as ``self``
    """
    newArray = self._new_like()
    if type(other) in [float, int]:
        # Coerce the scalar to the array's numeric family so the dtype is preserved
        if type(other) == int and self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_FULL_SINGLE]:
            other = float(other)
        elif type(other) == float and self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2, WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_UINTEGER8]:
            other = int(other)
        if other != 0.:
            newArray.array = np.ma.masked_array(self.array - other, self.array.mask, dtype=self.array.dtype)
        # NOTE(review): when other == 0 the subtraction is skipped -- assumes
        # _new_like() already holds a copy of self.array; confirm in _new_like
    else:
        # array operand: element-wise difference, keeping self's mask and dtype
        newArray.array = np.ma.masked_array(self.array - other.array, self.array.mask, dtype=self.array.dtype)
    newArray.count()
    assert newArray.array.dtype == self.array.dtype, _('Bad dtype')
    return newArray
def __mul__(self, other):
    """Multiplication operator (``*``). Return a new instance of the same type.

    :param other: right-hand operand -- either a scalar (``float`` / ``int``) or another :class:`WolfArrayModel`
    :return: new array with element-wise product
    :rtype: same type as ``self``
    """
    res = self._new_like()
    if type(other) in [float, int]:
        # align the scalar's type with the array's numeric family
        if type(other) == int and self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_FULL_SINGLE]:
            other = float(other)
        elif type(other) == float and self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2, WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_UINTEGER8]:
            other = int(other)
        if other == 0.:
            logging.debug(_('Multiplication by 0'))
        res.array = np.ma.masked_array(self.array * other, self.array.mask, dtype=self.array.dtype)
    else:
        res.array = np.ma.masked_array(self.array * other.array, self.array.mask, dtype=self.array.dtype)
    res.count()
    assert res.array.dtype == self.array.dtype, _('Bad dtype')
    return res
def __truediv__(self, other):
    """Division operator (``/``). Return a new instance of the same type.

    :param other: right-hand operand -- either a scalar (``float`` / ``int``) or another :class:`WolfArrayModel`
    :return: new array with element-wise quotient
    :rtype: same type as ``self``
    """
    newArray = self._new_like()
    if type(other) in [float, int]:
        # Coerce the scalar to the array's numeric family so the dtype is preserved
        if type(other) == int and self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_FULL_SINGLE]:
            other = float(other)
        elif type(other) == float and self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2, WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_UINTEGER8]:
            other = int(other)
        if other != 0.:
            newArray.array = np.ma.masked_array(self.array / other, self.array.mask, dtype=self.array.dtype)
        else:
            # NOTE(review): division by zero is skipped -- newArray keeps whatever
            # _new_like() put in .array; confirm _new_like copies the data
            logging.error(_('Division by 0 -- Nothing to do !'))
    else:
        # NOTE(review): 'other == 0.' compares the WolfArray object itself;
        # verify __eq__ broadcasts to other.array (otherwise this is a scalar bool)
        newArray.array = np.ma.masked_array(np.where(other == 0., 0., self.array / other.array), self.array.mask, dtype=self.array.dtype)
    newArray.count()
    assert newArray.array.dtype == self.array.dtype, _('Bad dtype')
    return newArray
def __pow__(self, other):
    """Power operator (``**``). Return a new instance of the same type.

    :param other: right-hand operand -- either a scalar (``float`` / ``int``) or another :class:`WolfArrayModel`
    :return: new array with element-wise power
    :rtype: same type as ``self``
    """
    res = self._new_like()
    if type(other) not in [float, int]:
        # only scalar exponents are supported -- zero-fill the result
        logging.error(_('Power operator only implemented for scalars'))
        res.array[:,:] = 0
    else:
        # align the scalar's type with the array's numeric family
        if type(other) == int and self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_FULL_SINGLE]:
            other = float(other)
        elif type(other) == float and self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2, WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_UINTEGER8]:
            other = int(other)
        res.array = np.ma.masked_array(self.array ** other, self.array.mask, dtype=self.array.dtype)
    res.count()
    assert res.array.dtype == self.array.dtype, _('Bad dtype')
    return res
# ---- Unary operators --------------------------------------------------
def __neg__(self):
    """Unary negation (``-array``). Return a new instance of the same type.

    :return: new array with element-wise negation
    :rtype: same type as ``self``
    """
    res = self._new_like()
    negated = -self.array
    res.array = np.ma.masked_array(negated, self.array.mask, dtype=self.array.dtype)
    res.count()
    return res
def __abs__(self):
    """Absolute value (``abs(array)``). Return a new instance of the same type.

    :return: new array with element-wise absolute values
    :rtype: same type as ``self``
    """
    res = self._new_like()
    magnitude = np.abs(self.array)
    res.array = np.ma.masked_array(magnitude, self.array.mask, dtype=self.array.dtype)
    res.count()
    return res
# ---- Reflected (right-hand) operators ---------------------------------
def __radd__(self, other):
    """Reflected addition (``scalar + array``).

    :param other: left-hand scalar operand
    :rtype: same type as ``self``
    """
    # addition is commutative here: delegate to the forward operator
    return self + other
def __rsub__(self, other):
    """Reflected subtraction (``scalar - array``).

    :param other: left-hand scalar operand
    :rtype: same type as ``self``
    """
    # scalar - array == (-array) + scalar
    return (-self) + other
def __rmul__(self, other):
    """Reflected multiplication (``scalar * array``).

    :param other: left-hand scalar operand
    :rtype: same type as ``self``
    """
    # multiplication is commutative here: delegate to the forward operator
    return self * other
def __rtruediv__(self, other):
    """Reflected division (``scalar / array``).

    Cells where the array equals zero yield zero in the result instead of
    triggering a division by zero.

    :param other: left-hand scalar operand
    :rtype: same type as ``self``
    """
    quotient_arr = self._new_like()
    if type(other) in (float, int):
        # Promote an int scalar to float when the array is floating point
        if type(other) == int and self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_FULL_SINGLE]:
            other = float(other)
        zero_cells = self.array == 0
        denominator = np.where(zero_cells, 1, self.array)
        values = np.where(zero_cells, 0, other / denominator)
        quotient_arr.array = np.ma.masked_array(values, self.array.mask, dtype=self.array.dtype)
    else:
        logging.error(_('Reflected division only implemented for scalars'))
        quotient_arr.array = np.ma.masked_array(np.zeros_like(self.array), self.array.mask, dtype=self.array.dtype)
    quotient_arr.count()
    return quotient_arr
# ---- Floor division ---------------------------------------------------
def __floordiv__(self, other):
    """Floor division operator (``//``). Return a new instance of the same type.

    :param other: right-hand operand -- either a scalar (``float`` / ``int``) or another :class:`WolfArrayModel`
    :return: new array with element-wise floor division
    :rtype: same type as ``self``
    """
    result = self._new_like()
    if type(other) in (float, int):
        # Align the scalar's type with the array's storage type
        if type(other) == int and self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_FULL_SINGLE]:
            other = float(other)
        elif type(other) == float and self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2, WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_UINTEGER8]:
            other = int(other)
        if other == 0:
            logging.error(_('Floor division by 0 -- Nothing to do !'))
        else:
            result.array = np.ma.masked_array(self.array // other, self.array.mask, dtype=self.array.dtype)
    else:
        # Element-wise: cells where the divisor is zero yield zero
        zero_divisor = other.array == 0
        safe_divisor = np.where(zero_divisor, 1, other.array)
        values = np.where(zero_divisor, 0, self.array // safe_divisor)
        result.array = np.ma.masked_array(values, self.array.mask, dtype=self.array.dtype)
    result.count()
    assert result.array.dtype == self.array.dtype, _('Bad dtype')
    return result
def __rfloordiv__(self, other):
    """Reflected floor division (``scalar // array``).

    Cells where the array equals zero yield zero in the result instead of
    triggering a division by zero.

    :param other: left-hand scalar operand
    :rtype: same type as ``self``
    """
    result = self._new_like()
    if type(other) in (float, int):
        # Promote an int scalar to float when the array is floating point
        if type(other) == int and self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_FULL_SINGLE]:
            other = float(other)
        zero_cells = self.array == 0
        denominator = np.where(zero_cells, 1, self.array)
        values = np.where(zero_cells, 0, other // denominator)
        result.array = np.ma.masked_array(values, self.array.mask, dtype=self.array.dtype)
    else:
        logging.error(_('Reflected floor division only implemented for scalars'))
        result.array = np.ma.masked_array(np.zeros_like(self.array), self.array.mask, dtype=self.array.dtype)
    result.count()
    return result
# ========================================================================
# Masking & Null Value Operations
# ========================================================================
[docs]
def loadnap_and_apply(self):
    """
    Load a companion mask file ("nap") and apply it to the array.

    The nap file shares the array's filename, with the extension ``.napbin``.
    Cells masked in the nap are set to the nullvalue here and then masked.
    Useful for 2D WOLF simulations. Does nothing if the nap file is absent.
    """
    stem, _ext = os.path.splitext(self.filename)
    nap_path = stem + '.napbin'
    if not os.path.exists(nap_path):
        return
    nap = WolfArrayModel(nap_path)
    # Propagate the nap's masked zone into our data as nullvalue, then mask it
    self.array.data[np.where(nap.array.mask)] = self.nullvalue
    self.mask_data(self.nullvalue)
    self.reset_plot()
[docs]
def mask_data(self, value):
    """Mask all cells whose value equals the given value.

    If ``value`` is NaN, cells containing NaN are masked instead (an
    equality test would never match because NaN != NaN).

    :param value: cells equal to this value will be masked; may be NaN
    """
    if self.array is None:
        return

    def _isnan(v) -> bool:
        # Tolerant NaN probe: non-numeric values are simply "not NaN".
        # FIX: the previous code also called math.isnan, but `math` is not
        # imported in this module, which raised NameError outside the
        # protected block below.
        try:
            return bool(np.isnan(v))
        except (TypeError, ValueError):
            return False

    try:
        if not _isnan(value):
            # Cast the probe value to the array's storage type so the
            # equality test below compares like with like
            if self.wolftype in [WOLF_ARRAY_FULL_INTEGER]:
                value = np.int32(value)
            elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2]:
                value = np.int16(value)
            elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER8]:
                try:
                    value = np.int8(value)
                except (OverflowError, ValueError):
                    value = 0
                    logging.warning(_('Value too high for int8 conversion'))
                    logging.warning(_('Value set to 0'))
            elif self.wolftype in [WOLF_ARRAY_FULL_UINTEGER8]:
                try:
                    value = np.uint8(value)
                except (OverflowError, ValueError):
                    value = 0
                    # FIX: this branch previously logged "int8" instead of "uint8"
                    logging.warning(_('Value too high for uint8 conversion'))
                    logging.warning(_('Value set to 0'))
    except (TypeError, ValueError):
        logging.error(_('Type not supported : {} - {}'.format(value, type(value))))
        logging.warning(_('Masking operation compromised'))

    if value is not None:
        if isinstance(self.array.mask, np.bool_):
            # mask collapsed to a single boolean scalar -> build a real mask array
            if _isnan(value):
                self.array.mask = np.isnan(self.array.data)
            else:
                self.array.mask = self.array.data == value
        else:
            # Copy in place to avoid unlinking a shared mask (see `mask_reset`)
            if _isnan(value):
                np.copyto(self.array.mask, np.isnan(self.array.data))
            else:
                np.copyto(self.array.mask, self.array.data == value)

    self.count()
[docs]
def mask_lower(self, value):
    """Mask every cell whose value is strictly below ``value``.

    :param value: cells strictly below this value will be masked
    """
    if self.array is None:
        return
    below = self.array.data < value
    # In-place copy keeps a possibly linked mask linked (see `mask_reset`)
    np.copyto(self.array.mask, below)
    self.count()
[docs]
def mask_lowerequal(self, value):
    """Mask every cell whose value is lower than or equal to ``value``.

    :param value: cells below or equal to this value will be masked
    """
    if self.array is None:
        return
    below_or_equal = self.array.data <= value
    # In-place copy keeps a possibly linked mask linked (see `mask_reset`)
    np.copyto(self.array.mask, below_or_equal)
    self.count()
[docs]
def mask_greater(self, value):
    """Mask every cell whose value is strictly above ``value``.

    :param value: cells strictly above this value will be masked
    """
    if self.array is None:
        return
    above = self.array.data > value
    # In-place copy keeps a possibly linked mask linked (see `mask_reset`)
    np.copyto(self.array.mask, above)
    self.count()
[docs]
def mask_greaterequal(self, value):
    """Mask every cell whose value is greater than or equal to ``value``.

    :param value: cells above or equal to this value will be masked
    """
    if self.array is None:
        return
    above_or_equal = self.array.data >= value
    # In-place copy keeps a possibly linked mask linked (see `mask_reset`)
    np.copyto(self.array.mask, above_or_equal)
    self.count()
[docs]
def mask_allexceptdata(self, value):
    """Mask every cell whose value differs from ``value``.

    :param value: only cells with this exact value remain unmasked
    """
    if self.array is None:
        return
    different = self.array.data != value
    # In-place copy keeps a possibly linked mask linked (see `mask_reset`)
    np.copyto(self.array.mask, different)
    self.count()
[docs]
def mask_invert(self):
    """ Invert the mask (masked cells become unmasked and vice versa). """
    if self.array is None:
        return
    inverted = np.logical_not(self.array.mask)
    # In-place copy keeps a possibly linked mask linked (see `mask_reset`)
    np.copyto(self.array.mask, inverted)
    self.count()
[docs]
def mask_outsidepoly(self, myvect: vector, eps:float = 0.,
                     set_nullvalue:bool=True, method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict'):
    """
    Mask all cells located **outside** a polygon.

    :param myvect: target vector in global coordinates
    :param eps: tolerance for boundary detection
    :param set_nullvalue: if ``True``, set masked cells to :attr:`nullvalue`
    :param method: spatial method: ``'mpl'``, ``'shapely_strict'``, ``'shapely_wboundary'``, ``'rasterio'``
    """
    # The polygon is expressed in world coordinates; the conversion back to
    # mesh indices (0-based) is handled by get_ij_inside_polygon
    inside_ij = self.get_ij_inside_polygon(myvect, usemask=True, eps=eps, method=method)

    # Mask the whole array, then unmask the cells found inside the polygon
    self.array.mask.fill(True)
    self.array.mask[inside_ij[:, 0], inside_ij[:, 1]] = False

    if set_nullvalue:
        # Cancel values outside the polygon
        self.set_nullvalue_in_mask()

    self.count()
[docs]
def mask_insidepoly(self, myvect: vector, eps:float = 0.,
                    set_nullvalue:bool=True, method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='mpl'):
    """
    Mask all cells located **inside** a polygon.

    :param myvect: target vector in global coordinates
    :param eps: tolerance for boundary detection
    :param set_nullvalue: if ``True``, set masked cells to :attr:`nullvalue`
    :param method: spatial method: ``'mpl'``, ``'shapely_strict'``, ``'shapely_wboundary'``, ``'rasterio'``
    """
    # The polygon is expressed in world coordinates; the conversion back to
    # mesh indices (0-based) is handled by get_ij_inside_polygon.
    # usemask=False: consider every cell in the footprint, masked or not.
    inside_ij = self.get_ij_inside_polygon(myvect, usemask=False, eps=eps, method=method)

    if set_nullvalue:
        # Cancel values inside the polygon
        self.array.data[inside_ij[:, 0], inside_ij[:, 1]] = self.nullvalue

    # Mask the contained cells
    self.array.mask[inside_ij[:, 0], inside_ij[:, 1]] = True

    self.count()
[docs]
def mask_insidepolys(self, myvects: list[vector], eps:float = 0.,
                     set_nullvalue:bool=True, method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='mpl'):
    """
    Mask all cells located **inside** any of the given polygons.

    :param myvects: list of polygon vectors (world coordinates) to mask inside
    :param eps: tolerance for boundary detection
    :param set_nullvalue: if ``True``, set masked cells to :attr:`nullvalue`
    :param method: spatial method: ``'mpl'``, ``'shapely_strict'``, ``'shapely_wboundary'``, ``'rasterio'``
    """
    for poly in myvects:
        # usemask=False: consider every cell in the footprint, masked or not
        inside_ij = self.get_ij_inside_polygon(poly, usemask=False, eps=eps, method=method)

        if inside_ij.shape[0] == 0:
            logging.warning(_("No indices found in the polygon {}").format(poly.myname))
            continue

        # Mask the contained cells
        self.array.mask[inside_ij[:, 0], inside_ij[:, 1]] = True
        if set_nullvalue:
            # Cancel values inside this polygon
            self.array.data[inside_ij[:, 0], inside_ij[:, 1]] = self.nullvalue

    self.count()
[docs]
def copy_mask(self, source:"WolfArrayModel", forcenullvalue:bool= False, link:bool=True):
    """
    Copy or link the mask from another WolfArrayModel.

    :param source: WolfArrayModel whose mask is taken
    :param forcenullvalue: if ``True``, write nullvalue into the masked zone first
    :param link: share the mask object if ``True`` (default), copy it otherwise
    """
    assert self.shape == source.shape, _('Bad shape')

    if forcenullvalue:
        self.array[np.where(source.array.mask)] = self.nullvalue

    # Linking shares the mask object between both arrays
    self.array.mask = source.array.mask if link else source.array.mask.copy()
    self.nbnotnull = source.nbnotnull
    self.reset_plot()
[docs]
def mask_union(self, source:"WolfArrayModel", link:bool=True):
    """
    Union of the mask with another WolfArrayModel

    NOTE: masks follow the numpy.ma convention (True == masked). The
    bitwise AND below keeps a cell masked only if it is masked in BOTH
    arrays, i.e. the resulting *valid* domain is the union of both
    valid domains.

    :param source: WolfArrayModel source
    :param link: link the mask if True (default), copy it otherwise
    """
    union = self.array.mask & source.array.mask
    # Cells that become valid but were masked before receive the nullvalue
    # (NOTE(review): source also receives *self*'s nullvalue -- confirm intended)
    self.array[(~union) & (self.array.mask)] = self.nullvalue
    source.array[(~union) & (source.array.mask)] = self.nullvalue
    if link:
        # Both arrays share the very same mask object afterwards
        self.array.mask = union
        source.array.mask = union
    else:
        self.array.mask = union.copy()
        source.array.mask = union.copy()
    self.reset_plot()
    source.reset_plot()
[docs]
def mask_unions(self, sources:list["WolfArrayModel"], link:bool=True):
    """
    Compute the union of masks from this array and a list of source arrays.

    Masks follow the numpy.ma convention (True == masked): a cell stays
    masked only if it is masked in this array AND in every source, i.e. the
    resulting *valid* domain is the union of all valid domains.

    :param sources: list of :class:`WolfArrayModel` whose masks are unified
    :param link: link the mask if True (default), copy it otherwise
    """
    # FIX: the union must be accumulated over ALL sources. The previous
    # version recomputed `union = self.mask & cursrc.mask` at each
    # iteration, so only the last source actually contributed.
    union = self.array.mask.copy()
    for cursrc in sources:
        union &= cursrc.array.mask

    # Cells that become valid but were masked before receive the nullvalue
    self.array[(~union) & (self.array.mask)] = self.nullvalue
    for cursrc in sources:
        cursrc.array[(~union) & (cursrc.array.mask)] = self.nullvalue

    if link:
        # All arrays share the very same mask object afterwards
        self.array.mask = union
        for cursrc in sources:
            cursrc.array.mask = union
    else:
        self.array.mask = union.copy()
        for cursrc in sources:
            cursrc.array.mask = union.copy()

    self.reset_plot()
    for cursrc in sources:
        cursrc.reset_plot()
[docs]
def copy_mask_log(self, mask:np.ndarray, link:bool=True):
    """
    Copy or link the mask from a plain numpy boolean array.

    :param mask: numpy array with the same shape as this array
    :param link: share the mask object if ``True`` (default), copy it otherwise
    """
    assert self.shape == mask.shape, _('Bad shape')

    if self.array is None:
        logging.debug(_('No array !!'))
        return

    self.array.mask = mask if link else mask.copy()
    self.reset_plot()
[docs]
def set_nullvalue_in_mask(self):
    """ Write :attr:`nullvalue` into every masked cell. """
    if self.array is None:
        return
    masked_cells = self.array.mask
    self.array.data[masked_cells] = self.nullvalue
[docs]
def mask_force_null(self):
    """
    Unmask everything, then mask only the cells equal to the nullvalue.
    """
    # Start from a clean (fully unmasked) state
    self.mask_reset()
    # Re-mask the null cells only
    self.mask_data(self.nullvalue)
    self.reset_plot()
[docs]
def unmask(self):
    """ Alias for :meth:`mask_reset` (unmask everything). """
    self.mask_reset()
[docs]
def mask_clear(self):
    """ Alias for :meth:`mask_reset` (unmask everything). """
    self.mask_reset()
[docs]
def mask_reset(self):
    """
    Unmask everything.

    Also refreshes :attr:`nbnotnull` to the full cell count.
    """
    if self.array is None:
        # Robustness: consistent with the other mask_* methods
        return

    if self.nbdims == 2:
        # FIXME if mask linking should work
        # as expected, then we do: self.array.mask.fill(0.0)
        # to avoid replacing the linked mask by a (non linked) one.
        if isinstance(self.array.mask, np.bool_):
            # mask collapsed to a scalar -> recreate a full mask array
            # FIX: the mask must be boolean, not the float default of np.zeros
            self.array.mask = np.zeros(self.array.shape, dtype=bool)
        else:
            self.array.mask.fill(False)  # False == not masked
        self.nbnotnull = self.nbx * self.nby
    elif self.nbdims == 3:
        if isinstance(self.array.mask, np.bool_):
            # FIX: boolean dtype here too (was float)
            self.array.mask = np.zeros((self.nbx, self.nby, self.nbz), dtype=bool)
        else:
            self.array.mask.fill(False)  # False == not masked
        self.nbnotnull = self.nbx * self.nby * self.nbz
[docs]
def create_binary_mask(self, threshold:float = 0.) -> "WolfArrayModel":
    """Create a binary (0/1) array from the current data.

    :param threshold: values strictly above this threshold become 1 (unmasked), others 0 (masked)
    :return: new binary array (1 where value > threshold, 0 otherwise)
    :rtype: WolfArrayModel
    """
    binary = type(self)(mold=self)
    above = self.array.data > threshold
    binary.array.data[:, :] = above
    # Mask everywhere the binary value is 0
    binary.array.mask[:, :] = ~binary.array.data.astype(bool)
    binary.set_nullvalue_in_mask()
    return binary
[docs]
def count(self):
    """ Count, store and return the number of unmasked values. """
    self.nbnotnull = 0 if self.array is None else self.array.count()
    return self.nbnotnull
# ========================================================================
# Spatial Queries (Polygon & Polyline)
# ========================================================================
# *************************************************************************************************************************
# POSITION and VALUES associated to a vector/polygon/polyline
# These functions can not be stored in header_wolf, because wa can use the mask of the array to limit the search
# These functions are also present in WolfResults_2D, but they are not exactly the same due to the structure of the results
# *************************************************************************************************************************
[docs]
def get_xy_inside_polygon(self,
                          myvect: vector | Polygon,
                          usemask:bool=True,
                          method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict',
                          epsilon:float = 0.):
    """
    Return the coordinates inside a polygon.

    Main principle:
        - Get all the coordinates in the footprint of the polygon (taking into account the epsilon if provided)
        - Test each coordinate if it is inside the polygon or not (and along boundary if method allows it)
        - Apply the mask if requested

    Returned values are stored in an array of shape (N,2) with N the number of points found inside the polygon.

    :param myvect: target vector - vector or Shapely Polygon
    :param usemask: limit potential nodes to unmasked nodes
    :param method: method to use ('mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio') - default is 'shapely_strict'
    :param epsilon: tolerance for the point-in-polygon test - default is 0.
    :raises ValueError: if ``method`` is not one of the supported values
    """
    if method == 'mpl':
        return self.get_xy_inside_polygon_mpl(myvect, usemask= usemask, epsilon = epsilon)
    elif method == 'shapely_strict':
        return self.get_xy_inside_polygon_shapely(myvect, usemask, strictly=True, epsilon=epsilon)
    elif method == 'shapely_wboundary':
        return self.get_xy_inside_polygon_shapely(myvect, usemask, strictly=False, epsilon=epsilon)
    elif method == 'rasterio':
        return self.get_xy_inside_polygon_rasterio(myvect, usemask, epsilon=epsilon)
    else:
        # FIX: previously an unknown method silently returned None,
        # which crashed callers expecting an array
        raise ValueError(_('Unknown method : {}').format(method))
[docs]
def get_xy_strictly_on_borders(self, myvect: vector | Polygon | LineString, usemask:bool=True,
                               method:Literal['shapely']='shapely'):
    """
    Return the coordinates strictly on the borders of a polygon.

    ATTENTION : Tests are performed numerically, so the results may not be exact
    in the sense of pure geometry.

    Do not confuse with "get_xy_under_polyline" which
    "rasterize" the polyline to find useful coordinates.

    :param myvect: target vector
    :param usemask: limit potential nodes to unmaksed nodes
    :param method: method to use ('shapely' for now)
    """
    # Shapely `contains` on a LineString is True for points in the line's
    # interior, so we work on a LineString / Polygon.boundary.
    # NOTE(review): an unsupported `myvect` type leaves `boundary` unbound
    # (NameError below) -- confirm callers only pass the annotated types.
    if isinstance(myvect, vector):
        myvect.find_minmax()
        boundary = myvect.linestring
    elif isinstance(myvect, Polygon):
        boundary = myvect.boundary
    elif isinstance(myvect, LineString):
        boundary = myvect

    if not is_prepared(boundary):
        prepare(boundary) # Prepare the polygon for **faster** contains check -- VERY IMPORTANT
        to_destroy = True   # we prepared it, so we must clean up afterwards
    else:
        to_destroy = False  # caller prepared it; leave it prepared

    # Candidate cell centres in the geometry's footprint (xy + ij views)
    mypointsxy, mypointsij = self.get_xy_infootprint_vect(myvect)

    points= np.array([Point(x,y) for x,y in mypointsxy])
    on_border = list(map(lambda point: boundary.contains(point), points))

    if to_destroy:
        destroy_prepared(boundary) # Destroy the prepared polygon

    to_keep = np.where(on_border)
    mypointsxy = mypointsxy[to_keep]

    if mypointsxy.shape[0] == 0:
        return mypointsxy

    if usemask:
        # Keep only candidates whose cell is not masked
        mypointsij = mypointsij[to_keep]
        mymask = np.logical_not(self.array.mask[mypointsij[:, 0], mypointsij[:, 1]])
        mypointsxy = mypointsxy[np.where(mymask)]

    return mypointsxy
[docs]
def get_xy_strictly_on_vertices(self,
                                myvect: vector | Polygon | LineString,
                                usemask:bool=True,
                                method:Literal['shapely']='shapely',
                                eps:float = 1.e-4):
    """
    Return the coordinates strictly on the vertices of a polygon, Linestring or vector.

    ATTENTION : Tests are performed numerically, so the results may not be exact
    in the sense of pure geometry.

    :param myvect: target vector
    :param usemask: limit potential nodes to unmasked nodes
    :param method: method to use ('shapely' for now)
    :param eps: distance tolerance for matching a vertex to a cell center
    :raises TypeError: if ``myvect`` is not a vector, Polygon or LineString
    """
    # We work on a LineString or a Polygon.boundary to access the vertices.
    if isinstance(myvect, vector):
        myvect.find_minmax()
        boundary = myvect.linestring
    elif isinstance(myvect, Polygon):
        boundary = myvect.boundary
    elif isinstance(myvect, LineString):
        boundary = myvect
    else:
        # FIX: previously an unsupported type crashed later with NameError
        raise TypeError(_('Unsupported geometry type : {}').format(type(myvect)))

    if not is_prepared(boundary):
        prepare(boundary) # Prepare the polygon for **faster** contains check -- VERY IMPORTANT
        to_destroy = True
    else:
        to_destroy = False

    # Candidate cell centres in the geometry's footprint (xy + ij views)
    mypointsxy, mypointsij = self.get_xy_infootprint_vect(myvect)

    points = np.array([Point(x,y) for x,y in mypointsxy])
    # FIX: build 2D points from the coords. The previous code indexed pt[2],
    # which raised IndexError for 2D geometries; shapely's `distance` is
    # planar anyway, so the z coordinate never influenced the result.
    vertices = [Point(pt[0], pt[1]) for pt in boundary.coords]
    on_vertices = list(map(lambda point: any(vert.distance(point) < eps for vert in vertices), points))

    if to_destroy:
        destroy_prepared(boundary) # Destroy the prepared polygon

    to_keep = np.where(on_vertices)
    mypointsxy = mypointsxy[to_keep]

    if mypointsxy.shape[0] == 0:
        return mypointsxy

    if usemask:
        # Keep only candidates whose cell is not masked
        mypointsij = mypointsij[to_keep]
        mymask = np.logical_not(self.array.mask[mypointsij[:, 0], mypointsij[:, 1]])
        mypointsxy = mypointsxy[np.where(mymask)]

    return mypointsxy
[docs]
def get_xy_inside_polygon_mpl(self,
                              myvect: vector | Polygon,
                              usemask:bool=True,
                              epsilon:float = 0.):
    """
    Return the coordinates inside a polygon (matplotlib Path implementation).

    :param myvect: target vector
    :param usemask: limit potential nodes to unmaksed nodes
    :param epsilon: tolerance for path containment (matplotlib)
    """
    # NOTE(review): a `myvect` that is neither vector nor Polygon leaves
    # `myvert` unbound (NameError below) -- confirm callers respect the
    # annotated types.
    if isinstance(myvect, vector):
        # force la mise à jour des min/max
        myvect.find_minmax()
        # Conversion des coordonnées en numpy pour plus d'efficacité (du moins on espère)
        myvert = myvect.asnparray()
    elif isinstance(myvect, Polygon):
        # Use the exterior ring, dropping the duplicated closing point
        myvert = myvect.exterior.coords[:-1]

    # Candidate cell centres in the polygon's footprint (xy + ij views)
    mypointsxy, mypointsij = self.get_xy_infootprint_vect(myvect, eps = epsilon)

    path = mpltPath.Path(myvert)
    inside = path.contains_points(mypointsxy)

    mypointsxy = mypointsxy[np.where(inside)]

    if usemask:
        # Keep only candidates whose cell is not masked
        mypointsij = mypointsij[np.where(inside)]
        mymask = np.logical_not(self.array.mask[mypointsij[:, 0], mypointsij[:, 1]])
        mypointsxy = mypointsxy[np.where(mymask)]

    return mypointsxy
[docs]
def get_xy_inside_polygon_shapely(self,
                                  myvect: vector | Polygon,
                                  usemask:bool=True,
                                  strictly:bool=True,
                                  epsilon:float=0.):
    """
    Return the coordinates inside a polygon (shapely implementation).

    :param myvect: target vector
    :param usemask: limit potential nodes to unmaksed nodes
    :param strictly: if ``True``, use strict containment; otherwise include boundary
    :param epsilon: buffer tolerance for boundary inclusion
    """
    if isinstance(myvect, vector):
        myvect.find_minmax()
        polygon = myvect.polygon
    elif isinstance(myvect, Polygon):
        polygon = myvect

    if not is_prepared(polygon):
        prepare(polygon) # Prepare the polygon for **faster** contains check -- VERY IMPORTANT
        to_destroy = True   # we prepared it, so we must clean up afterwards
    else:
        to_destroy = False  # caller prepared it; leave it prepared

    # Candidate cell centres in the polygon's footprint (xy + ij views)
    mypointsxy, mypointsij = self.get_xy_infootprint_vect(myvect, eps = epsilon)

    if strictly:
        # Vectorized strict point-in-polygon test
        inside = contains_xy(polygon, mypointsxy[:,0], mypointsxy[:,1])
    else:
        # Strict containment, then add points lying on the boundary itself
        points= np.array([Point(x,y) for x,y in mypointsxy])
        boundary = polygon.boundary
        inside = contains(polygon, points)
        on_border = list(map(lambda point: boundary.contains(point), points))
        inside = np.logical_or(inside, on_border)

    if to_destroy:
        destroy_prepared(polygon) # Destroy the prepared polygon

    to_keep = np.where(inside)
    mypointsxy = mypointsxy[to_keep]

    if mypointsxy.shape[0] == 0:
        return mypointsxy

    if usemask:
        # Keep only candidates whose cell is not masked
        mypointsij = mypointsij[to_keep]
        mymask = np.logical_not(self.array.mask[mypointsij[:, 0], mypointsij[:, 1]])
        mypointsxy = mypointsxy[np.where(mymask)]

    return mypointsxy
[docs]
def get_xy_inside_polygon_rasterio(self, myvect: vector | Polygon, usemask:bool=True, epsilon:float = 0.):
    """
    Return the coordinates inside a polygon (rasterio geometry_mask implementation).

    :param myvect: polygon vector in global coordinates
    :param usemask: if ``True``, consider only unmasked cells
    :param epsilon: tolerance (not used in rasterio method)
    """
    # force la mise à jour des min/max
    # NOTE(review): this assumes `myvect` is a vector despite the Polygon
    # in the signature -- a shapely Polygon has no find_minmax/polygon attrs;
    # confirm callers only pass vectors here.
    myvect.find_minmax()

    # Candidate cell centres in the polygon's footprint (xy + ij views)
    mypointsxy, mypointsij = self.get_xy_infootprint_vect(myvect, eps = epsilon)

    from rasterio.features import geometry_mask

    poly = myvect.polygon
    # invert=True -> True for cells *inside* the geometry
    mask = geometry_mask([poly], out_shape=(self.nbx, self.nby),
                         transform=self._transform_gmrio(),
                         invert=True,
                         all_touched=False)

    # filter mypointsxy where mask is True
    # mypointsij is a vector of (i,j) indices
    mypointsxy = mypointsxy[mask[mypointsij[:, 0], mypointsij[:, 1]]]

    if usemask:
        # mypointsij was not filtered above, so the same mask expression
        # selects the matching ij rows here
        mypointsij = mypointsij[mask[mypointsij[:, 0], mypointsij[:, 1]]]
        mymask = np.logical_not(self.array.mask[mypointsij[:, 0], mypointsij[:, 1]])
        mypointsxy = mypointsxy[np.where(mymask)]

    return mypointsxy
[docs]
def get_xy_under_polyline(self, myvect: vector, usemask:bool=True):
    """
    Return the (x, y) coordinates of the cells crossed by a polyline.

    :param myvect: target vector
    :param usemask: limit potential nodes to unmasked nodes
    """
    indices = self.get_ij_under_polyline(myvect, usemask)
    return [self.get_xy_from_ij(i, j) for i, j in indices]
[docs]
def get_ij_inside_polygon(self,
                          myvect: vector | Polygon,
                          usemask:bool=True,
                          eps:float = 0.,
                          method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict'):
    """
    Return the indices inside a polygon.

    Main principle:
        - First, get all indices in the footprint of the polygon (bounding box + epsilon)
        - Then, test each point if it is inside the polygon with the selected method
        - Filter with the mask if needed

    Returned indices are stored in an array of shape (N,2) with N the number of points found inside the polygon.

    :param myvect: target vector
    :param usemask: limit potential nodes to unmasked nodes
    :param eps: epsilon for the intersection
    :param method: method to use ('mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio') - default is 'shapely_strict'
    :raises ValueError: if ``method`` is not one of the supported values
    """
    if method == 'mpl':
        return self.get_ij_inside_polygon_mpl(myvect, usemask, eps)
    elif method == 'shapely_strict':
        return self.get_ij_inside_polygon_shapely(myvect, usemask, eps, strictly=True)
    elif method == 'shapely_wboundary':
        return self.get_ij_inside_polygon_shapely(myvect, usemask, eps, strictly=False)
    elif method == 'rasterio':
        return self._get_ij_inside_polygon_rasterio(myvect, usemask, eps)
    else:
        # FIX: previously an unknown method silently returned None,
        # which crashed callers expecting an array
        raise ValueError(_('Unknown method : {}').format(method))
[docs]
def get_ij_inside_listofpolygons(self,
                                 myvects: list[vector | Polygon],
                                 usemask:bool=True,
                                 eps:float = 0.,
                                 method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict'):
    """
    Return the indices inside a list of polygons.

    :param myvects: list of target vectors (or a `zone`)
    :param usemask: limit potential nodes to unmasked nodes
    :param eps: epsilon for the intersection
    :param method: method to use ('mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio') - default is 'shapely_strict'
    :return: unique (i,j) indices, shape (N,2); empty (0,2) array if none found
    """
    if isinstance(myvects, zone):
        myvects = myvects.myvectors

    # FIX: `eps` was accepted but never forwarded to get_ij_inside_polygon
    alls = [self.get_ij_inside_polygon(vec, usemask, eps=eps, method=method) for vec in myvects]

    # Remove empty arrays
    alls = [cur for cur in alls if cur.shape[0] > 0]

    if len(alls) == 0:
        return np.empty((0, 2), dtype=int)

    # Concatenate all arrays and remove duplicates (overlapping polygons)
    mypointsij = np.unique(np.concatenate(alls, axis=0), axis=0)

    if mypointsij.shape[0] == 0:
        return np.empty((0, 2), dtype=int)

    return mypointsij
[docs]
def get_ij_inside_polygon_mpl(self, myvect: vector | Polygon, usemask:bool=True, eps:float = 0.):
    """
    Return the indices inside a polygon (matplotlib Path implementation).

    :param myvect: target vector or shapely Polygon
    :param usemask: limit potential nodes to unmasked nodes
    :param eps: epsilon for the intersection
    """
    # FIX: the signature accepts a shapely Polygon but the previous code
    # unconditionally called myvect.asnparray(), which only exists on
    # `vector` (AttributeError for Polygons). Mirror the handling of
    # get_xy_inside_polygon_mpl.
    if isinstance(myvect, vector):
        # force la mise à jour des min/max
        myvect.find_minmax()
        # Vertices as numpy for efficiency
        myvert = myvect.asnparray()
    elif isinstance(myvect, Polygon):
        # Use the exterior ring, dropping the duplicated closing point
        myvert = myvect.exterior.coords[:-1]

    # Candidate cell centres in the polygon's footprint (xy + ij views)
    mypointsxy, mypointsij = self.get_xy_infootprint_vect(myvect, eps=eps)

    path = mpltPath.Path(myvert)
    inside = path.contains_points(mypointsxy)

    mypointsij = mypointsij[np.where(inside)]

    if usemask:
        # Keep only candidates whose cell is not masked
        mymask = np.logical_not(self.array.mask[mypointsij[:, 0], mypointsij[:, 1]])
        mypointsij = mypointsij[np.where(mymask)]

    return mypointsij
[docs]
def get_ij_inside_polygon_shapely(self, myvect: vector | Polygon,
                                  usemask:bool=True,
                                  eps:float = 0.,
                                  strictly:bool=True):
    """
    Return the indices inside a polygon with the contains method of shapely

    :param myvect: target vector
    :param usemask: limit potential nodes to unmaksed nodes
    :param eps: epsilon for the intersection
    :param strictly: if True, the points on the border are not considered inside
    """
    # force la mise à jour des min/max
    if isinstance(myvect, vector):
        myvect.find_minmax()
        poly = myvect.polygon
    elif isinstance(myvect, Polygon):
        poly = myvect

    # Candidate cell centres in the polygon's footprint (xy + ij views)
    mypointsxy, mypointsij = self.get_xy_infootprint_vect(myvect, eps=eps)

    if not is_prepared(poly):
        prepare(poly) # Prepare the polygon for **faster** contains check -- VERY IMPORTANT
        to_destroy = True   # we prepared it, so we must clean up afterwards
    else:
        to_destroy = False  # caller prepared it; leave it prepared

    if strictly:
        # Vectorized strict point-in-polygon test
        inside = contains_xy(poly, mypointsxy[:,0], mypointsxy[:,1])
    else:
        # Strict containment, then add points lying on the boundary itself
        points= np.array([Point(x,y) for x,y in mypointsxy])
        boundary = poly.boundary
        inside = contains(poly, points)
        on_border = list(map(lambda point: boundary.contains(point), points))
        inside = np.logical_or(inside, on_border)

    if to_destroy:
        destroy_prepared(poly) # Destroy the prepared polygon

    mypointsij = mypointsij[np.where(inside)]

    if usemask:
        # Keep only candidates whose cell is not masked
        mymask = np.logical_not(self.array.mask[mypointsij[:, 0], mypointsij[:, 1]])
        mypointsij = mypointsij[np.where(mymask)]

    return mypointsij
[docs]
def _get_ij_inside_polygon_rasterio(self, myvect: vector | Polygon, usemask:bool=True, eps:float = 0.):
    """
    Return the indices inside a polygon with the geometry_mask method of rasterio.

    :remark: get_ij_inside_polygon_shapely is more efficient

    FIXME : geometry_mask seems strange -- it does not work as documented -- RatsrIO 1.3.x

    :param myvect: target vector
    :param usemask: limit potential nodes to unmaksed nodes
    :param eps: epsilon for the intersection
    """
    # force la mise à jour des min/max
    if isinstance(myvect, vector):
        myvect.find_minmax()
        poly = myvect.polygon
    elif isinstance(myvect, Polygon):
        poly = myvect

    # Candidate cell centres in the polygon's footprint (xy + ij views)
    mypointsxy, mypointsij = self.get_xy_infootprint_vect(myvect, eps=eps)

    from rasterio.features import geometry_mask

    # invert=True -> True for cells *inside* the geometry
    mask = geometry_mask([poly], out_shape=(self.nbx, self.nby),
                         transform=self._transform_gmrio(),
                         invert=True,
                         all_touched=False)

    # filter mypointsij where mask is True
    # mypointsij is a vector of (i,j) indices
    mypointsij = mypointsij[mask[mypointsij[:, 0], mypointsij[:, 1]]]

    if usemask:
        # Keep only candidates whose cell is not masked
        mymask = np.logical_not(self.array.mask[mypointsij[:, 0], mypointsij[:, 1]])
        mypointsij = mypointsij[np.where(mymask)]

    return mypointsij
[docs]
def intersects_polygon(self, myvect: vector | Polygon,
                       usemask:bool=True,
                       method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict',
                       buffer:float= 0.) -> bool:
    """ Return True if at least one (unmasked) cell centre lies inside the polygon.

    :param myvect: target vector
    :param usemask: limit potential nodes to unmasked nodes
    :param method: spatial method: ``'mpl'``, ``'shapely_strict'``, ``'shapely_wboundary'``, or ``'rasterio'``
    :param buffer: buffer distance around the polygon for intersection test
    """
    # Optionally enlarge the polygon before testing
    target = myvect.buffer(buffer, inplace= False) if buffer > 0. else myvect
    return self.get_xy_inside_polygon(target, usemask, method).shape[0] > 0
[docs]
def intersects_polygon_shapely(self, myvect: vector | Polygon,
                               eps:float = 0.,
                               usemask:bool=True,
                               buffer:float= 0.) -> bool:
    """ Deprecated. Return True if the array intersects the polygon.

    :param myvect: target vector
    :param usemask: limit potential nodes to unmasked nodes
    :param eps: epsilon for the intersection
    :param buffer: buffer for the intersection - using buffer from Shapely [m]
    """
    warnings.warn("intersects_polygon_shapely is deprecated, use intersects_polygon with method='shapely_strict' instead", DeprecationWarning)

    # Optionally enlarge the polygon before testing
    target = myvect.polygon.buffer(buffer) if buffer > 0. else myvect
    found = self.get_xy_inside_polygon_shapely(target, usemask, epsilon=eps)
    return found.shape[0] > 0
[docs]
def interects_listofpolygons(self, myvects:zone | list[vector],
                             usemask:bool=True,
                             method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict',
                             buffer:float= 0.) -> list[bool]:
    """
    Element-wise intersection with a list of polygons.

    NOTE: the name keeps its historical typo ("interects") for
    backward compatibility with existing callers.

    :param myvects: list of polygon vectors to test (or a `zone`)
    :param usemask: if ``True``, consider only unmasked cells
    :param method: spatial query method for intersection test
    :param buffer: buffer distance around each polygon
    :return: one boolean per polygon; empty list on bad input
    """
    if isinstance(myvects, zone):
        myvects = myvects.myvectors
    elif isinstance(myvects, list):
        pass
    else:
        logging.error("Bad type")
        # FIX: return an empty list, consistent with the annotated
        # list[bool] return type (previously returned a dict)
        return []

    return [self.intersects_polygon(vec, usemask, method, buffer) for vec in myvects]
[docs]
def intersects_zones(self, zones:Zones | list[zone],
                     usemask:bool=True,
                     method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict',
                     buffer:float = 0.) -> list[list[bool]]:
    """
    Element-wise intersection with a list of zones.

    :param zones: :class:`Zones` instance (or list of zones) containing polygons to test
    :param usemask: if ``True``, consider only unmasked cells
    :param method: spatial query method for intersection test
    :param buffer: buffer distance around each polygon
    :return: one list of booleans per zone; empty list on bad input
    """
    if isinstance(zones, Zones):
        zones = zones.myzones
    elif isinstance(zones, list):
        pass
    else:
        logging.error("Bad type")
        # FIX: return an empty list, consistent with the annotated
        # list[list[bool]] return type (previously returned a dict)
        return []

    return [self.interects_listofpolygons(curzone, usemask, method, buffer) for curzone in zones]
[docs]
def get_total_area_if_intersects(self, myvects:Zones | list[vector],
                                 usemask:bool=True,
                                 method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict',
                                 buffer:float= 0.,
                                 coefficient:float = 1.) -> float | list[float]:
    """
    Return the total area of the polygons that intersect the array.

    :param myvects: :class:`Zones` instance or list of polygon vectors
    :param usemask: if ``True``, consider only unmasked cells
    :param method: spatial query method for the intersection test
    :param buffer: buffer distance around each polygon [m]
    :param coefficient: area multiplier (e.g. for unit conversion)
    :return: total area (list input) or one total per zone (Zones input); 0. on bad type
    """
    def _intersecting_area(vects:list) -> float:
        # Sum the area of the polygons flagged as intersecting.
        # zip() avoids the original O(n²) list.index() lookup, which also
        # misbehaved when the same vector object appeared twice in the list.
        flags = self.interects_listofpolygons(vects, usemask, method, buffer)
        return sum(vec.area for vec, hit in zip(vects, flags) if hit) * coefficient

    if isinstance(myvects, Zones):
        return [_intersecting_area(curzone.myvectors) for curzone in myvects.myzones]
    elif isinstance(myvects, list):
        return _intersecting_area(myvects)
    else:
        # BUGFIX: the original silently fell through and returned None
        logging.error("Bad type")
        return 0.
[docs]
def get_ij_under_polyline(self, myvect: vector, usemask:bool=True, step_factor:float=1.):
    """
    Return the (i, j) indices of the cells located under a polyline.

    The polyline is discretized with a step of ``step_factor * min(dx, dy)``
    and each sample point is mapped to its cell index; duplicates are removed.

    :param myvect: target polyline vector
    :param usemask: if ``True``, exclude masked cells from the result
    :param step_factor: step factor for the discretization of myvect (<1 for thinner, >1 for coarser)
    :return: array of shape (N, 2) with N the number of cells found under the polyline
    """
    if step_factor>1.0:
        logging.warning("Step_factor greater than 1.0 may lead to lots of missing (i,j) even if faster")
    else:
        # Float-tolerant modulo: values within `tol` of 0 or of `y` are snapped to 0,
        # so 1.0 is accepted as a multiple of e.g. 0.1 despite binary rounding.
        def mod_fix(x, y, tol=1e-10):
            result = np.mod(x, y)
            return 0 if np.isclose(result, 0, atol=tol) else (y if np.isclose(result, y, atol=tol) else result)
        assert mod_fix(1.0,step_factor)==0, "1.0 should be a multiple of step_factor"
    # assert step_factor<=1., "Step factor must be <= 1"

    # Discretization step along the polyline [m]
    ds = step_factor*min(self.dx, self.dy)
    pts = myvect._refine2D(ds)

    # Map every sampled point to its cell index and drop duplicates
    allij = np.asarray([self.get_ij_from_xy(curpt.x, curpt.y) for curpt in pts])
    allij = np.unique(allij, axis=0)

    # Filter out-of-grid indices (negative or beyond the array extent)
    allij = allij[np.where((allij[:, 0] >= 0) & (allij[:, 1] >= 0) & (allij[:, 0] < self.nbx) & (allij[:, 1] < self.nby))]

    if usemask:
        # Keep only indices pointing at unmasked cells
        mymask = np.logical_not(self.array.mask[allij[:, 0], allij[:, 1]])
        allij = allij[np.where(mymask)]

    return allij
[docs]
def get_values_insidepoly(self, myvect: vector, usemask:bool=True, getxy:bool=False,
                          method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict') -> tuple[np.ndarray, np.ndarray | None]:
    """
    Return the cell values located inside a polygon.

    :param myvect: polygon vector in global coordinates
    :param usemask: if ``True``, restrict the result to unmasked cells
    :param getxy: if ``True``, also return the xy coordinates of the cells
    :param method: spatial method: ``'mpl'``, ``'shapely_strict'``, etc.
    :return: (values, xy coordinates or None)
    """
    xy_inside = self.get_xy_inside_polygon(myvect, usemask, method= method)
    values = np.asarray([self.get_value(pt[0], pt[1]) for pt in xy_inside])
    return (values, xy_inside) if getxy else (values, None)
[docs]
def get_values_inside_listofpolygons(self, myvects:zone | list[vector], usemask:bool=True, getxy:bool=False) -> dict:
    """
    Return the cell values located inside a list of polygons.

    :param myvects: zone instance or list of polygon vectors
    :param usemask: if ``True``, consider only unmasked cells
    :param getxy: if ``True``, also return xy coordinates
    :return: dict keyed by vector name (nested under the zone name for a zone input)
    """
    ret = {}
    if isinstance(myvects, zone):
        # Nest the per-vector results under the zone name
        out = ret[myvects.myname] = {}
        myvects = myvects.myvectors
    elif isinstance(myvects, list):
        out = ret
    else:
        logging.error("Bad type")
        return {}

    for vec in myvects:
        out[vec.myname] = self.get_values_insidepoly(vec, usemask, getxy)

    # BUGFIX: the original was missing this return and always yielded None
    return ret
[docs]
def get_values_inside_zones(self, zones:Zones | list[zone], usemask:bool=True, getxy:bool=False) -> dict:
    """
    Return the cell values located inside all polygons of a :class:`Zones` object.

    :param zones: :class:`Zones` instance or list of zone
    :param usemask: if ``True``, consider only unmasked cells
    :param getxy: if ``True``, also return xy coordinates
    :return: dict keyed by zone name (nested under the Zones idx for a Zones input)
    """
    ret = {}
    if isinstance(zones, Zones):
        out = ret[zones.idx] = {}
        myzones = zones.myzones
    elif isinstance(zones, list):
        myzones = zones
        out = ret
    else:
        logging.error("Bad type")
        return {}

    # Loop variable renamed: the original `for zone in myzones` shadowed the imported class `zone`
    for curzone in myzones:
        out[curzone.myname] = self.get_values_inside_listofpolygons(curzone, usemask, getxy)

    # BUGFIX: the original was missing this return and always yielded None
    return ret
[docs]
def get_values_underpoly(self, myvect: vector, usemask:bool=True, getxy:bool=False):
    """
    Return the cell values located under a polyline.

    :param myvect: polyline vector in global coordinates
    :param usemask: if ``True``, restrict the result to unmasked cells
    :param getxy: if ``True``, also return the xy coordinates of the cells
    :return: (values, xy coordinates or None)
    """
    xy_under = self.get_xy_under_polyline(myvect, usemask)
    values = np.asarray([self.get_value(pt[0], pt[1]) for pt in xy_under])
    return (values, xy_under) if getxy else (values, None)
[docs]
def get_all_values_insidepoly(self, myvect: vector, usemask:bool=True, getxy:bool=False):
    """
    Return all cell values (including masked) inside a polygon.

    For this class a single array is held, so the result is simply the one of
    :meth:`get_values_insidepoly`; subclasses handling several arrays
    (e.g. Wolfresults_2D) may override this with a different behaviour.

    :param myvect: polygon vector in global coordinates
    :param usemask: if ``True``, restrict the result to unmasked cells
    :param getxy: if ``True``, also return the xy coordinates of the cells
    """
    return self.get_values_insidepoly(myvect, usemask, getxy)
[docs]
def get_all_values_underpoly(self, myvect: vector, usemask:bool=True, getxy:bool=False):
    """
    Return all cell values (including masked) under a polyline.

    For this class a single array is held, so the result is simply the one of
    :meth:`get_values_underpoly`; subclasses handling several arrays
    (e.g. Wolfresults_2D) may override this with a different behaviour.

    :param myvect: polyline vector in global coordinates
    :param usemask: if ``True``, restrict the result to unmasked cells
    :param getxy: if ``True``, also return the xy coordinates of the cells
    """
    return self.get_values_underpoly(myvect, usemask, getxy)
[docs]
def count_unique_pixels(self) -> dict[int, int]:
    """
    Count the number of pixels for each unique value in the array.

    Only meaningful for integer-typed arrays; other types log an error.

    :return: dictionary with the code as key and the number of unmasked pixels as value
    """
    integer_types = [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_UINTEGER8]
    if self.wolftype not in integer_types:
        logging.error("Bad wolftype")
        return {}

    # Histogram of the unmasked values; bincount index == code
    occurrences = np.bincount(self.array[~self.array.mask].ravel())
    return {code: int(nb) for code, nb in enumerate(occurrences) if nb > 0}
[docs]
def get_unique_areas(self, format:Literal['m2', 'ha', 'km2'] = 'm2') -> dict[int, float]:
    """
    Get the area covered by each unique code in the array.

    :param format: output unit -- 'm2' (default), 'ha' or 'km2' ('m²', 'Ha', 'km²' accepted)
    :return: dictionary with the code as key and the area in the requested unit as value;
             empty dict on unknown format
    """
    # Validate the requested unit BEFORE counting pixels (fail fast on bad input).
    # Conversion factors are from m² to the requested unit.
    if format in ['m2', 'm²']:
        fact = 1.0
    elif format in ['ha', 'Ha']:
        fact = 0.0001
    elif format in ['km2', 'km²']:
        fact = 1e-6
    else:
        logging.error("Bad format")
        return {}

    counts = self.count_unique_pixels()
    cell_area = self.dx * self.dy  # area of one cell in m²
    return {code: float(count) * cell_area * fact for code, count in counts.items()}
[docs]
def count_insidepoly(self, myvect: vector,
                     usemask:bool=True,
                     method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict',
                     coefficient:float = 1.):
    """
    Count the number of cells contained in a polygon.

    :param myvect: target polygon vector
    :param usemask: if ``True``, restrict the count to unmasked cells
    :param method: spatial query method for the inside test
    :param coefficient: multiplier applied to the count (default 1.)
    """
    inside_ij = self.get_ij_inside_polygon(myvect, usemask, method=method)
    nb_cells = inside_ij.shape[0]
    return nb_cells * coefficient
[docs]
def count_inside_listofpolygons(self, myvects:zone | list[vector],
                                usemask:bool=True,
                                method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict',
                                coefficient:float = 1.):
    """
    Count the number of cells contained in each polygon of a zone or a list of vectors.

    :param myvects: zone instance or list of polygon vectors
    :param usemask: if ``True``, restrict the count to unmasked cells
    :param method: spatial query method for the inside test
    :param coefficient: multiplier applied to each count (default 1.)
    :return: one count per polygon; empty list on bad input type
    """
    if isinstance(myvects, zone):
        myvects = myvects.myvectors
    elif isinstance(myvects, list):
        # BUGFIX: lists of vectors are documented as supported but were rejected
        # by the original missing elif branch
        pass
    else:
        logging.error("Bad type")
        # Return an empty list (was `{}`) for consistency with the list return type
        return []

    return [self.count_insidepoly(vec, usemask, method=method, coefficient=coefficient) for vec in myvects]
[docs]
def count_inside_zones(self, zones:Zones | list[zone],
                       usemask:bool=True,
                       method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict',
                       coefficient:float = 1.):
    """
    Count the number of cells contained in each polygon of a :class:`Zones`
    instance or of a list of zone.

    :param zones: target Zones or list of zone
    :param usemask: if ``True``, restrict the count to unmasked cells
    :param method: spatial query method for the inside test
    :param coefficient: multiplier applied to each count (default 1.)
    :return: one list of counts per zone; empty list on bad input type
    """
    if isinstance(zones, Zones):
        zones = zones.myzones
    elif isinstance(zones, list):
        # BUGFIX: lists of zone are documented as supported but the original
        # logged "Bad type" for them (missing elif branch)
        pass
    else:
        logging.error("Bad type")
        # BUGFIX: the original fell through after logging and mapped over the bad value
        return []

    return [self.count_inside_listofpolygons(loczone, usemask, method=method, coefficient=coefficient) for loczone in zones]
# ========================================================================
# Interpolation & Rasterization
# ========================================================================
[docs]
def interpolate_on_polygon(self, working_vector: vector,
                           method:Literal["nearest", "linear", "cubic"]="linear",
                           keep:Literal['all', 'below', 'above'] = 'all',
                           rescale:bool = False,
                           method_selection:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']="shapely_strict",
                           usemask:bool = True,
                           epsilon:float = 0.0):
    """
    Interpolate array values under a polygon.

    The interpolation is applied:
      - only to the selected cells if a selection exists,
      - otherwise to the cells contained in the polygon.

    Scipy's ``griddata`` is used to interpolate the cell elevations from the
    3D vertices of the polygon.

    :ATTENTION: the interpolation is performed in 64-bit floats; the result is
                then converted to the array's dtype.

    :param working_vector: polygon vector with z values
    :param method: interpolation method -- 'nearest', 'linear' or 'cubic'
    :param keep: 'all' to keep all interpolated values,
                 'below' to keep only values below the current value in the array,
                 'above' to keep only values above the current value in the array
    :param rescale: rescale the input data to [0, 1] for better numerical stability
                    (only for 'linear' and 'cubic' methods)
    :param method_selection: cell selection method: 'mpl', 'shapely_strict',
                             'shapely_wboundary' or 'rasterio'
    :param usemask: if ``True``, interpolate only within unmasked cells
    :param epsilon: tolerance for boundary detection
    :raises ValueError: if ``keep`` or ``method`` is not one of the allowed values
    :return: None
    """
    if self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_INTEGER16]:
        logging.info(_('Your array contains integer data. The interpolation will be done in float and then converted to integer using "np.ceil" operator.'))

    # BUGFIX: the original tested `vector.area` (an attribute of the CLASS),
    # not the area of the polygon instance passed in
    if working_vector.area == 0.:
        logging.error(_('The polygon has no area'))
        return

    if keep not in ['all', 'below', 'above']:
        logging.error(_('keep must be "all", "below" or "above"'))
        raise ValueError
    if method not in ['nearest', 'linear', 'cubic']:
        logging.error(_('method must be "nearest", "linear" or "cubic"'))
        raise ValueError

    # Determine the destination cells (xy and ij) depending on the selection state
    if self.mngselection is None:
        destxy = self.get_xy_inside_polygon(working_vector, method = method_selection, usemask = usemask, epsilon = epsilon)
        if len(destxy)==0:
            logging.debug(_('No points to interpolate'))
            return
        destij = self.xy2ij_np(destxy)
    elif self.mngselection.myselection == 'all':
        # All unmasked cells are the destination
        destij = np.where(self.array.mask == False)
        destij = np.array(destij).transpose()
        if len(destij)==0:
            logging.debug(_('No points to interpolate'))
            return
        destxy = self.ij2xy_np(destij)
    else:
        if self.SelectionData.nb == 0:
            # No active selection -> fall back to the cells inside the polygon
            destxy = self.get_xy_inside_polygon(working_vector, method= method_selection, usemask= usemask, epsilon= epsilon)
            if destxy.shape[0] == 0:
                logging.debug(_('No points to interpolate'))
                return
        else:
            destxy = self.SelectionData.myselection
            if len(destxy)==0:
                logging.debug(_('No points to interpolate'))
                return
        destij = self.xy2ij_np(destxy)

    # Interpolate the destination cells from the polygon's 3D vertices
    xyz = working_vector.asnparray3d()
    newvalues = griddata(xyz[:, :2], xyz[:, 2], destxy, method=method, fill_value=-99999., rescale = rescale)

    # Keep only valid values, optionally filtered against the current array values
    if keep == 'all':
        locmask = np.where(newvalues != -99999.)
    elif keep == 'below':
        locmask = np.where((newvalues != -99999.) & (newvalues < self.array.data[destij[:, 0], destij[:, 1]]))
    elif keep == 'above':
        locmask = np.where((newvalues != -99999.) & (newvalues > self.array.data[destij[:, 0], destij[:, 1]]))

    if self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_INTEGER16]:
        logging.debug(_('Converting interpolated values to integer using "np.ceil" operator'))
        newvalues = np.ceil(newvalues)

    # Cast to the array's dtype before writing back
    newvalues = newvalues.astype(self.array.dtype)
    self.array.data[destij[locmask][:, 0], destij[locmask][:, 1]] = newvalues[locmask]
[docs]
def interpolate_on_polygons(self, working_zone:zone,
                            method:Literal["nearest", "linear", "cubic"]="linear",
                            keep:Literal['all', 'below', 'above'] = 'all'):
    """
    Interpolate values inside each polygon of a zone using vertex elevations.

    :param working_zone: zone containing multiple polygons
    :param method: interpolation method: ``'nearest'``, ``'linear'``, or ``'cubic'``
    :param keep: value-keeping strategy: ``'all'``, ``'below'``, or ``'above'``
    """
    for polygon in working_zone.myvectors:
        self.interpolate_on_polygon(polygon, method, keep)
[docs]
def interpolate_strictly_on_polyline(self, working_vector:vector, usemask=True, keep:Literal['all', 'below', 'above'] = 'all'):
    """
    Set cell values strictly along a polyline using the z-coordinates of its vertices.

    Shapely's ``project``/``interpolate`` pair provides the elevation of each cell
    centre projected onto the 3D polyline.

    :param working_vector: polyline vector with 3D vertices
    :param usemask: if ``True``, only update unmasked cells
    :param keep: value-keeping strategy: ``'all'``, ``'below'``, or ``'above'``
    """
    ls = working_vector.linestring

    xy = self.get_xy_strictly_on_borders(working_vector, usemask=usemask)
    if xy.shape[0] == 0:
        logging.debug(_('No points to interpolate'))
        return

    ij = np.asarray([self.get_ij_from_xy(x, y) for x, y in xy])
    # Elevation of each point projected onto the 3D polyline
    newz = np.asarray([ls.interpolate(ls.project(Point(x, y))).z for x, y in xy])

    if keep == 'all':
        self.array.data[ij[:, 0], ij[:, 1]] = newz
        return

    current = self.array.data[ij[:, 0], ij[:, 1]]
    if keep == 'below':
        sel = np.where((newz != -99999.) & (newz < current))
    elif keep == 'above':
        sel = np.where((newz != -99999.) & (newz > current))
    else:
        return
    self.array.data[ij[sel][:, 0], ij[sel][:, 1]] = newz[sel]
[docs]
def interpolate_strictly_on_vertices(self, working_vector:vector, usemask=True, keep:Literal['all', 'below', 'above'] = 'all'):
    """
    Set cell values at the exact positions of the vector vertices.

    Shapely's ``project``/``interpolate`` pair provides the elevation of each
    vertex position projected onto the 3D polyline.

    :param working_vector: vector with 3D vertices
    :param usemask: if ``True``, only update unmasked cells
    :param keep: value-keeping strategy: ``'all'``, ``'below'``, or ``'above'``
    """
    ls = working_vector.linestring

    xy = self.get_xy_strictly_on_vertices(working_vector, usemask=usemask)
    if xy.shape[0] == 0:
        logging.debug(_('No points to interpolate'))
        return

    ij = np.asarray([self.get_ij_from_xy(x, y) for x, y in xy])
    # Elevation of each point projected onto the 3D polyline
    newz = np.asarray([ls.interpolate(ls.project(Point(x, y))).z for x, y in xy])

    if keep == 'all':
        self.array.data[ij[:, 0], ij[:, 1]] = newz
        return

    current = self.array.data[ij[:, 0], ij[:, 1]]
    if keep == 'below':
        sel = np.where((newz != -99999.) & (newz < current))
    elif keep == 'above':
        sel = np.where((newz != -99999.) & (newz > current))
    else:
        return
    self.array.data[ij[sel][:, 0], ij[sel][:, 1]] = newz[sel]
[docs]
def interpolate_on_polyline(self, working_vector:vector, usemask=True):
    """
    Interpolate cell values along a polyline based on its 3D vertices.

    The interpolation is applied:
      - only to the selected cells if a selection exists,
      - otherwise to the cells located under the polyline.

    Shapely's ``project``/``interpolate`` pair provides the elevation of each
    cell projected onto the 3D polyline.

    :param working_vector: polyline vector with 3D vertices
    :param usemask: if ``True``, only update unmasked cells
    """
    ls = working_vector.asshapely_ls()

    seldata = self.SelectionData
    if seldata is None or seldata.nb == 0:
        # No selection available -> use the cells under the polyline
        allij = self.get_ij_under_polyline(working_vector, usemask)
        allxy = [self.get_xy_from_ij(curij[0], curij[1]) for curij in allij]
    else:
        # Restrict the interpolation to the current selection
        allxy = seldata.myselection
        allij = np.asarray([self.get_ij_from_xy(x, y) for x, y in allxy])

    newz = np.asarray([ls.interpolate(ls.project(Point(x, y))).z for x, y in allxy])
    self.array.data[allij[:, 0], allij[:, 1]] = newz
[docs]
def interpolate_on_polylines(self, working_zone:zone, usemask=True):
    """
    Interpolate cell values along all polylines in a zone.

    :param working_zone: zone containing multiple polylines
    :param usemask: if ``True``, only update unmasked cells
    """
    for polyline in working_zone.myvectors:
        self.interpolate_on_polyline(polyline, usemask)
[docs]
def interpolate_on_cloud(self, xy:np.ndarray, z:np.ndarray, method:Literal['linear', 'nearest', 'cubic']= 'linear'):
    """
    Interpolation from a point cloud.

    The interpolation is applied:
      - only to the selected cells if a selection exists,
      - otherwise to the cells contained in the convex hull of the points.

    Uses ``griddata`` from Scipy.

    :param xy: numpy.array of vertices - shape (n,2)
    :param z: numpy.array of values - shape (n,)
    :param method: method for the interpolation -- 'nearest', 'linear' or 'cubic'

    See : https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.griddata.html
    """
    if self.mngselection.myselection == [] or self.mngselection.myselection == ALL_SELECTED:
        # No explicit selection -> interpolate over the whole grid of cell centres
        decalx = self.origx + self.translx
        decaly = self.origy + self.transly
        x = np.arange(self.dx / 2. + decalx, float(self.nbx) * self.dx + self.dx / 2 + decalx, self.dx)
        y = np.arange(self.dy / 2. + decaly, float(self.nby) * self.dy + self.dy / 2 + decaly, self.dy)
        # Sparse 'xy'-indexed grid; the transpose below brings the result back to (nbx, nby)
        grid_x, grid_y = np.meshgrid(x, y, sparse=True, indexing='xy')

        newvalues = griddata(xy, z, (grid_x, grid_y), method=method, fill_value=-99999.).transpose()
        # -99999. marks cells outside the interpolable region -> leave them untouched
        self.array.data[np.where(newvalues != -99999.)] = newvalues[np.where(newvalues != -99999.)]
    else:
        # Interpolate only at the selected cell positions
        ij = np.asarray([self.get_ij_from_xy(x, y) for x, y in self.mngselection.myselection])
        newvalues = griddata(xy, z, self.mngselection.myselection, method=method, fill_value=-99999.)
        # Drop the fill-valued points before writing back
        ij = ij[np.where(newvalues != -99999.)]
        newvalues = newvalues[np.where(newvalues != -99999.)]
        self.array.data[ij[:, 0], ij[:, 1]] = newvalues

    self.reset_plot()
[docs]
def interpolate_on_triangulation(self, coords, triangles,
                                 grid_x=None, grid_y = None,
                                 mask_tri=None,
                                 interp_method:Literal['matplotlib','scipy'] = 'scipy',
                                 keep:Literal['all', 'below', 'above'] = 'all',
                                 points_along_edges:bool = True):
    """
    Interpolation on a triangulation.

    The interpolation is applied:
      - only to the selected cells if a selection exists,
      - otherwise to the cells contained in the triangulation.

    Scipy (griddata) is used by default (more robust), but Matplotlib can be used
    as well (more strict). If Matplotlib crashes, try with Scipy.
    **Matplotlib is more strict on the quality of the triangulation.**

    :param coords: numpy.array of vertices - shape (n,3)
    :param triangles: numpy.array of triangles - shape (m,3)
    :param grid_x: numpy.array of x values where the interpolation will be done -- if None, the grid is created from the array
    :param grid_y: numpy.array of y values where the interpolation will be done -- if None, the grid is created from the array
    :param mask_tri: numpy.array of mask for the triangles
    :param interp_method: method for the interpolation -- 'matplotlib' or 'scipy'
    :param keep: 'all' to keep all values, 'below' to keep only values below the current array, 'above' to keep only values above the current array
    :param points_along_edges: if ``True`` (scipy path only), also interpolate the cells lying strictly on the triangle edges

    For matplotlib algo, see : https://matplotlib.org/stable/gallery/images_contours_and_fields/triinterp_demo.html
    For scipy algo, see : https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.griddata.html
    """
    if interp_method =='matplotlib':
        # FIXME : This part could be optimized, I think
        import matplotlib.tri as mtri

        # Falls back to the scipy path below if the matplotlib path raises
        use_scipy=False
        try:
            if self.mngselection is not None:
                if self.mngselection.myselection != [] and self.mngselection.myselection != ALL_SELECTED:
                    # Interpolation only in the selected cells.
                    # Convert coordinates to indices
                    ij = np.asarray([self.get_ij_from_xy(x, y) for x, y in self.mngselection.myselection])
                    try:
                        # Linear interpolation operator
                        triang = mtri.Triangulation(coords[:,0],coords[:,1],triangles)
                        if mask_tri is not None:
                            triang.set_mask(mask_tri)
                        interplin = mtri.LinearTriInterpolator(triang, coords[:,2]) # Interpolation, result stored into the Wolf object's numpy.array
                    except:
                        raise Warning(_('Bad triangulation - try with another method like Scipy'))

                    # One interpolated (possibly masked) value per selected point
                    newvalues = np.ma.masked_array([interplin(x, y) for x, y in self.mngselection.myselection])

                    if newvalues.mask.shape!=():
                        # We have a mask: drop the points outside the triangulation
                        nonmasked_values = np.where(~newvalues.mask)
                        ij = ij[nonmasked_values]
                        newvalues = newvalues[nonmasked_values]

                        if keep == 'below':
                            newvalues = np.ma.masked_array(newvalues, mask=(newvalues > self.array.data[ij[:, 0], ij[:, 1]]))
                            nonmasked_values = np.where(~newvalues.mask)
                            ij = ij[np.where(~newvalues.mask)]
                            newvalues = newvalues[nonmasked_values]
                        elif keep == 'above':
                            # NOTE(review): newvalues was already filtered above, so re-indexing with
                            # nonmasked_values looks inconsistent with the 'below' branch -- verify
                            newvalues = np.ma.masked_array(newvalues[nonmasked_values], mask=(newvalues[nonmasked_values] < self.array.data[ij[:, 0], ij[:, 1]]))
                            nonmasked_values = np.where(~newvalues.mask)
                            ij = ij[np.where(~newvalues.mask)]
                            newvalues = newvalues[nonmasked_values]

                        self.array.data[ij[:, 0], ij[:, 1]] = newvalues
                    else:
                        # No mask --> boolean
                        if keep == 'below':
                            newvalues = np.ma.masked_array(newvalues, mask=(newvalues > self.array.data[ij[:, 0], ij[:, 1]]))
                            nonmasked_values = np.where(~newvalues.mask)
                            ij = ij[np.where(~newvalues.mask)]
                            newvalues = newvalues[nonmasked_values]
                        elif keep == 'above':
                            newvalues = np.ma.masked_array(newvalues, mask=(newvalues < self.array.data[ij[:, 0], ij[:, 1]]))
                            nonmasked_values = np.where(~newvalues.mask)
                            ij = ij[np.where(~newvalues.mask)]
                            newvalues = newvalues[nonmasked_values]

                        self.array.data[ij[:, 0], ij[:, 1]] = newvalues

                elif self.mngselection.myselection == ALL_SELECTED and (grid_x is None and grid_y is None):
                    # Interpolation in all the cells.
                    # Creating a grid for all cells
                    decalx = self.origx + self.translx
                    decaly = self.origy + self.transly
                    x = np.arange(self.dx / 2. + decalx, float(self.nbx) * self.dx + self.dx / 2 + decalx, self.dx)
                    y = np.arange(self.dy / 2. + decaly, float(self.nby) * self.dy + self.dy / 2 + decaly, self.dy)

                    grid_x, grid_y = np.meshgrid(x, y, indexing='ij')
                    grid_ij = self.xy2ij_np(np.concatenate([grid_x.ravel()[:, np.newaxis], grid_y.ravel()[:, np.newaxis]], axis=1))

                    try:
                        # Linear interpolation operator
                        triang = mtri.Triangulation(coords[:,0],coords[:,1],triangles)
                        if mask_tri is not None:
                            triang.set_mask(mask_tri)
                        interplin = mtri.LinearTriInterpolator(triang, coords[:,2])
                    except:
                        raise Warning(_('Bad triangulation - try with another method like Scipy'))

                    # Interpolation, result stored into the Wolf object's numpy.array
                    newvalues = interplin(grid_x,grid_y)
                    nonmasked_values = np.where(~newvalues.mask)
                    ij = grid_ij[nonmasked_values]
                    newvalues = newvalues[nonmasked_values]

                    if keep == 'below':
                        # NOTE(review): newvalues is already filtered while grid_ij is full-length;
                        # the shapes only match when nothing was masked -- verify
                        newvalues = np.ma.masked_array(newvalues, mask=(newvalues > self.array.data[grid_ij[:, 0], grid_ij[:, 1]]))
                        nonmasked_values = np.where(~newvalues.mask)
                        ij = grid_ij[nonmasked_values]
                        newvalues = newvalues[nonmasked_values]
                    elif keep == 'above':
                        newvalues = np.ma.masked_array(newvalues, mask=(newvalues < self.array.data[grid_ij[:, 0], grid_ij[:, 1]]))
                        nonmasked_values = np.where(~newvalues.mask)
                        ij = grid_ij[nonmasked_values]
                        newvalues = newvalues[nonmasked_values]

                    self.array.data[ij[:, 0], ij[:, 1]] = newvalues

                elif (grid_x is not None and grid_y is not None):
                    # Working on an imposed grid.
                    # Linear interpolation operator
                    try:
                        triang = mtri.Triangulation(coords[:,0],coords[:,1],triangles)
                        if mask_tri is not None:
                            triang.set_mask(mask_tri)
                        interplin = mtri.LinearTriInterpolator(triang, coords[:,2]) # Interpolation, result stored into the Wolf object's numpy.array
                        newvalues = np.ma.masked_array([interplin(x, y) for x, y in zip(grid_x.ravel(),grid_y.ravel())])
                    except:
                        raise Warning(_('Bad triangulation - try with another method like Scipy'))

                    # newvalues is a vector of values
                    # ij is a vector of indices
                    ij = np.asarray([self.get_ij_from_xy(x, y) for x, y in zip(grid_x.ravel(),grid_y.ravel())])

                    if newvalues.mask.shape!=():
                        # Drop the points outside the triangulation
                        nonmasked_values = np.where(~newvalues.mask)
                        ij = ij[nonmasked_values]
                        newvalues = newvalues[nonmasked_values]

                        if keep == 'below':
                            newvalues = np.ma.masked_array(newvalues, mask=(newvalues > self.array.data[ij[:, 0], ij[:, 1]]))
                            nonmasked_values = np.where(~newvalues.mask)
                            ij = ij[nonmasked_values]
                            newvalues = newvalues[nonmasked_values]
                        elif keep == 'above':
                            newvalues = np.ma.masked_array(newvalues, mask=(newvalues < self.array.data[ij[:, 0], ij[:, 1]]))
                            nonmasked_values = np.where(~newvalues.mask)
                            ij = ij[nonmasked_values]
                            newvalues = newvalues[nonmasked_values]

                        self.array.data[ij[:, 0], ij[:, 1]] = newvalues
                    else:
                        if keep == 'below':
                            newvalues = np.ma.masked_array(newvalues, mask=(newvalues > self.array.data[ij[:, 0], ij[:, 1]]))
                            nonmasked_values = np.where(~newvalues.mask)
                            ij = ij[nonmasked_values]
                            newvalues = newvalues[nonmasked_values]
                        elif keep == 'above':
                            newvalues = np.ma.masked_array(newvalues, mask=(newvalues < self.array.data[ij[:, 0], ij[:, 1]]))
                            nonmasked_values = np.where(~newvalues.mask)
                            ij = ij[nonmasked_values]
                            newvalues = newvalues[nonmasked_values]

                        self.array.data[ij[:, 0], ij[:, 1]] = newvalues
                else:
                    # Working on the whole array.
                    # Creating a grid for all cells
                    decalx = self.origx + self.translx
                    decaly = self.origy + self.transly
                    x = np.arange(self.dx / 2. + decalx, float(self.nbx) * self.dx + self.dx / 2 + decalx, self.dx)
                    y = np.arange(self.dy / 2. + decaly, float(self.nby) * self.dy + self.dy / 2 + decaly, self.dy)

                    grid_x, grid_y = np.meshgrid(x, y, indexing='ij')

                    try:
                        # Linear interpolation operator
                        triang = mtri.Triangulation(coords[:,0],coords[:,1],triangles)
                        if mask_tri is not None:
                            triang.set_mask(mask_tri)
                        interplin = mtri.LinearTriInterpolator(triang, coords[:,2])

                        # Interpolation, result stored into the Wolf object's numpy.array
                        # newvalues is an array
                        newvalues = interplin(grid_x,grid_y).astype(np.float32)
                        assert newvalues.shape == (self.nbx, self.nby), _('Bad shape for newvalues')

                        if keep == 'below':
                            newvalues = np.ma.masked_array(newvalues, mask=(newvalues > self.array.data))
                        elif keep == 'above':
                            newvalues = np.ma.masked_array(newvalues, mask=(newvalues < self.array.data))

                        nonmasked_values = np.where(~newvalues.mask)
                        self.array.data[nonmasked_values] = newvalues[nonmasked_values]
                    except:
                        raise Warning(_('Bad triangulation - try with another method like Scipy'))
            else:
                # No selection manager available
                if grid_x is None and grid_y is None:
                    # Build the grid of all cell centres
                    decalx = self.origx + self.translx
                    decaly = self.origy + self.transly
                    x = np.arange(self.dx / 2. + decalx, float(self.nbx) * self.dx + self.dx / 2 + decalx, self.dx)
                    y = np.arange(self.dy / 2. + decaly, float(self.nby) * self.dy + self.dy / 2 + decaly, self.dy)

                    grid_x, grid_y = np.meshgrid(x, y, indexing='ij')

                    # Linear interpolation operator
                    triang = mtri.Triangulation(coords[:,0],coords[:,1],triangles)
                    interplin = mtri.LinearTriInterpolator(triang, coords[:,2])

                    # Interpolation, result stored into the Wolf object's numpy.array
                    newvalues = np.ma.masked_array([interplin(x, y) for x, y in zip(grid_x.ravel(), grid_y.ravel())])
                    # newvalues = interplin(grid_x,grid_y).astype(np.float32)

                    # Keep only the points inside the triangulation
                    nonmasked_values = np.where(~newvalues.mask)
                    grid_x = grid_x.ravel()[nonmasked_values]
                    grid_y = grid_y.ravel()[nonmasked_values]
                    newvalues = newvalues[nonmasked_values]

                    # make a shape (n, 2) from grid_x and grid_y
                    grid_x = grid_x[:, np.newaxis]
                    grid_y = grid_y[:, np.newaxis]
                    # concatenate grid_x and grid_y
                    grid_ij = self.xy2ij_np(np.concatenate([grid_x, grid_y], axis=1))

                    if keep == 'below':
                        newvalues = np.ma.masked_array(newvalues, mask=(newvalues > self.array.data[grid_ij[:,0], grid_ij[:,1]]))
                        nonmasked_values = np.where(~newvalues.mask)
                        grid_ij = grid_ij[nonmasked_values]
                        newvalues = newvalues[nonmasked_values]
                    elif keep == 'above':
                        newvalues = np.ma.masked_array(newvalues, mask=(newvalues < self.array.data[grid_ij[:,0], grid_ij[:,1]]))
                        nonmasked_values = np.where(~newvalues.mask)
                        grid_ij = grid_ij[nonmasked_values]
                        newvalues = newvalues[nonmasked_values]

                    self.array.data[grid_ij[:,0], grid_ij[:,1]] = newvalues
                else:
                    # Imposed grid without selection manager
                    ij = np.asarray([self.get_ij_from_xy(x, y) for x, y in zip(grid_x.ravel(),grid_y.ravel())])

                    # Linear interpolation operator
                    triang = mtri.Triangulation(coords[:,0],coords[:,1],triangles)
                    interplin = mtri.LinearTriInterpolator(triang, coords[:,2]) # Interpolation, result stored into the Wolf object's numpy.array
                    newvalues = np.ma.masked_array([interplin(x, y) for x, y in zip(grid_x.ravel(), grid_y.ravel())])

                    if newvalues.mask.shape!=():
                        # Drop the points outside the triangulation
                        nonmasked_values = np.where(~newvalues.mask)
                        ij = ij[nonmasked_values]
                        newvalues = newvalues[nonmasked_values]

                        if keep == 'below':
                            newvalues = np.ma.masked_array(newvalues, mask=(newvalues > self.array.data[ij[:, 0], ij[:, 1]]))
                            nonmasked_values = np.where(~newvalues.mask)
                            ij = ij[nonmasked_values]
                            newvalues = newvalues[nonmasked_values]
                        elif keep == 'above':
                            newvalues = np.ma.masked_array(newvalues, mask=(newvalues < self.array.data[ij[:, 0], ij[:, 1]]))
                            nonmasked_values = np.where(~newvalues.mask)
                            ij = ij[nonmasked_values]
                            newvalues = newvalues[nonmasked_values]

                        self.array.data[ij[:, 0], ij[:, 1]] = newvalues
                    else:
                        if keep == 'below':
                            newvalues = np.ma.masked_array(newvalues, mask=(newvalues > self.array.data[ij[:, 0], ij[:, 1]]))
                            nonmasked_values = np.where(~newvalues.mask)
                            ij = ij[nonmasked_values]
                            newvalues = newvalues[nonmasked_values]
                        elif keep == 'above':
                            newvalues = np.ma.masked_array(newvalues, mask=(newvalues < self.array.data[ij[:, 0], ij[:, 1]]))
                            nonmasked_values = np.where(~newvalues.mask)
                            ij = ij[nonmasked_values]
                            newvalues = newvalues[nonmasked_values]

                        self.array.data[ij[:, 0], ij[:, 1]] = newvalues

            # Force masked cells back to nullvalue so the interpolation has no effect there
            self.array.data[self.array.mask]= self.nullvalue
        except:
            # Any failure in the matplotlib path triggers the scipy fallback below
            logging.error(_('Bad triangulation - We will try with Scipy'))
            use_scipy=True

    if interp_method != 'matplotlib' or use_scipy:
        # Scipy path: interpolate triangle by triangle, converting each
        # triangle to a temporary polygon vector
        for curtri in triangles:
            curvec = vector(is2D=False)
            for curpt in curtri:
                curvec.add_vertex(wolfvertex(coords[curpt,0], coords[curpt,1], coords[curpt,2]))
            curvec.close_force()

            # if bool(np.isnan(curvec.x).any() or np.isnan(curvec.y).any()):
            #     continue

            self.interpolate_on_polygon(curvec, "linear", keep)
            if points_along_edges:
                # Also treat the cells lying strictly on the triangle edges
                self.interpolate_strictly_on_polyline(curvec, usemask=True, keep=keep)
                # self.interpolate_strictly_on_vertices(curvec, usemask=True, keep=keep)

    self.reset_plot()
    return
[docs]
def interpolate_on_triangulations(self, multiple_triangulations:list[Triangulation],
                                  interp_method:Literal['matplotlib','scipy'] = 'scipy',
                                  keep:Literal['all', 'below', 'above'] | list[str] = 'all'):
    """
    Interpolate the array on several triangulations, applied one after the other.

    :param multiple_triangulations: list of Triangulation objects
    :param interp_method: interpolation backend -- 'matplotlib' or 'scipy'
    :param keep: 'all' to keep every interpolated value, 'below' to keep only
        values below the current array, 'above' to keep only values above it,
        or a list with one such mode per triangulation
    """
    keep_is_list = isinstance(keep, list)

    for idx, cur_triangulation in enumerate(multiple_triangulations):
        # per-triangulation mode when a list was provided, shared mode otherwise
        cur_keep = keep[idx] if keep_is_list else keep

        self.interpolate_on_triangulation(cur_triangulation.pts, cur_triangulation.tri,
                                          interp_method=interp_method,
                                          keep=cur_keep)
[docs]
def interpolation2D(self, key:str='1'):
    """Perform a 2D interpolation over the selected cells using the points
    stored under `key` as interpolation sources.

    The array values at the stored points are read, then spread onto the
    currently selected cells with ``scipy.interpolate.griddata``.

    :param key: identifier of the stored selection used as source points
    """
    #FIXME : authorize interpolation on other keys
    key = str(key)
    if key in self.mngselection.selections.keys():
        if len(self.mngselection.myselection)>0:
            # source points (world xy) and their current array values
            curlist = self.mngselection.selections[key]['select']
            cursel = self.mngselection.myselection
            if len(curlist) > 0:
                ij = [self.get_ij_from_xy(cur[0], cur[1]) for cur in curlist]
                z = [self.array.data[curij[0], curij[1]] for curij in ij]

                if cursel == ALL_SELECTED:
                    # special marker: interpolate on every cell centre of the array
                    xall = np.linspace(self.origx + self.dx / 2., self.origx + (float(self.nbx) - .5) * self.dx,
                                       self.nbx)
                    yall = np.linspace(self.origy + self.dy / 2., self.origy + (float(self.nby) - .5) * self.dy,
                                       self.nby)
                    cursel = [(x, y) for x in xall for y in yall]

                z = griddata(curlist, z, cursel, fill_value=np.nan)

                # write back only where the interpolation produced a value
                for cur, curz in zip(cursel, z):
                    if not np.isnan(curz):
                        i, j = self.get_ij_from_xy(cur[0], cur[1])
                        self.array.data[i, j] = curz

    self.reset_plot()
[docs]
def rasterize_vector_valuebyid(self, working_vector: vector,
                               id,
                               method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']="shapely_strict",
                               usemask:bool = True,
                               epsilon:float = 0.0):
    """
    Rasterize a vector using the value stored under the attribute `id`.

    :param working_vector: vector to rasterize
    :param id: attribute id whose value will fill the rasterized cells
    :param method: spatial method: ``'mpl'``, ``'shapely_strict'``, ``'shapely_wboundary'``, ``'rasterio'``
    :param usemask: if ``True``, only rasterize unmasked cells
    :param epsilon: boundary tolerance
    """
    if not isinstance(working_vector, vector):
        logging.error(_('working_vector must be a vector. You provided a {}').format(type(working_vector)))
        return

    # fetch the attribute value once (the original queried it twice with
    # two identical None checks)
    val = working_vector.get_value(id)
    if val is None:
        logging.error(_('No value for id {}').format(id))
        return

    self.fillin_vector_with_value(working_vector, val, method=method, usemask=usemask, epsilon=epsilon)
[docs]
def rasterize_zone_valuebyid(self,
                             working_zone: zone,
                             id,
                             method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']="shapely_strict",
                             usemask:bool = True,
                             epsilon:float = 0.0):
    """
    Rasterize every vector of a zone using the value of the given attribute id.

    :param working_zone: zone containing the vectors to rasterize
    :param id: attribute id whose value will fill rasterized cells
    :param method: spatial method for cell selection
    :param usemask: if ``True``, only rasterize unmasked cells
    :param epsilon: boundary tolerance
    """
    if not isinstance(working_zone, zone):
        logging.error(_('working_zone must be a zone. You provided a {}').format(type(working_zone)))
        return

    # delegate to the per-vector routine for each vector of the zone
    for cur_vector in working_zone.myvectors:
        self.rasterize_vector_valuebyid(cur_vector, id, method, usemask, epsilon)
[docs]
def rasterize_zones_valuebyid(self,
                              working_zones: Zones,
                              id,
                              method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']="shapely_strict",
                              usemask:bool = True,
                              epsilon:float = 0.0):
    """
    Rasterize each vector of a :class:`Zones` collection using the value of the given attribute id.

    :param working_zones: :class:`Zones` containing vectors to rasterize
    :param id: attribute id whose value will fill rasterized cells
    :param method: spatial method for cell selection
    :param usemask: if ``True``, only rasterize unmasked cells
    :param epsilon: boundary tolerance
    """
    if not isinstance(working_zones, Zones):
        # message fixed: this routine checks for a Zones, not a zone
        # (same wording as fillin_zones_with_value)
        logging.error(_('working_zones must be a Zones. You provided a {}').format(type(working_zones)))
        return

    for curzone in working_zones.myzones:
        self.rasterize_zone_valuebyid(curzone, id, method, usemask, epsilon)
[docs]
def fillin_vector_with_value(self, working_vector: vector,
                             value: int | float | str,
                             method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']="shapely_strict",
                             usemask:bool = True,
                             epsilon:float = 0.0):
    """
    Fill in a vector using the provided value

    :param working_vector: vector
    :param value: value to fill in (float or int depending on the array type)
    :param method: method to get points inside the polygon

        - 'mpl' : matplotlib.path.Path.contains_points (fast for simple polygons)
        - 'shapely_strict' : shapely contains (strictly inside the polygon)
        - 'shapely_wboundary' : shapely contains or on the boundary (inside or on the polygon)
        - 'rasterio' : rasterio.features.rasterize (fast for complex polygons with holes)

    :param usemask: if True, only fill in the masked points
    :param epsilon: tolerance for shapely methods (to consider points on the boundary)
    """
    if not isinstance(working_vector, vector):
        logging.error(_('working_vector must be a vector. You provided a {}').format(type(working_vector)))
        return

    # convert the value to the scalar type matching the array's wolftype
    try:
        if self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_INTEGER16]:
            val = int(float(value))
        else:
            val = float(value)
    except (TypeError, ValueError):
        # narrowed from a bare except: only conversion failures are expected here
        logging.error(_('Value {} is not a number').format(value))
        return

    ij = self.get_ij_inside_polygon(working_vector, method = method, usemask = usemask, eps = epsilon)

    if len(ij) == 0:
        logging.debug(_('No points to rasterize'))
        return

    self.array.data[ij[:, 0], ij[:, 1]] = val
[docs]
def fillin_zone_with_value(self,
                           working_zone: zone,
                           value,
                           method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']="shapely_strict",
                           usemask:bool = True,
                           epsilon:float = 0.0):
    """
    Fill all cells inside each vector of a zone with the given value.

    :param working_zone: zone whose vectors define the fill area
    :param value: value to assign to cells inside the zone vectors
    :param method: spatial method for cell selection
    :param usemask: if ``True``, only fill unmasked cells
    :param epsilon: boundary tolerance
    """
    if not isinstance(working_zone, zone):
        logging.error(_('working_zone must be a zone. You provided a {}').format(type(working_zone)))
        return

    # apply the per-vector fill to every vector of the zone
    for cur_vector in working_zone.myvectors:
        self.fillin_vector_with_value(cur_vector, value, method, usemask, epsilon)
[docs]
def fillin_zones_with_value(self,
                            working_zones: Zones,
                            value,
                            method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']="shapely_strict",
                            usemask:bool = True,
                            epsilon:float = 0.0):
    """
    Fill all cells inside each vector of a :class:`Zones` collection with the given value.

    :param working_zones: :class:`Zones` whose vectors define the fill areas
    :param value: value to assign to cells inside the zone vectors
    :param method: spatial method for cell selection
    :param usemask: if ``True``, only fill unmasked cells
    :param epsilon: boundary tolerance
    """
    if not isinstance(working_zones, Zones):
        logging.error(_('working_zones must be a Zones. You provided a {}').format(type(working_zones)))
        return

    # forward to the per-zone routine for each zone of the collection
    for cur_zone in working_zones.myzones:
        self.fillin_zone_with_value(cur_zone, value, method, usemask, epsilon)
[docs]
def fillin_from_xyz(self, xyz:np.ndarray):
    """ Fill the array from a xyz Numpy array.

    Points falling outside the array bounds are silently ignored.

    :param xyz: Numpy array - shape (n, 3) - columns are x, y, z
    """
    ij = self.get_ij_from_xy(xyz[:, 0], xyz[:, 1])

    # keep only the points located inside the array
    # ('inside' renamed from 'filter', which shadowed the builtin)
    inside = (ij[0] >= 0) & (ij[0] < self.nbx) & (ij[1] >= 0) & (ij[1] < self.nby)
    xyz = xyz[inside]
    ij = (ij[0][inside], ij[1][inside])

    if self.dtype in (np.float32, np.float64, np.int32, np.int16, np.int8):
        # one cast to the array dtype replaces the previous per-type branches
        self.array.data[ij] = xyz[:, 2].astype(self.dtype)
    else:
        logging.warning(_('Type not supported : ')+str(self.dtype))
[docs]
def fillin_from_ijz(self, ijz:np.ndarray):
    """ Fill the array from an ijz Numpy array.

    Beware: indices are NOT bounds-checked (unlike :meth:`fillin_from_xyz`).

    :param ijz: Numpy array - shape (n, 3) - columns are i, j, z
    """
    try:
        i = ijz[:, 0].astype(int)
        j = ijz[:, 1].astype(int)
    except Exception as e:
        logging.error(_('Error in conversion of ijz to int : ')+str(e))
        return

    if self.dtype in (np.float32, np.float64, np.int32, np.int16, np.int8):
        # one cast to the array dtype replaces the previous per-type branches
        self.array.data[i, j] = ijz[:, 2].astype(self.dtype)
    else:
        logging.warning(_('Type not supported : ')+str(self.dtype))
# ========================================================================
# Filtering & Labelling
# ========================================================================
[docs]
def filter_inundation(self, epsilon:float = None, mask:np.ndarray = None):
    """
    Apply filters on the array:

    - set data below `epsilon` to the null value, or
    - set data where `mask` is True to the null value (when `epsilon` is None)
    - always replace and mask NaN values
    - mask cells equal to the null value
    - finally mask data outside :attr:`linkedvec`, when it is defined

    :param epsilon: value under which data are set to null
    :param mask: boolean mask to apply if `epsilon` is None
    """
    if epsilon is not None:
        self.array[np.where(self.array<epsilon)] = self.nullvalue
    elif mask is not None:
        self.array.data[mask] = self.nullvalue

    # NaN values cannot be kept: replace them by the null value and mask them
    idx_nan = np.where(np.isnan(self.array))

    if len(idx_nan[0])>0:
        self.array[idx_nan] = self.nullvalue
        self.array.mask[idx_nan] = True
        logging.warning(_('NaN values found in the array'))

        # report the NaN locations (at most the first 10), 1-based indices
        if len(idx_nan[0])<10:
            for i,j in zip(idx_nan[0],idx_nan[1]):
                logging.warning(f'NaN at {i+1},{j+1} -- 1-based')
        else:
            for i,j in zip(idx_nan[0][:10],idx_nan[1][:10]):
                logging.warning(f'NaN at {i+1},{j+1} -- 1-based')
            logging.warning(f'... and {len(idx_nan[0])-10} more')

    self.mask_data(self.nullvalue)

    if self.linkedvec is not None:
        self.mask_outsidepoly(self.linkedvec)

    self.reset_plot()
[docs]
def filter_independent_zones(self, n_largest:int = 1, reset_plot:bool = True):
    """
    Label independent connected regions and keep only the *n* largest.

    :param n_largest: number of largest zones to keep (clamped to the
        number of detected zones)
    :param reset_plot: if ``True``, reset the plot after filtering
    """
    # labelling of the unmasked, non-null cells (masked cells -> background)
    labeled_array = self.array.data.copy()
    labeled_array[np.where(self.array.mask)] = 0
    labeled_array, num_features = label(labeled_array)

    # conversion to a masked array mirroring the source mask
    labeled_array = ma.asarray(labeled_array)
    labeled_array.mask = np.zeros(labeled_array.shape, dtype=bool)
    labeled_array.mask[:,:] = self.array.mask[:,:]
    labeled_array[labeled_array.mask] = 0

    # cell count per label, sorted from largest to smallest
    sizes = list(sum_labels(np.ones(labeled_array.shape, dtype=np.int32), labeled_array, range(1, num_features+1)))
    sizes = [[sizes[j], j+1] for j in range(0, num_features)]
    sizes.sort(key=lambda x: x[0], reverse=True)

    # clamp to the number of existing zones: the previous code raised an
    # IndexError when n_largest exceeded num_features
    if n_largest > num_features:
        logging.warning(_('Only {} independent zone(s) found -- keeping all of them').format(num_features))
        n_largest = num_features

    # keep only the n largest zones
    self.array.mask[:,:] = True
    for j in range(0, n_largest):
        self.array.mask[labeled_array == sizes[j][1]] = False

    self.set_nullvalue_in_mask()

    if reset_plot:
        self.reset_plot()
[docs]
def filter_zone(self, set_null:bool = False, reset_plot:bool = True):
    """
    Keep only the labeled (connected) regions that contain at least one
    selected cell; every other region is masked.

    :param set_null: if ``True``, set filtered-out cells to :attr:`nullvalue`
    :param reset_plot: if ``True``, reset the plot after filtering
    """
    if self.SelectionData.nb == 0:
        logging.info(_('No selection -- no filtering'))
        return

    if self.SelectionData.myselection == ALL_SELECTED:
        logging.info(_('All nodes selected -- no filtering'))
        return

    # labelling of the connected regions (masked cells -> background)
    labeled_array = self.array.data.copy()
    labeled_array[np.where(self.array.mask)] = 0
    labeled_array, num_features = label(labeled_array)

    # labels of the regions containing at least one selected cell
    vals_ij = [self.get_ij_from_xy(cur[0], cur[1]) for cur in self.SelectionData.myselection]
    vals = list(set([labeled_array[int(cur[0]), int(cur[1])] for cur in vals_ij]))

    # mask everything, then unmask the regions to keep
    self.array.mask[:,:] = True
    for j in vals:
        self.array.mask[labeled_array == j] = False

    if set_null:
        self.set_nullvalue_in_mask()

    if reset_plot:
        self.reset_plot()
[docs]
def clean_small_patches(self, min_size:int = 1, set_null:bool = False, reset_plot:bool = True):
    """Mask patches (connected regions) smaller than `min_size` cells.

    :param min_size: minimum patch size in cells (must be >= 1); smaller patches are masked
    :param set_null: if ``True``, set removed cells to :attr:`nullvalue`
    :param reset_plot: if ``True``, reset the plot after cleaning
    """
    if min_size < 1:
        # message fixed: the test is "< 1", so min_size == 1 is valid and
        # the requirement is ">= 1", not "> 1" as previously claimed
        logging.error(_('Minimum size must be at least 1'))
        return

    # labelling of the unmasked, non-null cells (masked cells -> background)
    labeled_array = self.array.data.copy()
    labeled_array[np.where(self.array.mask)] = 0
    labeled_array, num_features = label(labeled_array)

    # count the number of cells in each patch (index 0 is the background)
    patch_size = np.bincount(labeled_array.ravel())

    # reset the labels of the patches that are too small
    mask_small_patches = patch_size < min_size
    labeled_array[mask_small_patches[labeled_array]] = 0

    # mask every cell that no longer belongs to a kept patch
    self.array.mask[labeled_array == 0] = True

    if set_null:
        self.set_nullvalue_in_mask()

    if reset_plot:
        self.reset_plot()
[docs]
def labelling(self, reset_plot:bool = True):
    """
    Label the connected regions of the array (Scipy ``label``) and store
    the label ids in place of the data values.

    :param reset_plot: if ``True``, reset the plot after labelling
    """
    # work on a copy where masked cells are zeroed so they form the background
    work = self.array.data.copy()
    work[np.where(self.array.mask)] = 0

    work, _nb_features = label(work)

    # write the labels back, cast to the array dtype
    self.array.data[:,:] = work[:,:].astype(self.dtype)

    if reset_plot:
        self.reset_plot()
# ========================================================================
# Statistics & Analysis
# ========================================================================
[docs]
def statistics(self, inside_polygon:vector | Polygon | np.ndarray = None) -> dict[str, float | np.ndarray]:
    """
    Statistics on selected data or on the whole array if there is no selection.

    :param inside_polygon: vector, Polygon or (n, 2) xy Numpy array restricting the cells used
    :return: mean, std, median, sum, volume (sum*dx*dy), values, min, max
    :rtype: dict[str, float | np.ndarray]

    Translated Keys are:

    - _('Mean'): mean value of the selected data
    - _('Std'): standard deviation of the selected data
    - _('Median'): median value of the selected data
    - _('Sum'): sum of the selected data
    - _('Volume'): volume of the selected data (sum * dx * dy)
    - _('Values'): values of the selected data as a numpy array
    - _('Min'): minimum value of the selected data
    - _('Max'): maximum value of the selected data
    """
    if inside_polygon is not None:
        if isinstance(inside_polygon, vector | Polygon):
            ij = self.get_ij_inside_polygon(inside_polygon)
        elif isinstance(inside_polygon, np.ndarray):
            if inside_polygon.ndim == 2 and inside_polygon.shape[1] == 2:
                ij = self.get_ij_from_xy_array(inside_polygon)
            else:
                logging.error(_('Invalid shape for inside_polygon numpy array'))
                return {}
        else:
            # previously this case fell through to a NameError on 'ij'
            logging.error(_('Invalid type for inside_polygon'))
            return {}

        vals = self.array[ij[:,0], ij[:,1]]

    elif self.SelectionData.nb == 0 or self.SelectionData.myselection == ALL_SELECTED:
        logging.info(_('No selection -- statistics on the whole array'))
        vals = self.array[~self.array.mask].ravel().data # all values
    else:
        vals = self.SelectionData.get_values_sel()

    mean = np.ma.mean(vals)
    std = np.ma.std(vals)
    median = np.ma.median(vals)
    total = np.ma.sum(vals)     # renamed: 'sum' shadowed the builtin
    vol = total * self.dx * self.dy
    min_val = np.ma.min(vals)
    max_val = np.ma.max(vals)

    return {_('Mean'): mean, _('Std'): std,
            _('Median'): median, _('Sum'): total,
            _('Volume'): vol, _('Values'): vals,
            _('Min'): min_val, _('Max'): max_val}
[docs]
def get_unique_values(self):
    """
    Return the sorted unique (unmasked) values of the array.
    """
    uniques = np.ma.unique(self.array)

    # np.ma.unique places masked entries at the end -- strip them off
    while uniques[-1] is np.ma.masked and len(uniques) > 1:
        uniques = uniques[:-1]

    return uniques
[docs]
def map_values(self, keys_vals:dict, default:float=None):
    """
    Map array values to new values defined by a dictionary.

    Values absent from `keys_vals` are set to `default`
    (or to the null value when `default` is None).

    :param keys_vals: mapping {old_value: new_value}
    :param default: fallback value for array values missing from `keys_vals`
    """
    unique_vals = self.get_unique_values()

    # collect the values without an explicit mapping
    unmapped = []
    for val in unique_vals:
        if val not in keys_vals:
            logging.warning(_(f"Value {val} not in keys_vals -- Will be set to default or NullValue"))
            unmapped.append(val)

    if default is None:
        default = self.nullvalue

    # Map from a frozen snapshot of the data so that a freshly written value
    # cannot be remapped by a later entry: with the previous in-place loop,
    # {1: 2, 2: 3} chained 1 -> 2 -> 3.
    snapshot = self.array.data.copy()

    for key, newval in keys_vals.items():
        self.array.data[snapshot == key] = newval

    for key in unmapped:
        self.array.data[snapshot == key] = default

    self.mask_data(self.nullvalue)
    self.reset_plot()
# ========================================================================
# Linked Arrays & Palette
# ========================================================================
[docs]
def add_crosslinked_array(self, newlink:"WolfArrayModel"):
    """Register a bidirectional cross-link between this array and another one.

    :param newlink: :class:`WolfArrayModel` to cross-link with this array
    """
    # link both ways so each array knows about the other
    newlink.linkedarrays.append(self)
    self.linkedarrays.append(newlink)
[docs]
def share_palette(self):
    """ Propagate this array's palette object to every cross-linked array. """
    for linked in self.linkedarrays:
        # identity check: share the very same palette object, not a copy
        if linked.mypal is not self.mypal:
            linked.mypal = self.mypal
# ========================================================================
# Array Manipulation (Crop, Paste, Extend, ...)
# ========================================================================
[docs]
def paste_all(self, fromarray:"WolfArrayModel", mask_after:bool=True):
    """Paste the data from another array into this one where geometries overlap.

    Only the unmasked cells of `fromarray` falling inside this array's
    footprint are copied (grids are assumed aligned -- TODO confirm).

    :param fromarray: source :class:`WolfArrayModel` from which to paste values
    :param mask_after: if ``True``, mask cells equal to :attr:`nullvalue` after pasting
    """
    fromarray: WolfArrayModel

    # Bounds of the source array expressed as destination indices
    i1, j1 = self.get_ij_from_xy(fromarray.origx, fromarray.origy)
    i2, j2 = self.get_ij_from_xy(fromarray.origx + fromarray.nbx * fromarray.dx,
                                 fromarray.origy + fromarray.nby * fromarray.dy)

    # Clip the bounds to the destination array
    i1 = max(0, i1)
    j1 = max(0, j1)
    i2 = min(self.nbx, i2)
    j2 = min(self.nby, j2)

    # Convert the useful bounds back to world coordinates
    x1, y1 = self.get_xy_from_ij(i1, j1)
    x2, y2 = self.get_xy_from_ij(i2, j2)

    # Useful bounds expressed as source indices
    i3, j3 = fromarray.get_ij_from_xy(x1, y1)
    i4, j4 = fromarray.get_ij_from_xy(x2, y2)

    # Select the unmasked source cells.
    # Beware: the result is relative to [i3, j3] --> it must be shifted to
    # absolute indices to address the full arrays.
    usefulij = np.where(np.logical_not(fromarray.array.mask[i3:i4, j3:j4]))

    i5, j5 = self.get_ij_from_xy(x1, y1)

    # Shift the relative indices into destination-array indices
    usefulij_dest = (usefulij[0] + i5, usefulij[1] + j5)

    # Shift the relative indices into source-array indices
    usefulij[0][:] += i3
    usefulij[1][:] += j3

    self.array.data[usefulij_dest] = fromarray.array.data[usefulij]

    if mask_after:
        self.mask_data(self.nullvalue)
        self.reset_plot()
[docs]
def set_values_sel(self, xy:list[float], z:list[float], update:bool=True):
    """
    Set values at the given world positions.

    :param xy: [[x1,y1],[x2,y2],...]
    :param z: [z1,z2,...]
    :param update: if True, reset the plot afterwards
    """
    sel = np.asarray(xy)
    z = np.asarray(z)

    if len(sel) == 1:
        # single point: report an error when it falls outside the array
        ijall = self.xy2ij_np(sel)
        i, j = ijall[0, 0], ijall[0, 1]
        if i < 0 or i >= self.nbx or j < 0 or j >= self.nby:
            logging.error(_('Selected point is out of bounds'))
            return
        else:
            self.array[i, j] = z
    else:
        # multiple points: silently drop those outside the array
        # ijall = np.asarray(self.get_ij_from_xy(sel[:, 0], sel[:, 1])).transpose()
        ijall = self.xy2ij_np(sel)
        useful = np.where((ijall[:, 0] >= 0) & (ijall[:, 0] < self.nbx) & (ijall[:, 1] >= 0) & (ijall[:, 1] < self.nby))
        self.array[ijall[useful, 0], ijall[useful, 1]] = z[useful]

    self.mask_data(self.nullvalue)

    if update:
        self.reset_plot()
[docs]
def crop_array(self, bbox:list[list[float],list[float]], setnull_trx_try:bool = False) -> "WolfArrayModel":
    """ Crop the data based on the bounding box.

    Beware of the grid:

    - If the cropped region is smaller than a cell, then the cropped array
      will be empty.
    - If the cropped region doesn't align with the source array grid, it
      will be forced to do so.

    :param bbox: bounding box [[xmin, xmax], [ymin, ymax]].
    :param setnull_trx_try: set the translation to 0 if True, origx and
        origy will be set to the lower left corner of the bbox. Default is
        `False`.
    :return: new cropped array (same class as self)
    """
    xmin, xmax = bbox[0]
    ymin, ymax = bbox[1]

    # Make sure the bounding box can be used (swap inverted bounds).
    if xmin > xmax:
        xmin, xmax = xmax, xmin
    if ymin > ymax:
        ymin, ymax = ymax, ymin

    # Snap the bounds onto the source grid (integer cell indices)
    imin, jmin = self.get_ij_from_xy(xmin, ymin)
    imax, jmax = self.get_ij_from_xy(xmax, ymax)

    imin = int(imin)
    jmin = int(jmin)
    imax = int(imax)
    jmax = int(jmax)

    # Build the header of the cropped array
    newheader = header_wolf()

    newheader.nbx = imax-imin
    newheader.nby = jmax-jmin
    newheader.dx = self.dx
    newheader.dy = self.dy
    # NOTE(review): the half-cell shift suggests get_xy_from_ij returns cell
    # centres -- confirm against its definition
    newheader.origx, newheader.origy = self.get_xy_from_ij(imin, jmin, abs = setnull_trx_try)
    newheader.origx -= self.dx / 2.
    newheader.origy -= self.dy / 2.
    newheader.translx = self.translx
    newheader.transly = self.transly

    if setnull_trx_try:
        newheader.translx = 0.
        newheader.transly = 0.

    newarray = type(self)(srcheader=newheader)
    if imin < imax and jmin < jmax:
        newarray.array[:,:] = self.array[imin:imax, jmin:jmax]
    else:
        # One of the dimensions has 0 size => the array is empty
        pass
    return newarray
[docs]
def crop_masked_at_edges(self):
    """
    Crop the array to remove fully masked rows/columns at its edges.

    :return: cropped array (same class as self)
    """
    # Index bounding box of the unmasked cells
    Existing_indexes = np.argwhere(self.array.mask!=True)
    Max_index = np.max(Existing_indexes, 0)
    Min_index = np.min(Existing_indexes, 0)

    # convert the index bounds to world coordinates
    xMax, yMax = self.convert_ij2xy_np(Max_index.reshape((1,2)))
    xMin, yMin = self.convert_ij2xy_np(Min_index.reshape((1,2)))

    # number of cells to keep in each direction
    nbx=np.ceil((xMax[0]-xMin[0])/self.dx).astype(int)+1 #+1 otherwise you remove one line
    nby=np.ceil((yMax[0]-yMin[0])/self.dy).astype(int)+1 #+1 otherwise you remove one column

    return self.crop(int(Min_index[0]),int(Min_index[1]),int(nbx),int(nby))
[docs]
def crop(self, i_start:int, j_start:int, nbx:int, nby:int, k_start:int=1, nbz:int=1):
    """
    Crop the array.

    :param i_start: start index in x
    :param j_start: start index in y
    :param nbx: number of cells in x
    :param nby: number of cells in y
    :param k_start: start index in z (3D arrays only)
    :param nbz: number of cells in z (3D arrays only)
    :return: cropped array (same class as self)
    """
    # isinstance instead of exact type comparison: idiomatic and accepts
    # int subclasses; still raises AssertionError on wrong types
    assert isinstance(i_start, int), "i_start must be an integer"
    assert isinstance(j_start, int), "j_start must be an integer"
    assert isinstance(nbx, int), "nbx must be an integer"
    assert isinstance(nby, int), "nby must be an integer"
    assert isinstance(k_start, int), "k_start must be an integer"
    assert isinstance(nbz, int), "nbz must be an integer"

    # new array of the same class, with the shifted origin
    newWolfArray = type(self)(nullvalue=self.nullvalue)
    newWolfArray.nbx = nbx
    newWolfArray.nby = nby
    newWolfArray.dx = self.dx
    newWolfArray.dy = self.dy
    newWolfArray.origx = self.origx + float(i_start) * self.dx
    newWolfArray.origy = self.origy + float(j_start) * self.dy
    newWolfArray.translx = self.translx
    newWolfArray.transly = self.transly
    if self.nbdims == 3:
        newWolfArray.nbz = nbz
        newWolfArray.dz = self.dz
        newWolfArray.origz = self.origz + float(k_start) * self.dz
        newWolfArray.translz = self.translz

        newWolfArray.array = self.array[i_start:i_start + nbx, j_start:j_start + nby, k_start:k_start + nbz].copy()
    elif self.nbdims == 2:
        newWolfArray.array = self.array[i_start:i_start + nbx, j_start:j_start + nby].copy()

    return newWolfArray
[docs]
def extend(self, x_ext:int, y_ext:int):
    """
    Extend the array by appending rows/columns at its end.

    `crop` is the opposite operation.

    :param x_ext: number of cells to add in the x direction (must be >= 0;
        shrinking is NOT supported -- the previous docstring claimed it was)
    :param y_ext: number of cells to add in the y direction (must be >= 0)
    """
    assert x_ext >= 0 and y_ext >= 0, "Extensions must be non-negative"
    assert self.nbdims == 2, "Only 2D arrays are supported"

    # WolfArrays are masked arrays: ma.append grows the mask with
    # "do not mask" values, which is what we want here.
    ex = self.array

    # extend vertically (new rows of zeros, keeping the Fortran-friendly dtype)
    if x_ext > 0:
        ex = ma.append(ex, np.zeros((x_ext, ex.shape[1]), dtype=ex.dtype), axis=0)
        self.nbx += x_ext

    # extend horizontally (new columns of zeros)
    if y_ext > 0:
        ex = ma.append(ex, np.zeros((ex.shape[0], y_ext), dtype=ex.dtype), axis=1)
        self.nby += y_ext

    self.array = ex
    self.mask_data(self.nullvalue)
[docs]
def extremum(self, which:Literal['min','max']='min'):
    """Return the minimum or maximum value of the array.

    :param which: which extremum to compute: ``'min'`` or ``'max'``
    """
    if which == 'max':
        return np.amax(self.array)
    if which == 'min':
        return np.amin(self.array)

    # unknown keyword: warn and return the sentinel value
    logging.warning(_('Extremum not supported : ')+which)
    return -99999.
[docs]
def nullify_border(self, width:int = 1):
    """
    Set a border of `width` cells to :attr:`nullvalue` and mask it.

    :param width: number of border cells (must be >= 1)
    """
    if width < 1:
        # guard: with width == 0 the negative slices "[-0:]" select the
        # WHOLE array and would nullify and mask every cell
        logging.warning(_('Width must be at least 1 -- nothing done'))
        return

    self.array.data[:width,:] = self.nullvalue
    self.array.data[-width:,:] = self.nullvalue
    self.array.data[:,:width] = self.nullvalue
    self.array.data[:,-width:] = self.nullvalue

    self.array.mask[:width,:] = True
    self.array.mask[-width:,:] = True
    self.array.mask[:,:width] = True
    self.array.mask[:,-width:] = True
[docs]
def convolve(self,
             filter,
             method:Literal['scipyfft',
                            'jaxfft',
                            'classic']='scipyfft',
             inplace:bool=True) -> "WolfArrayModel":
    """
    Convolve the array with a filter.

    The array and the filter should have the same dtype. If not, a warning is issued and the array dtype is used.

    Thus:

    - An int8 array convolved with float32 filter will be converted to int8.
    - A float32 array convolved with int8 filter will be converted to float32.

    **User should be careful with the dtype of both the array AND the filter.**

    :param filter: filter to convolve with (WolfArrayModel or numpy array); masked/NaN filter cells are zeroed
    :param method: method to use for convolution ('scipyfft', 'jaxfft', 'classic')
    :param inplace: if True, the array is modified in place, otherwise a new array is returned -- same type as the original one
    :return: convolved array, WolfArrayModel instance, or None on error
    """
    # Build a plain ndarray filter with masked/NaN entries set to 0
    if isinstance(filter, WolfArrayModel):
        _filter = filter.array.data.copy()
        _filter[filter.array.mask] = 0.0
    elif isinstance(filter, np.ndarray):
        _filter = filter.copy()
        _filter[np.isnan(_filter)] = 0.0
    else:
        logging.error("Filter must be a WolfArrayModel or a numpy array")
        return None

    if self.dtype != _filter.dtype:
        logging.warning(_("Filter and array should have the same dtype - {} - {}").format(self.dtype, _filter.dtype))
        logging.warning(_("Convolution result will have the dtype - {}").format(self.dtype))

    # NOTE(review): the convolution operates on the raw .data, so masked
    # cells contribute their stored values -- confirm callers null them first
    if method == 'classic':
        from scipy.signal import convolve2d
        convolved = convolve2d(self.array.data, _filter, mode='same', fillvalue=0.0)

    elif method == 'scipyfft':
        from scipy.signal import fftconvolve
        convolved = fftconvolve(self.array.data, _filter, mode='same')

    elif method == 'jaxfft':
        import jax.numpy as jnp
        from jax.scipy.signal import fftconvolve as jax_fftconvolve

        jax_filter = jnp.array(_filter)
        jax_array = jnp.array(self.array.data)
        convolved = jax_fftconvolve(jax_array, jax_filter, mode='same')
        convolved = jnp.asarray(convolved)
    else:
        logging.error("Method not supported")
        return None

    if inplace:
        self.array.data[:,:] = convolved
        return self
    else:
        # new array of the same class, with the same mask, recounted
        newWolfArray = type(self)(mold=self, whichtype=self.wolftype)
        newWolfArray.array.data[:,:] = convolved
        newWolfArray.array.mask[:,:] = self.array.mask[:,:]
        newWolfArray.count()

        return newWolfArray
[docs]
def concatenate(self, list_arr:list["WolfArrayModel"], nullvalue:float = 0.):
    """
    Concatenate the values from other WolfArrays into a new one covering their union.

    Where arrays overlap, later arrays in `list_arr` overwrite the earlier
    ones (and `self`).

    :param list_arr: list of WolfArrayModel objects
    :param nullvalue: null value for the output array
    :return: a new WolfArrayModel
    :return_type: WolfArrayModel
    """
    list_arr:list[WolfArrayModel]

    # All arrays must share the same grid definition (resolution, alignment, type)
    for curarray in list_arr:
        assert isinstance(curarray, WolfArrayModel), "The list must contain WolfArrayModel objects"
        assert curarray.nbdims == self.nbdims, "The arrays must have the same number of dimensions"
        assert curarray.dx == self.dx and curarray.dy == self.dy, "The arrays must have the same dx and dy"
        assert curarray.translx == 0 and curarray.transly == 0, "The translations must be zero"
        assert (np.abs(curarray.origx-self.origx)%int(self.dx) == 0)and(np.abs(curarray.origy-self.origy)%int(self.dy) == 0), "The origins are not compatible! You need to do some interpolation stuff"
        assert self.translx == 0 and self.transly == 0, "The translations must be zero"
        assert self.wolftype == curarray.wolftype, "The arrays must have the same wolftype"

    # create the output array covering the union of all extents
    newArray = type(self)(nullvalue=nullvalue, whichtype=self.wolftype)

    Xlim,Ylim = self.find_union(list_arr)

    newArray.origx = Xlim[0]
    newArray.origy = Ylim[0]
    newArray.dx = self.dx
    newArray.dy = self.dy
    newArray.nbx = int(np.diff(Xlim)[0]/newArray.dx)
    newArray.nby = int(np.diff(Ylim)[0]/newArray.dy)
    newArray.translx = 0.
    newArray.transly = 0.

    newArray.array = np.ma.masked_array(np.ones((newArray.nbx, newArray.nby), dtype=self.dtype) * nullvalue, mask=True, dtype=self.dtype)

    # paste self first, then the others (later arrays win on overlap)
    newArray.paste_all(self, mask_after=False)
    for curarray in list_arr:
        Array_intersect = curarray.find_intersection(self, ij=True)
        if Array_intersect is not None:
            logging.info(_("There is intersection. By default, the array {} overlaps the first one.".format(curarray.filename)))
        newArray.paste_all(curarray, mask_after=False)

    newArray.mask_data(nullvalue)

    return newArray
[docs]
def as_WolfArray(self, abs:bool=True) -> "WolfArrayModel":
    """
    Return a copy of this array as a WolfArrayModel.

    :param abs: if ``True``, fold the translations into the origin
        (absolute coordinates) and zero the translations
    """
    copied = type(self)(mold=self)

    if abs:
        # move the translation into the origin
        copied.origx += self.translx
        copied.origy += self.transly
        copied.translx = 0.
        copied.transly = 0.

    return copied
# ========================================================================
# Volume & Surface Estimation
# ========================================================================
[docs]
def volume_estimation(self,
                      zmax:float = None,
                      nb:int = 10,
                      labeled:bool = False,
                      use_memory:bool = False,
                      memory_key:str = None,
                      output_fn:str = None,
                      axs:plt.Axes = None):
    """Estimation of the volume of the selected zone.

    Scans ``nb + 1`` elevation steps between the array minimum and ``zmax``
    and, at each step, accumulates the volume and wetted surface of the
    cells lying below that elevation. A "zoning" array recording the first
    step at which each cell becomes wet is written to ``output_fn``.

    :param zmax: maximum elevation (if None, uses array max)
    :param nb: number of elevation steps
    :param labeled: if True, use labeling to identify connected areas
    :param use_memory: if True, use memory selection to identify areas
    :param memory_key: key of the memory selection to use (if use_memory)
    :param output_fn: filename for zoning result (if None, a save dialog asks for one)
    :param axs: matplotlib Axes to plot the results
    :return: (fig, axs) or None if cancelled
    """
    # Work only on unmasked cells
    vect = self.array[np.logical_not(self.array.mask)].flatten()
    zmin = np.amin(vect)
    if zmax is None:
        zmax = np.amax(vect)
    deltaz = (zmax - zmin) / nb
    curz = zmin
    labeled_areas = []
    stockage = []   # one [volume, surface] pair per elevation step
    z = []          # elevation of each step
    keys_select = list(self.SelectionData.selections.keys())
    if use_memory:
        # Resolve which memory selection provides the "seed" cells
        if memory_key is not None:
            key = memory_key
        elif len(keys_select) == 1:
            key = keys_select[0]
        elif len(keys_select) > 1:
            key = keys_select[0]
            logging.warning(_('Multiple memory selections available, using first one: %s') % key)
        else:
            logging.error(_('No memory selection available'))
            return
        xy_key = self.SelectionData.selections[key]['select']
        if len(xy_key) == 0:
            logging.error(_('Empty selection'))
            return
        ij_key = self.xy2ij_np(xy_key)
    # Zoning array: stores, per cell, the index (1-based) of the first
    # elevation step at which the cell is below the water level
    extensionmax = type(self)(mold=self)
    extensionmax.array[:, :] = 0.
    if labeled:
        for i in tqdm(range(nb + 1)):
            z.append(curz)
            # Compute difference between the array and the current elevation
            if i == 0:
                diff = self.array - (curz + 1.e-3)
            else:
                diff = self.array - curz
            # Keep only the negative values
            diff[diff > 0] = 0.
            diff.data[diff.mask] = 0.
            if np.count_nonzero(diff < 0.) > 1:
                # More than one cell below the elevation
                # Labeling of the cells below the elevation
                labeled_array, num_features = label(diff.data)
                # Applying the same mask as the original array
                labeled_array = ma.asarray(labeled_array)
                labeled_array.mask = self.array.mask
                if use_memory:
                    # Use only the labeled areas containing the cells stored in memory selection
                    labeled_areas = []
                    for curij in ij_key:
                        labeled_areas.append(labeled_array[curij[0], curij[1]])
                    # Remove masked value
                    labeled_areas = [x for x in labeled_areas if x is not ma.masked]
                    # Remove duplicates
                    labeled_areas = list(set(labeled_areas))
                    # NOTE(review): if several seed areas remain, only the last one's
                    # volume/surface reaches `stockage`; if none remain, the values
                    # of the previous step are reused (unbound on first step) -- confirm intent
                    for curarea in labeled_areas:
                        if curarea ==0:
                            volume = 0.
                            surface = 0.
                            continue
                        # Search
                        mask = labeled_array == curarea
                        area = np.where(mask)
                        volume = -self.dx * self.dy * np.sum(diff[area])
                        surface = self.dx * self.dy * len(area[0])
                        extensionmax.array[np.logical_and(mask, extensionmax.array[:, :] == 0.)] = float(i + 1)
                else:
                    # No seeds: keep only the largest connected area
                    labeled_areas = list(sum_labels(np.ones(labeled_array.shape, dtype=np.int32), labeled_array, range(1, num_features+1)))
                    labeled_areas = [[x, y] for x, y in zip(labeled_areas, range(1, num_features+1))]
                    labeled_areas.sort(key=lambda x: x[0], reverse=True)
                    jmax = labeled_areas[0][1]   # label of the largest area
                    nbmax = labeled_areas[0][0]  # its cell count
                    volume = -self.dx * self.dy * np.sum(diff[labeled_array == jmax])
                    surface = self.dx * self.dy * nbmax
                    extensionmax.array[np.logical_and(labeled_array == jmax, extensionmax.array[:, :] == 0.)] = float(i + 1)
            else:
                # Only one cell below the elevation
                volume = -self.dx * self.dy * np.sum(diff)
                surface = self.dx * self.dy * np.count_nonzero(diff<0.)
                extensionmax.array[np.logical_and(diff[:,:]<0., extensionmax.array[:, :] == 0.)] = float(i + 1)
            stockage.append([volume, surface])
            curz += deltaz
    else:
        # Global mode: integrate every cell below the current elevation
        for i in tqdm(range(nb + 1)):
            z.append(curz)
            if i == 0:
                diff = self.array - (curz + 1.e-3)
            else:
                diff = self.array - curz
            diff[diff > 0] = 0.
            diff.data[diff.mask] = 0.
            volume = -self.dx * self.dy * np.sum(diff)
            surface = self.dx * self.dy * np.count_nonzero(diff<0.)
            stockage.append([volume, surface])
            curz += deltaz
            extensionmax.array[np.logical_and(diff[:,:]<0., extensionmax.array[:, :] == 0.)] = float(i + 1)
    # Ask for / write the zoning result
    if output_fn is None:
        output_fn = self._prompt_save_file(
            _('Choose filename for zoning result'),
            'bin (*.bin)|*.bin|tif (*.tif)|*.tif|All (*.*)|*.*')
    if output_fn is not None:
        extensionmax.filename = output_fn
        extensionmax.write_all()
    # Plot volume and surface versus elevation
    if axs is None:
        fig, axs = plt.subplots(1, 2, tight_layout=True)
    else:
        fig = axs[0].get_figure()
    axs[0].plot(z, [x[0] for x in stockage])
    axs[0].scatter(z, [x[0] for x in stockage])
    axs[0].set_xlabel(_("Elevation [m]"), size=15)
    axs[0].set_ylabel(_("Volume [$m^3$]"), size=15)
    axs[1].step(z, [x[1] for x in stockage], where='post')
    axs[1].scatter(z, [x[1] for x in stockage])
    axs[1].set_xlabel(_("Elevation [m]"), size=15)
    axs[1].set_ylabel(_("Surface [$m^2$]"), size=15)
    fig.suptitle(_("Retention capacity of the selected zone"), fontsize=20)
    # Dump the H/Z/volume/surface table next to the zoning file
    if output_fn is not None:
        with open(output_fn[:-4] + '_hvs.txt', 'w') as f:
            f.write('H [m]\tZ [m DNG]\tVolume [$m^3$]\tSurface [$m^2$]\n')
            for curz, (curv, curs) in zip(z, stockage):
                f.write('{}\t{}\t{}\t{}\n'.format(curz - zmin, curz, curv, curs))
    return fig, axs
[docs]
def surface_volume_estimation_from_elevation(self, axs:plt.Axes= None,
                                             desired_zmin:float = None, desired_zmax:float = None,
                                             nb:int = 100,
                                             method:Literal['all below', 'largest area', 'selected'] = 'largest area',
                                             selected_cells:list[tuple[float, float]] = None,
                                             dirout:str | Path = None,
                                             invert_z:bool = False,
                                             array_to_integrate:"WolfArrayModel" = None
                                             ):
    """ Estimation of the surface and volume from an elevation array.

    Scans ``nb + 1`` elevation steps between zmin and zmax and, for each
    step, accumulates the volume and surface of the cells below that
    elevation. A zoning array (first step at which each cell is wetted)
    and a text table are written in ``dirout``.

    :param axs: Axes to plot the results
    :param desired_zmin: Minimum elevation to consider
    :param desired_zmax: Maximum elevation to consider
    :param nb: Number of elevation steps
    :param method: Method to use for the estimation ('all below', 'largest area', 'selected')
    :param selected_cells: List of kernel cells (if method is 'selected') -- Consider only the areas containing these cells
    :param dirout: Directory to save the results
    :param invert_z: if ``True``, invert the elevation axis (useful for bathymetry)
    :param array_to_integrate: optional array whose values are integrated instead of pure surface area
    :return: Figure and Axes with the results
    :rtype: tuple[plt.Figure, plt.Axes]
    """
    assert method in ['all below', 'largest area', 'selected'], _('Method must be one of "all below", "largest area", or "selected"')
    if array_to_integrate is not None:
        assert isinstance(array_to_integrate, WolfArrayModel), _('array_to_integrate must be a WolfArrayModel instance')
        assert array_to_integrate.nbx == self.nbx and array_to_integrate.nby == self.nby, _('array_to_integrate must have the same dimensions as the current WolfArrayModel')
    # Elevation range limited to unmasked cells, possibly overridden by the caller
    vect = self.array[np.logical_not(self.array.mask)].flatten()
    zmin = np.amin(vect)
    zmax = np.amax(vect)
    if desired_zmin is not None:
        zmin = desired_zmin
    if desired_zmax is not None:
        zmax = desired_zmax
    deltaz = (zmax - zmin) / nb
    curz = zmin
    labeled_areas = []
    stockage = []   # one [volume, surface] pair per elevation step
    z = []          # elevation of each step
    # 'all below' integrates every wet cell; the other methods label connected areas
    labeled = method != 'all below'
    if method == 'largest area':
        use_memory = False
    else:
        # BUGFIX: check for a missing selection BEFORE converting it --
        # np.asarray(None, dtype=float64) raises instead of reporting the error
        if selected_cells is None:
            logging.error(_('Empty selection'))
            return
        xy_key = np.asarray(selected_cells, dtype=np.float64)
        if len(xy_key) == 0:
            logging.error(_('Empty selection'))
            return
        ij_key = self.xy2ij_np(xy_key)
        use_memory = True
    # Zoning array: first step index (1-based) at which each cell becomes wet
    extensionmax = type(self)(mold=self)
    extensionmax.array[:, :] = 0.
    if labeled:
        for i in tqdm(range(nb + 1)):
            z.append(curz)
            # Compute difference between the array and the current elevation
            if i == 0:
                diff = self.array - (curz + 1.e-3)
            else:
                diff = self.array - curz
            # Keep only the negative values
            diff[diff > 0] = 0.
            diff.data[diff.mask] = 0.
            if np.count_nonzero(diff < 0.) > 1:
                # More than one cell below the elevation
                # Labeling of the cells below the elevation
                labeled_array, num_features = label(diff.data)
                # Applying the same mask as the original array
                labeled_array = ma.asarray(labeled_array)
                labeled_array.mask = self.array.mask
                if use_memory:
                    # Use only the labeled areas containing the cells stored in selected_cells
                    labeled_areas = [labeled_array[curij[0], curij[1]] for curij in ij_key]
                    # Remove masked values and duplicates
                    labeled_areas = list(set(x for x in labeled_areas if x is not ma.masked))
                    # Robustness: keep volume/surface defined even when no seed area remains
                    volume = 0.
                    surface = 0.
                    # NOTE(review): if several seed areas remain, only the last one's
                    # volume/surface reaches `stockage` -- confirm intent
                    for curarea in labeled_areas:
                        if curarea == 0:
                            volume = 0.
                            surface = 0.
                            continue
                        mask = labeled_array == curarea
                        area = np.argwhere(mask)
                        if array_to_integrate is None:
                            # BUGFIX: boolean selection instead of fancy-indexing with an
                            # (n, 2) array, which selected whole ROWS of `diff` instead of cells
                            volume = -self.dx * self.dy * np.ma.sum(diff[mask])
                        else:
                            volume = self.dx * self.dy * np.ma.sum(array_to_integrate.array[area[:,0], area[:,1]])
                        surface = self.dx * self.dy * area.shape[0]
                        extensionmax.array[np.logical_and(mask, extensionmax.array == 0.)] = float(i + 1)
                else:
                    # Keep only the largest connected area
                    labeled_areas = list(sum_labels(np.ones(labeled_array.shape, dtype=np.int32), labeled_array, range(1, num_features+1)))
                    labeled_areas = [[x, y] for x, y in zip(labeled_areas, range(1, num_features+1))]
                    labeled_areas.sort(key=lambda x: x[0], reverse=True)
                    jmax = labeled_areas[0][1]   # label of the largest area
                    nbmax = labeled_areas[0][0]  # its cell count
                    if array_to_integrate is None:
                        volume = -self.dx * self.dy * np.ma.sum(diff[labeled_array == jmax])
                    else:
                        volume = self.dx * self.dy * np.ma.sum(array_to_integrate.array[labeled_array == jmax])
                    surface = self.dx * self.dy * nbmax
                    extensionmax.array[np.logical_and(labeled_array == jmax, extensionmax.array == 0.)] = float(i + 1)
            else:
                # Only one cell below the elevation
                if array_to_integrate is None:
                    volume = -self.dx * self.dy * np.ma.sum(diff)
                else:
                    volume = self.dx * self.dy * np.ma.sum(array_to_integrate.array[diff < 0.])
                surface = self.dx * self.dy * np.count_nonzero(diff<0.)
                extensionmax.array[np.logical_and(diff[:,:]<0., extensionmax.array[:, :] == 0.)] = float(i + 1)
            stockage.append([abs(volume), abs(surface)])
            curz += deltaz
    else:
        # Global mode: integrate every cell below the current elevation
        for i in tqdm(range(nb + 1)):
            z.append(curz)
            if i == 0:
                diff = self.array - (curz + 1.e-3)
            else:
                diff = self.array - curz
            diff[diff > 0] = 0.
            diff.data[diff.mask] = 0.
            if array_to_integrate is None:
                volume = -self.dx * self.dy * np.ma.sum(diff)
                surface = self.dx * self.dy * np.count_nonzero(diff<0.)
                stockage.append([abs(volume), abs(surface)])
            else:
                ij = np.argwhere(diff < 0.)
                volume = self.dx * self.dy * np.ma.sum(array_to_integrate.array[ij[:,0], ij[:,1]])
                surface = self.dx * self.dy * ij.shape[0]
                stockage.append([volume, abs(surface)])
            curz += deltaz
            extensionmax.array[np.logical_and(diff[:,:]<0., extensionmax.array[:, :] == 0.)] = float(i + 1)
    # Write the zoning array
    if dirout is None:
        dirout = Path(self.filename).parent / 'surface_volume'
    if isinstance(dirout, str):
        dirout = Path(dirout)
    if not dirout.exists():
        dirout.mkdir(parents=True, exist_ok=True)
    extensionmax.filename = str(dirout / 'surface_volume_extension.tif')
    extensionmax.write_all()
    # Plot volume and surface versus elevation (or water depth)
    if axs is None:
        fig, axs = plt.subplots(1, 2, tight_layout=True)
    else:
        fig = axs[0].get_figure()
    if invert_z:
        z = [abs(zmin - x) for x in z]
        labelx = _("Water depth [m]")
    else:
        labelx = _("Elevation [m]")
    axs[0].plot(z, [x[0] for x in stockage])
    axs[0].scatter(z, [x[0] for x in stockage])
    axs[0].set_xlabel(labelx, size=10)
    axs[0].set_ylabel(_("Volume [$m^3$]"), size=10)
    axs[1].step(z, [x[1] for x in stockage], where='post')
    axs[1].scatter(z, [x[1] for x in stockage])
    axs[1].set_xlabel(labelx, size=10)
    axs[1].set_ylabel(_("Surface [$m^2$]"), size=10)
    fig.suptitle(_("Retention capacity"), fontsize=12)
    fig.savefig(dirout / 'surface_volume.png', dpi=300)
    # Dump the H/Z/volume/surface table
    fn = dirout / 'surface_volume_hvs.txt'
    with open(fn, 'w') as f:
        f.write('H [m]\tZ [m DNG]\tVolume [$m^3$]\tSurface [$m^2$]\n')
        for curz, (curv, curs) in zip(z, stockage):
            f.write('{}\t{}\t{}\t{}\n'.format(curz - zmin, curz, curv, curs))
    return fig, axs
[docs]
def surface_volume_estimation_from_waterdepth(self, axs:plt.Axes=None,
                                              desired_zmin:float = None, desired_zmax:float = None,
                                              nb:int = 100,
                                              method:Literal['all below', 'largest area', 'selected'] = 'largest area',
                                              selected_cells:list[tuple[float, float]] = None,
                                              dirout:str | Path = None):
    """ Estimation of the surface and volume from a water depth array.

    The depths are negated so the deepest cells behave like the lowest
    elevations, then the elevation-based routine is reused with an
    inverted z axis.

    :param axs: Axes to plot the results
    :param desired_zmin: Minimum water depth to consider
    :param desired_zmax: Maximum water depth to consider
    :param nb: Number of water depth steps
    :param method: Method to use for the estimation ('all below', 'largest area', 'selected')
    :param selected_cells: List of kernel cells (if method is 'selected') -- Consider only the areas containing these cells
    :param dirout: Directory to save the results
    :return: Figure and Axes with the results
    :rtype: tuple[plt.Figure, plt.Axes]
    """
    # BUGFIX: default was 'largest', which is not an accepted value and
    # failed the assert in surface_volume_estimation_from_elevation
    # Work on the negated depths: "deeper" becomes "lower"
    neg_array = type(self)(mold=self)
    neg_array.array[:,:] = -self.array
    # Depth bounds swap roles and change sign once negated
    return neg_array.surface_volume_estimation_from_elevation(desired_zmin=-desired_zmax if desired_zmax is not None else None,
                                                              desired_zmax=-desired_zmin if desired_zmin is not None else None,
                                                              nb=nb, method=method,
                                                              selected_cells=selected_cells,
                                                              dirout=dirout if dirout is not None else Path(self.filename).parent / 'surface_volume',
                                                              axs=axs, invert_z=True)
# ========================================================================
# Gradient, Hillshade & Contours
# ========================================================================
[docs]
def hillshade(self, azimuth:float, angle_altitude:float):
    """Fill :attr:`shaded` with a hillshaded rendering of the array.

    Delegates the computation to the module-level ``hillshade`` function.

    :param azimuth: sun azimuth angle in degrees
    :param angle_altitude: sun altitude angle in degrees above the horizon
    """
    target = self.shaded
    if target is None:
        logging.error(_('No shaded array'))
        return
    # Keep the shaded array aligned on the same grid as this one
    target.set_header(self.get_header())
    target.array = hillshade(self.array.data, azimuth, angle_altitude)
    # Drop cached display lists so the rendering is regenerated
    target.delete_lists()
[docs]
def get_gradient_norm(self):
    """ Compute and return the norm of the gradient """
    slope_arr = type(self)(mold=self)
    gx, gy = np.gradient(self.array, self.dx, self.dy)
    grad_norm = np.sqrt(gx * gx + gy * gy)
    # pi/2 - arctan(|grad|): pi/2 for a flat area, tends to 0 as the slope steepens (radians)
    slope_arr.array = ma.asarray(np.pi / 2. - np.arctan(grad_norm))
    slope_arr.array.mask = self.array.mask
    return slope_arr
[docs]
def get_laplace(self):
    """ Compute and return the laplacian """
    lap_arr = type(self)(mold=self)
    # NOTE(review): scaling uses dx only -- assumes a square grid (dx == dy); confirm
    lap_values = laplace(self.array) / self.dx ** 2.
    lap_arr.array = ma.asarray(lap_values)
    lap_arr.array.mask = self.array.mask
    return lap_arr
[docs]
def suxsuy_contour(self,
                   filename:str='',
                   abs:bool=False,
                   one_vec_if_ml:bool = True) -> tuple[list[int,int],
                                                       list[int,int],
                                                       vector | zone,
                                                       bool]:
    """
    The borders are computed on basis of the current *mask*

    :param filename: if provided, write 'sux', 'suy' and 'xy' files
    :param abs: add translation coordinates (Global World Coordinates)
    :param one_vec_if_ml: if ``True``, merge multi-level contour segments into a single vector

    :return: indicesX, indicesY, contourgen, interior

    indicesX : list of coupled indices along X - vertical border - 1-based like Fortran
    indicesY : list of coupled indices along Y - horizontal border - 1-based like Fortran
    contourgen : external contour
    interior : if False, contour is unique ; if True, interior contours exist -> interior parts are merged
    """
    # Computation of the free borders SUX (vertical) and SUY (horizontal)
    indicesX=[]
    indicesY=[]
    locls=[]    # elementary border segments (Shapely LineStrings)
    dx = self.dx
    dy = self.dy
    translx = self.origx
    transly = self.origy
    if abs:
        translx += self.translx
        transly += self.transly
    # XOR of the mask with its shifted self: True where the mask flips between
    # two neighbouring cells --> a border edge lies between them
    horiz = np.where(self.array.mask[0:self.nbx-1,0:self.nby-1] ^ self.array.mask[1:self.nbx,0:self.nby-1])
    vert = np.where(self.array.mask[0:self.nbx-1,0:self.nby-1] ^ self.array.mask[0:self.nbx-1,1:self.nby])
    for i,j in zip(horiz[0],horiz[1]):
        x1 = float(i+1) * dx + translx
        y1 = float(j+1) * dy + transly
        # 1-based indices (Fortran convention)
        indicesX.append([i+2, j+1])
        locls.append(LineString([[x1,y1-dy],[x1,y1]]))
    for i,j in zip(vert[0],vert[1]):
        x1 = float(i+1) * dx + translx
        y1 = float(j+1) * dy + transly
        indicesY.append([i+1, j+2])
        locls.append(LineString([[x1-dx,y1],[x1,y1]]))
    if not locls:
        raise Exception(_("I can't detect any contour. Is this right ?"))
    interior=False
    # generate contour from partial linestring --> using Shapely to do that !
    contour = linemerge(locls)
    if contour.geom_type == 'LineString':
        # All is fine - only one vector
        xy = np.asarray(contour.coords)
        nb = len(xy)
        contourgen = vector(name='external border')
        for x,y in xy:
            contourgen.add_vertex(wolfvertex(x,y))
    elif contour.geom_type == 'MultiLineString':
        if one_vec_if_ml:
            interior=True
            # Multiple vectors --> combine
            # searching the longest LineString -> external contour
            contour:MultiLineString
            lenghts=[mygeom.length for mygeom in contour.geoms]
            ind = np.argmax(lenghts)
            # Add a zero Z column to every part (Z flags connection segments below)
            xyall=[np.column_stack([np.asarray(mygeom.coords),np.zeros(len(mygeom.coords))]) for mygeom in contour.geoms]
            # coordinates of the longest LineString
            xy = xyall[ind]
            for i in range(len(xyall)):
                if i!=ind:
                    # Concatenate local LineString to the external contour + 2 connection segments
                    # Z coordinate is set to 1. -> will be used to check it after and change "in_use" property
                    xy=np.concatenate([xy,
                                       np.asarray([xyall[i][0,0],xyall[i][0,1],1.]).reshape([1,3]),
                                       xyall[i][1:],
                                       np.asarray([xy[0,0],xy[0,1],1.]).reshape([1,3])])
            nb = len(xy)
            contourgen = vector(name='external border')
            for x,y,z in xy:
                contourgen.add_vertex(wolfvertex(x,y,z))
                contourgen.myvertices[-1].in_use = z == 0. # the new vertex is related to a connection segment --> ignore for numerical precision in intersection operations/calculations
        else:
            # Keep each contour as a separate vector inside one zone
            contourgen = zone(name = 'contour')
            for cur_ls in contour.geoms:
                xy = np.asarray(cur_ls.coords)
                nb = len(xy)
                cur_vec = vector(name='external border')
                for x,y in xy:
                    cur_vec.add_vertex(wolfvertex(x,y))
                contourgen.add_vector(cur_vec, forceparent=True)
    else:
        contourgen = None
        err = _(f"Unsupported Shapely contour result: {contour.geom_type}")
        logging.warning(err)
    if filename != '':
        # There is some knowledge about SUX, SUY, XY files there:
        # https://gitlab.uliege.be/HECE/wolf-interface/-/blob/master/InterfaceVB6/FrmAffichage.frm
        with open(filename+'.sux','w') as f:
            np.savetxt(f,np.asarray(indicesX), delimiter=',', fmt='%u,%u')
        with open(filename + '.suy', 'w') as f:
            np.savetxt(f, np.asarray(indicesY), delimiter=',', fmt='%u,%u')
        with open(filename + '.xy', 'w') as f:
            f.write('{}\n'.format(nb))
            # FIXME Stef commented that. Maybe wrong !!!
            # xy[:,0]-=translx-self.origx
            # xy[:,1]-=transly-self.origy
            np.savetxt(f, xy[:,:2], delimiter='\t')
    return indicesX,indicesY,contourgen,interior
[docs]
def contour(self, levels:Union[int, list[float]] = 10) -> Zones:
    """Compute contour lines

    :param levels: number of evenly spaced levels (int) or explicit list of contour levels
    :return: Zones with one zone per level and one vector per contour path
    """
    if isinstance(levels, int):
        # Evenly spaced levels between array min and max
        levels = np.linspace(self.array.min(), self.array.max(), levels)
    x, y = self.meshgrid()
    # NOTE(review): plt.contour draws on the current pyplot figure as a side effect
    cs = plt.contour(x, y, self.array, levels=levels)
    zones = Zones(idx = self.idx + '_contour', mapviewer = self.get_mapviewer())
    if hasattr(cs, 'collections'):
        # MATPLOTLIB <=3.8
        logging.debug('MATPLOTLIB <=3.8')
        logging.debug(_('DEPRECATED: MATPLOTLIB <=3.8 -- You should consider upgrading to a more recent version of Matplotlib'))
        logging.debug(_('Take care Basemap may not be available in recent versions of mpl_toolkits'))
        # One LineCollection per level, each holding several paths
        for collection, level in zip(cs.collections, cs.levels):
            zone_level = zone(name=f'Contour {level}')
            for idx, path in enumerate(collection.get_paths()):
                vector_level = vector(name=f'Contour {level} - {idx}')
                zone_level.add_vector(vector_level, forceparent=True)
                for vertices in path.to_polygons(closed_only=False):
                    for vertex in vertices:
                        vector_level.add_vertex(wolfvertex(vertex[0], vertex[1]))
            zones.add_zone(zone_level, forceparent=True)
    else:
        # MATPLOTLIB >3.8
        logging.debug('MATPLOTLIB >3.8')
        # One compound Path per level
        for path, level in zip(cs.get_paths(), cs.levels):
            zone_level = zone(name=f'Contour {level}')
            for idx, vertices in enumerate(path.to_polygons(closed_only=False)):
                vector_level = vector(name=f'Contour {level} - {idx}')
                zone_level.add_vector(vector_level, forceparent=True)
                for vertex in vertices:
                    vector_level.add_vertex(wolfvertex(vertex[0], vertex[1]))
            zones.add_zone(zone_level, forceparent=True)
    return zones
# ========================================================================
# Hole Filling & Inpainting
# ========================================================================
[docs]
def inpaint(self, mask_array:"WolfArrayModel" = None, test_array:"WolfArrayModel" = None,
            ignore_last:int = 1, multiprocess:bool = True):
    """ InPaintaing holes in the array

    :param mask_array: where computation is done
    :param test_array: used in test -- interpolation is accepted if new value is over test_array
    :param ignore_last: number of last patches to ignore
    :param multiprocess: use multiprocess for inpainting
    :return: (time, wl, wd) -- result of inpaint_array, wl being ``self`` inpainted in place,
        wd the water depth as a new array of the same type
    """
    from ..eikonal import inpaint_array
    if mask_array is None:
        mask_array = self.array.mask
    elif isinstance(mask_array, WolfArrayModel):
        mask_array = mask_array.array.data
    if test_array is None:
        # Default acceptance threshold: the array minimum everywhere
        test_array = np.ones(self.array.data.shape) * self.array.data.min()
    elif isinstance(test_array, WolfArrayModel):
        test_array = test_array.array.data
    # inplace=True --> self.array.data is modified directly
    time, wl_np, wd_np = inpaint_array(self.array.data,
                                       mask_array,
                                       test_array,
                                       ignore_last_patches= ignore_last,
                                       dx = self.dx,
                                       dy = self.dy,
                                       NoDataValue = self.nullvalue,
                                       inplace=True,
                                       multiprocess= multiprocess)
    # Create water depth array
    # Force the same type as the current array
    # (in case of calling it from a child class with a different type like GUI array)
    wd = type(self)(mold=self)
    wd.array[:,:] = wd_np[:,:]
    wd.mask_data(self.nullvalue)
    wl = self
    self.mask_data(self.nullvalue)
    self.reset_plot()
    return time, wl, wd
[docs]
def _inpaint_waterlevel_dem_dtm(self, dem:"WolfArrayModel", dtm:"WolfArrayModel",
                                ignore_last:int = 1, use_fortran:bool = False,
                                multiprocess:bool = True):
    """ InPaintaing waterlevel holes in the array.

    We use DEM and DTM to mask and constraint the inpainting process.

    :param dem: Digital Elevation Model (same as simulation model)
    :param dtm: Digital Terrain Model
    :param ignore_last: number of last patches to ignore
    :param use_fortran: use the external Fortran inpainting code (holes.exe); falls back
        to the Python implementation if the executable is not in PATH
    :param multiprocess: use multiprocess for inpainting
    :return: (time, wl, wd) -- time is None on the Fortran path
    """
    if use_fortran:
        import shutil
        from tempfile import TemporaryDirectory
        # check if holes.exe is available in PATH
        if shutil.which('holes.exe') is None:
            logging.error(_('holes.exe not found in PATH'))
            logging.info(_('We use the Python version of inpainting'))
            use_fortran = False
        else:
            with TemporaryDirectory() as tmpdirname:
                # create mask from DEM, DTM and WL
                mask = self._create_building_holes_dem_dtm(dem, dtm, ignore_last)
                # save mask and dtm to temporary files
                locdir = Path(tmpdirname)
                wl_name = locdir / 'array.bin'
                mask_name = locdir / 'mask.bin'
                dtm_name = locdir / 'dtm.bin'
                mask.write_all(mask_name)
                dtm.write_all(dtm_name)
                # write_all changes self.filename --> restore it afterwards
                oldname = self.filename
                self.write_all(wl_name)
                self.filename = oldname
                # call holes.exe
                olddir = os.getcwd()
                # change to temporary directory
                os.chdir(locdir)
                shell_command = f'holes.exe inpaint in={wl_name} mask={mask_name} dem={dtm_name} avoid_last={ignore_last} out=temp'
                logging.info(shell_command)
                logging.info('Inpainting holes using holes.exe')
                os.system(shell_command)
                logging.info('Done - reading inpainted array')
                # read inpainted array
                wl = type(self)(locdir / 'temp_combl.tif')
                wd = type(self)(locdir / 'temp_h.tif')
                wd.array[wd.array < 0.] = 0.
                wd.mask_data(0.)
                os.chdir(olddir)
                # copy the inpainted water level back into this array
                self.array.data[:,:] = wl.array.data[:,:]
            # No travel time available from the external code
            time = None
    if not use_fortran:
        from ..eikonal import inpaint_waterlevel
        # inplace=True --> self.array.data is modified directly
        time, wl_np, wd_np = inpaint_waterlevel(self.array.data,
                                                dem.array.data,
                                                dtm.array.data,
                                                ignore_last_patches= ignore_last,
                                                dx = self.dx,
                                                dy = self.dy,
                                                NoDataValue = self.nullvalue,
                                                inplace=True,
                                                multiprocess= multiprocess)
        # Water depth in a new array of the same type as self
        wd = type(self)(mold=self)
        wd.array[:,:] = wd_np[:,:]
        wd.mask_data(self.nullvalue)
        wl = self
    self.mask_data(self.nullvalue)
    self.reset_plot()
    return time, wl, wd
[docs]
def count_holes(self, mask:"WolfArrayModel" = None):
    """Count holes (connected masked regions) in the array.

    :param mask: boolean mask array; if ``None``, use :attr:`array.mask`
    """
    from ..eikonal import count_holes
    if isinstance(mask, WolfArrayModel):
        work_mask = mask.array.data
    elif mask is None:
        work_mask = self.array.mask
    else:
        work_mask = mask
    return count_holes(work_mask)
[docs]
def select_holes(self, mask:"WolfArrayModel" = None, ignore_last:int = 1):
    """Select holes in the array

    Labels the connected components of the mask, discards the ``ignore_last``
    largest ones and stores the remaining cells in the current selection.

    :param mask: boolean mask array; if ``None``, use :attr:`array.mask`
    :param ignore_last: number of largest labelled components to ignore
    """
    if mask is None:
        mask = self.array.mask
    elif isinstance(mask, WolfArrayModel):
        mask = mask.array.data
    labels, numfeatures = label(mask)
    # count cells in holes
    nb_cells = np.bincount(labels.ravel())
    # sort by number of cells
    idx = np.argsort(nb_cells)
    # NOTE(review): bincount also counts label 0 (the non-hole background);
    # if the background is the largest bin, removing it is a no-op -- confirm intent
    for i in range(ignore_last):
        labels[labels == idx[-1-i]] = 0
    # select holes but ignoring last ones
    ij = np.argwhere(labels)
    # Convert i,j to x,y
    xy = self.ij2xy_np(ij)
    self.SelectionData.myselection = list(xy)
    self.SelectionData.update_nb_nodes_selection()
[docs]
def create_mask_holes(self, ignore_last:int = 1) -> "WolfArrayModel":
    """Select holes in the array and create a new array flagging them

    :param ignore_last: number of largest labelled components to ignore
    :return: new array whose data holds the hole labels; non-hole cells are masked
    """
    labels, numfeatures = label(self.array.mask)
    # count cells in holes
    nb_cells = np.bincount(labels.ravel())
    # sort by number of cells
    idx = np.argsort(nb_cells)
    # NOTE(review): bincount also counts label 0 (the non-hole background);
    # if the background is the largest bin, removing it is a no-op -- confirm intent
    for i in range(ignore_last):
        labels[labels == idx[-1-i]] = 0
    newarray = type(self)(mold=self)
    # newarray.allocate_ressources()
    newarray.array.data[:,:] = labels
    # Keep only hole cells unmasked
    newarray.array.mask[:,:] = ~labels.astype(bool)
    newarray.set_nullvalue_in_mask()
    return newarray
[docs]
def fill_holes_with_value(self, value:float, mask:"WolfArrayModel" = None, ignore_last:int = 1):
    """Fill holes in the array with a value

    Holes are connected components of the boolean mask. The ``ignore_last``
    largest labelled components are left untouched (typically the background),
    the others are filled with ``value`` and unmasked.

    :param value: value to fill holes with
    :param mask: boolean mask defining the holes, or a WolfArrayModel whose data is used; if ``None``, use :attr:`array.mask`
    :param ignore_last: number of largest labelled components to ignore
    """
    if mask is None:
        mask = self.array.mask
    elif isinstance(mask, WolfArrayModel):
        # CONSISTENCY FIX: accept a WolfArrayModel mask, like count_holes/select_holes do
        mask = mask.array.data
    labels, numfeatures = label(mask)
    # count cells in each labelled component (index 0 is the background)
    nb_cells = np.bincount(labels.ravel())
    # sort component ids by number of cells (ascending)
    idx = np.argsort(nb_cells)
    # discard the `ignore_last` largest components
    for i in range(ignore_last):
        labels[labels == idx[-1-i]] = 0
    self.array.data[labels > 0] = value
    self.array.mask[labels > 0] = False # unmask filled data
[docs]
def fill_holes_with_value_if_intersects(self, value:float,
                                        vects:list[vector] | Zones,
                                        method:Literal['matplotlib','shapely_strict', 'shapely'] = 'shapely_strict',
                                        buffer:float = 0.):
    """Fill holes in the array with a value if they intersect the given polygons.

    :param value: value to fill holes with
    :param vects: list of polygon vectors (or a Zones container) for the intersection test
    :param method: spatial method for intersection check
    :param buffer: buffer distance for intersection test
    """
    # Flatten a Zones container into a plain list of vectors
    polygons = vects.concatenate_all_vectors() if isinstance(vects, Zones) else vects
    for poly in polygons:
        self._fill_holes_with_value_if_intersects(value, poly, method, buffer)
[docs]
def _fill_holes_with_value_if_intersects(self, value:float,
                                         vect:vector,
                                         method:Literal['matplotlib','shapely_strict', 'shapely'] = 'shapely_strict',
                                         buffer:float = 0.) -> "WolfArrayModel":
    """Internal helper: fill the cells inside a polygon if it intersects the array.

    :param value: value to fill holes with
    :param vect: polygon vector for intersection test
    :param method: spatial method for intersection check
    :param buffer: buffer distance for intersection test
    """
    if buffer > 0.:
        # As we use a buffer, we check the intersection with the buffer...
        if self.intersects_polygon(vect, method=method, buffer=buffer):
            # ...but we fill the original polygon, not the buffered one
            ij = self.get_ij_inside_polygon(vect, method=method, usemask=False)
            self.array.data[ij[:,0], ij[:,1]] = value
            self.array.mask[ij[:,0], ij[:,1]] = False
    else:
        ij = self.get_ij_inside_polygon(vect, method=method, buffer=buffer, usemask=False)
        # BUGFIX: ndarray.size is an int (not subscriptable) -- use shape[0]
        if ij.shape[0] > 0:
            self.array.data[ij[:,0], ij[:,1]] = value
            self.array.mask[ij[:,0], ij[:,1]] = False
[docs]
def _create_building_holes_dem_dtm(self, dem:"WolfArrayModel", dtm:"WolfArrayModel", ignore_last:int = 1) -> "WolfArrayModel":
    """Create a mask identifying building footprints from the difference between DEM and DTM.

    :param dem: Digital Elevation Model array
    :param dtm: Digital Terrain Model array
    :param ignore_last: not used in this routine; kept for signature symmetry with the other hole helpers
    """
    # Buildings are where the DEM stands above the DTM
    buildings = dem - dtm
    buildings.array[buildings.array < 0.] = 0.
    # Building cells: keep the terrain elevation (DTM)
    buildings.array[buildings.array > 0.] = dtm.array[buildings.array > 0.]
    # Remaining cells: set above the DEM maximum
    # NOTE(review): a building cell whose DTM value is exactly 0. would be
    # overwritten by the next line -- confirm intent
    buildings.array[buildings.array == 0.] = dem.array.max() + 1.
    return buildings
# ========================================================================
# Triangulation & Mesh
# ========================================================================
[docs]
def get_triangulation(self, bounds:list[float]=None):
    """
    Triangulation of the array

    :param bounds: [[xmin,xmax],[ymin,ymax]]; if ``None``, the whole array is meshed
    :return: (Triangulation, znull) where znull is the elevation assigned to masked/outside nodes
    """
    # NOTE(review): `all` shadows the builtin -- True when the whole array is meshed
    all = bounds is None
    if bounds is None:
        ox = self.origx + self.translx
        oy = self.origy + self.transly
        nbx = self.nbx
        nby = self.nby
        i1 = 0
        i2 = self.nbx
        j1 = 0
        j2 = self.nby
        bounds = [[ox,ox+float(nbx)*self.dx],[oy,oy+float(nby)*self.dy]]
    else:
        # Clip the requested bounds to the array footprint
        ox = max(self.origx, bounds[0][0])
        oy = max(self.origy, bounds[1][0])
        i1, j1 = self.get_ij_from_xy(ox, oy)
        i2, j2 = self.get_ij_from_xy(bounds[0][1], bounds[1][1])
        i1 = max(i1, 0)
        j1 = max(j1, 0)
        i2 = min(i2 + 1, self.nbx)
        j2 = min(j2 + 1, self.nby)
        nbx = i2 - i1
        nby = j2 - j1
    refx = ox
    refy = oy
    # Cell-centre coordinates
    x = np.arange(self.dx / 2. + refx, float(nbx) * self.dx + self.dx / 2 + refx, self.dx)
    y = np.arange(self.dy / 2. + refy, float(nby) * self.dy + self.dy / 2 + refy, self.dy)
    # Elevation assigned to masked/outside nodes (just below the local minimum)
    znull = np.min(self.array[i1:i2, j1:j2])-1.
    if all:
        # Whole array: vectorised point list
        locarr = np.copy(self.array.data)
        locarr[self.array.mask] = znull
        points = np.meshgrid(x,y)
        points = np.concatenate((points[0].flatten(), points[1].flatten(),locarr.flatten())).reshape([3,len(x)*len(y)]).transpose()
    else:
        # Sub-window: sample point by point through get_value
        points = np.asarray(
            [[xx, yy, self.get_value(xx + ox - refx, yy + oy - refy, nullvalue=znull)] for xx in x for yy in y],
            dtype=np.float32)
    # Build two triangles per cell, column by column
    decal = 0
    triangles = []
    triangles.append([[i + decal, i + decal + 1, i + decal + nby] for i in range(nby - 1)])
    triangles.append([[i + decal + nby, i + decal + 1, i + decal + nby + 1] for i in range(nby - 1)])
    for k in tqdm(range(1, nbx - 1)):
        decal = k * nby
        triangles.append([[i + decal, i + decal + 1, i + decal + nby] for i in range(nby - 1)])
        triangles.append([[i + decal + nby, i + decal + 1, i + decal + nby + 1] for i in range(nby - 1)])
    triangles = np.asarray(triangles, dtype=np.uint32).reshape([(2 * nby - 2) * (nbx - 1), 3])
    mytri = Triangulation(pts = points, tri = triangles)
    return mytri, znull
# ========================================================================
# Visualization (Matplotlib)
# ========================================================================
[docs]
def compare_cloud(self, mycloud:cloud_vertices, delta:list[float] = [.15, .5, 1.]):
    """
    Comparison plot between the values of a point cloud and the array values
    sampled at the same positions.

    :param mycloud: cloud_vertices
    :param delta: list of tolerances; dashed lines are drawn at +/- each value
    """
    # Get the values of the cloud
    xyz_cloud = mycloud.get_xyz()
    # Get values of the array at the same positions
    zarray = np.array([self.get_value(curxy[0],curxy[1]) for curxy in xyz_cloud])
    # count the number of points outside the array (-99999 is the "outside" sentinel of get_value)
    nbout = np.count_nonzero(zarray==-99999)
    # Keep only the points lying inside the array
    # - cloud z values
    z_cloud = xyz_cloud[zarray!=-99999][:,2]
    # - cloud xy positions
    xy_cloud = xyz_cloud[zarray!=-99999][:,:2]
    # - array values
    zarray = zarray[zarray!=-99999]
    # Common range for both axes
    zall = np.concatenate([z_cloud,zarray])
    zmin = np.min(zall)
    zmax = np.max(zall)
    # compute differences
    diffz = zarray-z_cloud
    # choose a colormap (plt.cm.get_cmap was removed in Matplotlib >= 3.9)
    cmap = plt.get_cmap('RdYlBu')
    mindiff = np.min(diffz)
    maxdiff = np.max(diffz)
    # Plot the differences [0] and the position [1]
    fig,ax = plt.subplots(2,1)
    ax[0].set_title(_('Comparison Z - ') + str(nbout) + _(' outside points on ') + str(len(xyz_cloud)))
    sc0 = ax[0].scatter(z_cloud,zarray,s=10,c=diffz,cmap = cmap, vmin=mindiff, vmax=maxdiff)
    ax[0].set_xlabel(_('Scatter values'))
    ax[0].set_ylabel(_('Array values'))
    ax[0].set_xlim([zmin,zmax])
    ax[0].set_ylim([zmin,zmax])
    ax[0].plot([zmin,zmax],[zmin,zmax], color='black')
    if delta is not None:
        if isinstance(delta, list):
            for idx, curdelta in enumerate(delta):
                curdelta = abs(float(curdelta))
                # BUGFIX: use the current tolerance `curdelta`, not the whole `delta` list
                # NOTE(review): alpha evaluates to 0 (invisible) for the first tolerance -- confirm intent
                ax[0].plot([zmin,zmax],[zmin+curdelta,zmax+curdelta], 'k--', alpha=1.-1./(idx+1))
                ax[0].plot([zmin,zmax],[zmin-curdelta,zmax-curdelta], 'k--', alpha=1.-1./(idx+1))
    ax[0].axis('equal')
    sc1 = ax[1].scatter(xy_cloud[:,0],xy_cloud[:,1],s=10,c=diffz,cmap = cmap, vmin=mindiff, vmax=maxdiff)
    fig.colorbar(sc1)
    ax[1].axis('equal')
    plt.show()
[docs]
def compare_tri(self, mytri:Triangulation):
    """
    Plot a comparison between array values and triangulation vertex elevations.

    Two subplots are produced: a scatter of vertex vs array values (coloured
    by the difference) and a map of the vertex positions with the same colouring.

    :param mytri: :class:`Triangulation` object to compare against the array
    """
    # Vertices of the triangulation (n x 3)
    xyz_cloud = mytri.pts

    # Array values sampled at the vertex positions (-99999 means outside the array)
    zarray = np.array([self.get_value(curxy[0], curxy[1]) for curxy in xyz_cloud])

    # Number of vertices falling outside the array
    nbout = np.count_nonzero(zarray == -99999)

    # Keep only the vertices lying inside the array
    inside = zarray != -99999
    z_cloud = xyz_cloud[inside][:, 2]
    xy_cloud = xyz_cloud[inside][:, :2]
    zarray = zarray[inside]

    # Global min/max over both datasets, used for the axes limits
    zall = np.concatenate([z_cloud, zarray])
    zmin = np.min(zall)
    zmax = np.max(zall)

    # Differences "array - triangulation"
    diffz = zarray - z_cloud

    # FIX: plt.cm.get_cmap was deprecated in Matplotlib 3.7 and removed in 3.9
    cmap = plt.get_cmap('RdYlBu')
    mindiff = np.min(diffz)
    maxdiff = np.max(diffz)

    # Plot the differences [0] and the positions [1]
    fig, ax = plt.subplots(2, 1)

    ax[0].set_title(_('Comparison Z - ') + str(nbout) + _(' outside points on ') + str(len(xyz_cloud)))
    sc0 = ax[0].scatter(z_cloud, zarray, s=10, c=diffz, cmap=cmap, vmin=mindiff, vmax=maxdiff)
    ax[0].set_xlabel(_('Scatter values'))
    ax[0].set_ylabel(_('Array values'))
    ax[0].set_xlim([zmin, zmax])
    ax[0].set_ylim([zmin, zmax])
    # 1:1 reference line
    ax[0].plot([zmin, zmax], [zmin, zmax])
    ax[0].axis('equal')

    sc1 = ax[1].scatter(xy_cloud[:, 0], xy_cloud[:, 1], s=10, c=diffz, cmap=cmap, vmin=mindiff, vmax=maxdiff)
    fig.colorbar(sc1)
    ax[1].axis('equal')
    plt.show()
[docs]
def _overlay_wms_image(self, ax, fetcher, cat:str, available:list,
                       msg_unavailable:str, msg_error:str):
    """
    Fetch a basemap image from a WMS-like service and draw it on ``ax``.

    Shared helper for the WalOnMap / IGN / Cartoweb overlays of
    :meth:`plot_matplotlib` (the original code duplicated this logic three times).

    :param ax: Matplotlib axis to draw on
    :param fetcher: callable ``fetcher(cat, xmin, ymin, xmax, ymax, w=..., h=..., tofile=False)``
                    returning an in-memory image stream
    :param cat: requested layer/category name
    :param available: categories supported by the service
    :param msg_unavailable: already-translated error template with one ``{}`` placeholder for ``cat``
    :param msg_error: already-translated error message used when the fetch fails
    """
    from PIL import Image

    if cat not in available:
        logging.error(msg_unavailable.format(cat))
        logging.error(_('Available categories are: {}').format(', '.join(available)))
        return

    try:
        bounds = self.get_bounds()
        # Width/height aspect ratio of the array footprint
        scale_covery = (bounds[0][1] - bounds[0][0]) / (bounds[1][1] - bounds[1][0])
        if scale_covery > 1.:
            # x is larger than y
            w = 2000
            h = int(2000 / scale_covery)
        else:
            # y is larger than x
            h = 2000
            w = int(2000 * scale_covery)

        IO_image = fetcher(cat, bounds[0][0], bounds[1][0],
                           bounds[0][1], bounds[1][1],
                           w=w, h=h, tofile=False)
        image = Image.open(IO_image)

        # The service returns a top-down image -> flip it for origin='lower'
        ax.imshow(image.transpose(Image.Transpose.FLIP_TOP_BOTTOM),
                  extent=(bounds[0][0], bounds[0][1], bounds[1][0], bounds[1][1]),
                  alpha=1, origin='lower')
    except Exception as e:
        logging.error(msg_error)
        logging.error(e)

def plot_matplotlib(self, figax:tuple=None,
                    getdata_im:bool=False,
                    update_palette:bool=True,
                    vmin:float=None, vmax:float=None,
                    figsize:tuple=None,
                    Walonmap:bool=False,
                    cat:Literal['IMAGERIE/ORTHO_1971', 'IMAGERIE/ORTHO_1994_2000', 'IMAGERIE/ORTHO_2006_2007', 'IMAGERIE/ORTHO_2009_2010',
                                'IMAGERIE/ORTHO_2012_2013', 'IMAGERIE/ORTHO_2015', 'IMAGERIE/ORTHO_2016',
                                'IMAGERIE/ORTHO_2017', 'IMAGERIE/ORTHO_2018', 'IMAGERIE/ORTHO_2019',
                                'IMAGERIE/ORTHO_2020', 'IMAGERIE/ORTHO_2021', 'IMAGERIE/ORTHO_2022_PRINTEMPS',
                                'IMAGERIE/ORTHO_2022_ETE', 'IMAGERIE/ORTHO_2023_ETE', 'IMAGERIE/ORTHO_LAST',
                                'orthoimage_coverage', 'orthoimage_coverage_2016', 'orthoimage_coverage_2017',
                                'orthoimage_coverage_2018', 'orthoimage_coverage_2019',
                                'orthoimage_coverage_2020', 'orthoimage_coverage_2021',
                                'orthoimage_coverage_2022', 'crossborder', 'crossborder_grey',
                                'overlay', 'topo', 'topo_grey'] = 'IMAGERIE/ORTHO_LAST',
                    first_mask_data:bool=True,
                    with_legend:bool=False,
                    IGN:bool=False,
                    Cartoweb:bool=False):
    """
    Plot the array - Matplotlib version.

    This method visualizes the array data using Matplotlib's `imshow` function.
    It supports optional basemap overlays, custom palettes, and value range adjustments.

    Notes:
    ------
    - The method applies a mask to the data using the `nullvalue` attribute before plotting.
    - If `Walonmap`, `IGN` or `Cartoweb` is True, the method fetches and overlays a map
      image from the corresponding web service.
    - The aspect ratio of the plot is set to 'equal'.

    :param figax: A tuple containing a Matplotlib figure and axis (fig, ax). If None, a new figure and axis are created.
    :type figax: tuple, optional (Default value = None)
    :param getdata_im: If True, returns the image object along with the figure and axis; otherwise only (fig, ax).
    :type getdata_im: bool, optional (Default value = False)
    :param update_palette: If True, updates the color palette before plotting.
    :type update_palette: bool, optional (Default value = True)
    :param vmin: Minimum value for color scaling. If None (and vmax given), taken from the palette.
    :type vmin: float, optional (Default value = None)
    :param vmax: Maximum value for color scaling. If None (and vmin given), taken from the palette.
    :type vmax: float, optional (Default value = None)
    :param figsize: Size of the figure in inches (width, height). Only used if `figax` is None.
    :type figsize: tuple, optional (Default value = None)
    :param Walonmap: If True, overlays a map image using the WalOnMap service.
    :type Walonmap: bool, optional (Default value = False)
    :param cat: The category of the map to fetch from the chosen service.
        See the ``available_Walonmap`` / ``available_IGN`` / ``available_Cartoweb``
        lists below for the supported values per service.
    :type cat: str, optional (Default value = 'IMAGERIE/ORTHO_LAST')
    :param first_mask_data: If True, applies the mask to the data before plotting.
    :type first_mask_data: bool, optional (Default value = True)
    :param with_legend: If True, adds a color legend to the plot.
    :type with_legend: bool, optional (Default value = False)
    :param IGN: if ``True``, overlay the IGN basemap
    :param Cartoweb: if ``True``, overlay the Cartoweb basemap
    :return: (fig, ax) or, if `getdata_im` is True, (fig, ax, im) where `im` is the image from `imshow`.
    :rtype: tuple
    """
    available_Walonmap = ['IMAGERIE/ORTHO_1971', 'IMAGERIE/ORTHO_1994_2000', 'IMAGERIE/ORTHO_2006_2007', 'IMAGERIE/ORTHO_2009_2010',
                          'IMAGERIE/ORTHO_2012_2013', 'IMAGERIE/ORTHO_2015', 'IMAGERIE/ORTHO_2016',
                          'IMAGERIE/ORTHO_2017', 'IMAGERIE/ORTHO_2018', 'IMAGERIE/ORTHO_2019',
                          'IMAGERIE/ORTHO_2020', 'IMAGERIE/ORTHO_2021', 'IMAGERIE/ORTHO_2022_PRINTEMPS',
                          'IMAGERIE/ORTHO_2022_ETE', 'IMAGERIE/ORTHO_2023_ETE', 'IMAGERIE/ORTHO_LAST']

    available_IGN = ['orthoimage_coverage', 'orthoimage_coverage_2016', 'orthoimage_coverage_2017',
                     'orthoimage_coverage_2018', 'orthoimage_coverage_2019',
                     'orthoimage_coverage_2020', 'orthoimage_coverage_2021',
                     'orthoimage_coverage_2022']

    available_Cartoweb = ['crossborder', 'crossborder_grey', 'overlay', 'topo', 'topo_grey']

    if first_mask_data:
        self.mask_data(self.nullvalue)

    if update_palette:
        self.updatepalette(0)

    if figax is None:
        fig, ax = plt.subplots(figsize=figsize)
    else:
        fig, ax = figax

    # Optional basemap overlay -- at most one service is used.
    # Messages are translated here so the i18n keys stay per-service.
    if Walonmap:
        from ..PyWMS import getWalonmap
        self._overlay_wms_image(ax, getWalonmap, cat, available_Walonmap,
                                _('The category {} is not available in WalOnMap'),
                                _('Error while fetching the map image from WalOnMap'))
    elif IGN:
        from ..PyWMS import getNGI
        self._overlay_wms_image(ax, getNGI, cat, available_IGN,
                                _('The category {} is not available in IGN'),
                                _('Error while fetching the map image from IGN'))
    elif Cartoweb:
        from ..PyWMS import getCartoweb
        self._overlay_wms_image(ax, getCartoweb, cat, available_Cartoweb,
                                _('The category {} is not available in Cartoweb'),
                                _('Error while fetching the map image from Cartoweb'))

    # Complete a missing bound from the palette so both are defined together
    if vmin is None and vmax is not None:
        vmin = self.mypal.values[0]
    elif vmax is None and vmin is not None:
        vmax = self.mypal.values[-1]

    if (vmin is None) and (vmax is None):
        # Use the palette's own norm/cmap; masked cells are fully transparent
        im = ax.imshow(self.array.transpose(),
                       origin='lower',
                       norm = self.mypal.norm,
                       cmap=self.mypal.cmap,
                       interpolation='none',
                       extent=(self.origx,
                               self.origx + self.dx * self.nbx,
                               self.origy,
                               self.origy + self.dy * self.nby),
                       alpha=np.select([self.array.mask.T, ~self.array.mask.T],
                                       [np.zeros(self.shape).T, np.ones(self.shape).T * self.alpha]))

        if with_legend:
            # add a legend in a new axis, ticks at the palette values
            ax_leg = fig.add_axes([0.92, 0.12, 0.04, 0.8])
            from matplotlib.colorbar import ColorbarBase
            cbar = ColorbarBase(ax_leg, cmap=self.mypal.cmap, norm=self.mypal.norm, orientation='vertical')
            cbar.set_ticks(self.mypal.values)
            # ticks labels with 3 decimal places
            labels = [f"{val:.3f}" for val in self.mypal.values]
            cbar.set_ticklabels(labels)
            cbar.ax.tick_params(labelsize=8)
            cbar.ax.yaxis.set_label_position('left')
            cbar.ax.yaxis.set_ticks_position('right')
            cbar.ax.yaxis.set_tick_params(width=0.5, size=2, direction='in', color='black')
    else:
        # Explicit vmin/vmax scaling; masked cells are fully transparent
        im = ax.imshow(self.array.transpose(),
                       origin='lower',
                       cmap=self.mypal,
                       extent=(self.origx,
                               self.origx + self.dx * self.nbx,
                               self.origy,
                               self.origy + self.dy * self.nby),
                       vmin=vmin, vmax=vmax,
                       alpha=np.select([self.array.mask.T, ~self.array.mask.T],
                                       [np.zeros(self.shape).T, np.ones(self.shape).T * self.alpha]))

        if with_legend:
            # add a legend in a new axis, linearly spaced ticks between vmin and vmax
            ax_leg = fig.add_axes([0.92, 0.12, 0.04, 0.8])
            from matplotlib.colorbar import ColorbarBase
            from matplotlib.colors import Normalize
            cbar = ColorbarBase(ax_leg, cmap=self.mypal.cmap, norm= Normalize(vmin, vmax), orientation='vertical')
            vals = list(np.linspace(vmin, vmax, self.mypal.nb, endpoint=True))
            # limit to 2 decimal places
            vals = [round(val, 2) for val in vals]
            cbar.set_ticks(vals)
            cbar.set_ticklabels(vals)
            cbar.ax.tick_params(labelsize=8)
            cbar.ax.yaxis.set_label_position('left')
            cbar.ax.yaxis.set_ticks_position('right')
            cbar.ax.yaxis.set_tick_params(width=0.5, size=2, direction='in', color='black')

    ax.set_aspect('equal')

    if getdata_im:
        return fig, ax, im
    else:
        return fig, ax
[docs]
def imshow(self, figax:tuple[Figure, Axis] = None,
           cmap:Colormap = None, step_ticks=100.) -> tuple[Figure, Axis]:
    """
    Create a Matplotlib image from the array (deprecated -- use :meth:`plot_matplotlib`).

    :param figax: tuple ``(fig, ax)`` of matplotlib Figure and Axes; created if ``None``
    :param cmap: matplotlib colormap; if ``None``, the internal RGB palette is used
    :param step_ticks: step (in world units) between tick labels on the axes
    :return: ``(fig, ax)``
    """
    warnings.warn(_('The imshow method is deprecated and will be removed in a future version. Please use plot_matplotlib instead with the appropriate parameters to achieve the same result.'), DeprecationWarning)

    # Use figax if passed as argument
    if figax is None:
        fig,ax = plt.subplots(1,1)
    else:
        fig,ax = figax

    # World extent ((xmin, xmax), (ymin, ymax)) of the array
    bounds = self.get_bounds()

    if cmap is None:
        # update local colors if not already done
        # NOTE(review): VERSION_RGB is a module-level flag defined elsewhere in
        # the file; when it is not 1 and no cmap is given, nothing is drawn here.
        if VERSION_RGB==1:
            if self.rgb is None:
                self.updatepalette(1)
            # Pointing RGB
            colors = self.rgb
            # fully transparent where the array is masked
            colors[self.array.mask,3] = 0.
            # Plot (swap axes so x runs along image columns)
            colors = colors.swapaxes(0,1)
            ax.imshow(colors, origin='lower')
    else:
        # Full scale image values with the user-supplied colormap
        vals = self.array.data
        # opaque only where the array is unmasked
        alpha = np.zeros(vals.shape)
        alpha[~ self.array.mask] = 1.
        # Plot (swap axes so x runs along image columns)
        vals = vals.swapaxes(0,1)
        alpha = alpha.swapaxes(0,1)
        ax.imshow(vals, origin='lower', cmap=cmap, alpha=alpha)

    ax.set_aspect('equal')

    # Tick positions snapped to multiples of step_ticks (world coordinates),
    # then converted to pixel indices for the image axes
    x_start = (bounds[0][0] // step_ticks) * step_ticks
    x_end = (bounds[0][1] // step_ticks) * step_ticks
    y_start = (bounds[1][0] // step_ticks) * step_ticks
    y_end = (bounds[1][1] // step_ticks) * step_ticks

    # +.0001 so the end value is included despite float rounding
    x_pos = np.arange(x_start, x_end+.0001, step_ticks)
    y_pos = np.arange(y_start, y_end+.0001, step_ticks)

    i = [self.get_ij_from_xy(x, 0.)[0] for x in x_pos]
    j = [self.get_ij_from_xy(0., y)[1] for y in y_pos]

    ax.set_xticks(i)
    ax.set_xticklabels(x_pos, rotation = 30)
    ax.set_yticks(j)
    ax.set_yticklabels(y_pos)

    return fig,ax
# ========================================================================
# Rebinning & Resolution Change
# ========================================================================
[docs]
def get_rebin_shape_size(self, factor:float, reshape_array_if_necessary:bool = False) -> tuple[tuple[int, int], tuple[float, float]]:
    """
    Return the new shape and cell size after rebinning.

    newdx = dx * factor
    newdy = dy * factor

    The shape is adjusted (padded) to be a multiple of the factor before the
    final division by the factor.

    :param factor: factor of resolution change -- > 1.0 : decrease resolution, < 1.0 : increase resolution
    :type factor: float
    :param reshape_array_if_necessary: if ``True``, pad the stored array with
        nullvalue so its dimensions are divisible by the downsampling factor
    :return: new shape (nbx, nby) and new cell size (dx, dy)
    :rtype: Tuple[int, int], Tuple[float, float]
    """
    # New cell size (overridden below in the fractional-increase case)
    newdx = self.dx * float(factor)
    newdy = self.dy * float(factor)

    newnbx = self.nbx
    newnby = self.nby

    if factor >=1.0:
        # Decrease resolution -- pad the dimensions up to a multiple of factor
        to_reshape = False
        if np.mod(self.nbx,factor) != 0 or np.mod(self.nby,factor) != 0 :
            newnbx = self.nbx
            newnby = self.nby
            if np.mod(self.nbx,factor) !=0:
                newnbx = int(self.nbx + factor - np.mod(self.nbx,factor))
                to_reshape = True
            if np.mod(self.nby,factor) !=0:
                newnby = int(self.nby + factor - np.mod(self.nby,factor))
                to_reshape = True

        if to_reshape and reshape_array_if_necessary:
            # Pad the stored array with nullvalue so a block-reshape is possible
            logging.warning(f"The shape ({self.nbx}, {self.nby}) is not a multiple of the factor {factor}")
            new_array = ma.masked_array(np.ones((newnbx, newnby), dtype= self.dtype) * self.nullvalue, dtype= self.dtype)
            new_array[:self.nbx, :self.nby] = self.array
            self.array = new_array
    else:
        # Increase resolution
        if abs(int(1/factor) - 1/factor) > 1e-6:
            # 1/factor is not an integer -> go through an intermediate finer grid
            # whose cell size is the greatest common divisor of dx and dx*factor,
            # then coarsen that grid by an integer factor.
            # find the greatest common divisor of between self.dx and old_dx
            gcd = pgcd_decimal(self.dx, self.dx * factor)
            intermediate_factor = gcd / self.dx

            [newnbx,newnby], [dx,dy] = self.get_rebin_shape_size(intermediate_factor)

            factor = factor / intermediate_factor

            # Pad the intermediate shape up to a multiple of the (now integer) factor
            if np.mod(newnbx,factor) != 0 or np.mod(newnby,factor) != 0 :
                if np.mod(newnbx,factor) !=0:
                    newnbx = int(newnbx + factor - np.mod(newnbx,factor))
                if np.mod(newnby,factor) !=0:
                    newnby = int(newnby + factor - np.mod(newnby,factor))

            newdx = dx * factor
            newdy = dy * factor

    # Final division applies to both branches:
    #  - decrease: padded size / factor (factor >= 1)
    #  - increase: size / factor (factor < 1 or the residual integer factor)
    newnbx = int(newnbx / factor)
    newnby = int(newnby / factor)

    return (newnbx, newnby), (newdx, newdy)
[docs]
def rebin(self,
          factor:float,
          operation:Literal['mean', 'sum', 'min', 'max', 'median'] ='mean',
          operation_matrix:"WolfArrayModel"=None) -> None:
    """
    Change resolution - **in place**.

    If you want to keep current data, copy the WolfArrayModel into a new variable -> newWA = Wolfarray(mold=curWA).

    :param factor: factor of resolution change -- > 1.0 : decrease resolution, < 1.0 : increase resolution
    :type factor: float
    :param operation: operation to apply on the blocks ('mean', 'sum', 'min', 'max', 'median')
    :type operation: str, Rebin_Ops
    :param operation_matrix: operation matrix to apply on the blocks -- see the Enum "Rebin_Ops" for more infos. The matrix must have the same shape as the new array
    :type operation_matrix: WolfArrayModel
    """
    # Remember the operation as passed -- reused if a two-step rebin is needed below
    op_str = operation

    # Expected final shape, checked by the assert at the end
    (testnbx, testnby), __ = self.get_rebin_shape_size(factor)

    if operation_matrix is not None:
        # Per-cell operation selection: the matrix must match the REBINNED header
        tmp_header = self.get_rebin_header(factor)
        if not operation_matrix.is_like(tmp_header):
            logging.error(_("The operation matrix must have the same shape as the new array"))
            logging.info(_("You can use the get_rebin_header method to get the new header if you don't know it"))
            return

        logging.info(_("Operation matrix detected"))
        logging.info(_("The operation matrix will be used to apply the operation on the blocks"))
    else:
        # Map the string/enum to the corresponding numpy reduction
        operation = Rebin_Ops.get_ops(operation)

        if operation is None:
            logging.error(_("Operator not supported -- Must be a string in ['sum', 'mean', 'min', 'max', 'median'] or a Rebin_Ops Enum"))
            return

        if not callable(operation):
            # NOTE(review): no early return here -- get_ops appears to return a
            # callable or None, so this branch looks unreachable; confirm.
            logging.error(_("Operator not supported -- Must be a string in ['sum', 'mean', 'min', 'max', 'median'] or a Rebin_Ops Enum"))

    # Pad the array (with nullvalue) if its shape is not a multiple of the factor
    new_shape, (dx, dy) = self.get_rebin_shape_size(factor, reshape_array_if_necessary= True)

    if factor >=1.0:
        # Decrease resolution
        if operation_matrix is not None:
            # Reshape the input array to split it into blocks of size f x f
            reshaped_a = self.array.reshape(new_shape[0], int(factor), new_shape[1], int(factor))
            # Swap axes to make blocks as separate dimensions
            reshaped_a = reshaped_a.swapaxes(1, 2)

            # Initialize the output matrix
            self.array = ma.masked_array(np.ones((new_shape[0], new_shape[1]), dtype= self.dtype) * self.nullvalue, dtype= self.dtype)

            # Check the dtype of the newly initialized array
            assert self.array.dtype == self.dtype, _('Bad dtype')

            # Vectorized operations: apply each supported reduction where the
            # operation matrix selects it
            for op_idx, operation in enumerate(Rebin_Ops.get_numpy_ops()):
                mask = (operation_matrix.array == op_idx)
                if np.any(mask):
                    block_results = operation(reshaped_a, axis=(2, 3))
                    self.array[mask] = block_results[mask]
        else:
            # Single reduction applied to every f x f block
            compression_pairs = [(d, c // d) for d, c in zip(new_shape,
                                                             self.array.shape)]
            flattened = [l for p in compression_pairs for l in p]
            self.array = operation(self.array.reshape(flattened), axis=(1, 3)).astype(self.dtype)

        self.set_nullvalue_in_mask()
    else:
        # Increase resolution
        if abs(int(1/factor) - 1/factor) > 1e-6:
            # 1/factor is not an integer -> refine to an intermediate grid first,
            # then coarsen it by an integer factor with the requested operation
            logging.warning(f"The factor {factor} doesn't lead to an integer dimension for the Kronecker product")
            logging.warning("The array will be rebinned firstly using the most common factor and then using the operation")

            # find the greatest common divisor of between self.dx and old_dx
            gcd = pgcd_decimal(self.dx, self.dx * factor)
            intermediate_factor = gcd / self.dx

            logging.warning(f"The greatest common divisor is {gcd}")

            tmp = type(self)(mold=self, whichtype=self.wolftype)
            tmp.rebin(intermediate_factor)

            if op_str is None:
                op_str = 'min'

            tmp.rebin(factor / intermediate_factor, operation= op_str)

            self.array = tmp.array
            tmp.array = None
            del tmp
        else:
            # Integer upsampling: replicate each cell into a (1/f x 1/f) block
            ones = np.ones( (int(1/factor), int(1/factor)), dtype=self.array.dtype)
            self.array = np.kron(self.array, ones).astype(self.array.dtype)
            # np.kron returns a plain ndarray -> restore the mask
            self.mask_data(self.nullvalue)

    # Update the header to the new grid
    self.nbx = new_shape[0]
    self.nby = new_shape[1]
    self.dx = dx
    self.dy = dy

    assert isinstance(self.array, ma.MaskedArray), _('The array must be a masked array')
    assert self.array.shape == (testnbx, testnby), _(f'Bad shape: {self.array.shape} != {(testnbx, testnby)}')

    self.count()

    # rebin must not change the type of the array
    assert self.array.dtype == self.dtype, _(f'Bad dtype: {self.array.dtype} != {self.dtype}')
# ========================================================================
# Reprojection (CRS)
# ========================================================================
@classmethod
def from_other_epsg_coo(cls,
                        input_raster_path:str,
                        input_srs='EPSG:3812',
                        output_srs='EPSG:31370',
                        resampling_method=gdal.GRA_Bilinear,
                        xRes:float=0.5, yRes:float=0.5):
    """
    Reproject and resample a raster file from another EPSG coordinate system
    and return it as a WolfArrayModel.

    :param input_raster_path: The path to the input raster file.
    :type input_raster_path: str
    :param input_srs: The input spatial reference system (SRS) in the format 'EPSG:XXXX'. Defaults to Lambert 2008 'EPSG:3812'.
    :type input_srs: str
    :param output_srs: The output spatial reference system (SRS) in the format 'EPSG:XXXX'. Defaults to Belgian Lambert 72 'EPSG:31370'.
    :type output_srs: str
    :param resampling_method: The resampling method to use. Defaults to gdal.GRA_Bilinear. Can be chosen among the gdal GRA_*
                              constants (gdal.GRA_Average; gdal.GRA_Bilinear; gdal.GRA_Cubic; gdal.GRA_CubicSpline;
                              gdal.GRA_Lanczos; gdal.GRA_Mode; gdal.GRA_NearestNeighbour)
    :type resampling_method: int
    :param xRes: The desired output resolution in the x direction. Defaults to 0.5.
    :type xRes: float
    :param yRes: The desired output resolution in the y direction. Defaults to 0.5.
    :type yRes: float
    :raises AssertionError: If the resampling method is not supported.
    :raises RuntimeError: If the input raster file cannot be opened or the reprojection fails.
    :return: WolfArrayModel
    """
    # sanitize input
    input_raster_path = str(input_raster_path)
    input_srs = str(input_srs)
    output_srs = str(output_srs)

    assert resampling_method in [gdal.GRA_Average, gdal.GRA_Bilinear, gdal.GRA_Cubic, gdal.GRA_CubicSpline, gdal.GRA_Lanczos, gdal.GRA_Mode, gdal.GRA_NearestNeighbour], "Invalid resampling method"

    # Reproject into a temporary GeoTIFF, then load it before the directory is removed
    with tempfile.TemporaryDirectory() as temp_dir:
        output_raster_path = os.path.join(temp_dir, "Array_72.tif")
        reproject_and_resample_raster(input_raster_path, output_raster_path, input_srs, output_srs, resampling_method, xRes, yRes)
        # BUG FIX: the original used ``type(self)`` inside a @classmethod where
        # no ``self`` exists (NameError) -- use ``cls`` instead.
        Array3 = cls(output_raster_path, nullvalue=-9999)
        return Array3
[docs]
def reproject(self, output_epsg:int,
              resampling_method=gdal.GRA_Bilinear,
              ) -> "WolfArrayModel":
    """
    Return a new WolfArrayModel reprojected and resampled into another EPSG
    coordinate system.

    :param output_epsg: The output EPSG code for the desired coordinate system.
    :type output_epsg: int
    :param resampling_method: The resampling method to use. Defaults to gdal.GRA_Bilinear. Can be chosen among the gdal GRA_*
                              constants (gdal.GRA_Average; gdal.GRA_Bilinear; gdal.GRA_Cubic; gdal.GRA_CubicSpline;
                              gdal.GRA_Lanczos; gdal.GRA_Mode; gdal.GRA_NearestNeighbour)
    :type resampling_method: int
    :return: WolfArrayModel
    """
    assert isinstance(output_epsg, int), "output_epsg must be an integer"
    assert resampling_method in [gdal.GRA_Average, gdal.GRA_Bilinear, gdal.GRA_Cubic, gdal.GRA_CubicSpline, gdal.GRA_Lanczos, gdal.GRA_Mode, gdal.GRA_NearestNeighbour], "Invalid resampling method"

    # Round-trip through temporary GeoTIFF files: dump the current array,
    # reproject it, then reload the result as a new instance.
    with tempfile.TemporaryDirectory() as temp_dir:
        src_path = os.path.join(temp_dir, "Array_input.tif")
        dst_path = os.path.join(temp_dir, "Array_reprojected.tif")

        self.write_all(src_path)
        reproject_and_resample_raster(src_path,
                                      dst_path,
                                      f'EPSG:{self.epsg}',
                                      f'EPSG:{output_epsg}',
                                      resampling_method=resampling_method)

        return type(self)(dst_path, nullvalue=-9999)
# ========================================================================
# Merging & External Data Creation
# ========================================================================
@classmethod
def merge(cls, others:list, abs:bool=True, copy:bool = True) -> "WolfArrayModel":
    """
    Merge several WolfArrayModel instances into a single WolfArrayModel.

    The arrays are gathered as blocks of a temporary multi-block array and
    then flattened back into a single-block array.

    :param others: list of WolfArrayModel to merge
    :param abs: if True -> Global World Coordinates
    :param copy: if True -> copy data (**Not necessary** if data header CAN BE modified. It can be save memory)
    """
    from ._mb import WolfArrayMB

    container = WolfArrayMB()
    for block in others:
        container.add_block(block, force_idx=True, copyarray=copy)
    container.set_header_from_added_blocks()

    return container.as_WolfArray(abs=abs)
@classmethod
def set_general_frame_from_xyz(cls, fname:str, dx:float, dy:float, border_size:int=5, delimiter:str=',', fillin:bool=False):
    """
    Read an XYZ text file and initialise the base data of a new array.

    :param fname: name of the xyz file
    :param dx: cell size along x
    :param dy: cell size along y
    :param border_size: number of border cells added around the spatial extent of the file
    :param delimiter: column delimiter in the XYZ file (e.g. ``"\\t"``, ``" "``)
    :param fillin: if ``True``, fill the array with values from the XYZ file
    """
    xyz_data = XYZFile(fname, delimiter=delimiter)
    xyz_data.read_from_file()
    xlim, ylim = xyz_data.get_extent()

    frame = cls()
    frame.dx = dx
    frame.dy = dy
    # Origin shifted by half a cell and by the border margin
    frame.origx = math.floor(xlim[0]) - dx/2. - float(border_size) * frame.dx
    frame.origy = math.floor(ylim[0]) - dy/2. - float(border_size) * frame.dy
    # Cell counts covering the extent, plus a border on each side
    frame.nbx = int((math.floor(xlim[1]) - math.ceil(xlim[0])) / frame.dx) + 1 + 2*border_size
    frame.nby = int((math.floor(ylim[1]) - math.ceil(ylim[0])) / frame.dy) + 1 + 2*border_size

    frame.array = np.ma.zeros((frame.nbx, frame.nby), dtype=np.float32)

    if fillin:
        frame.fillin_from_xyz(xyz_data.xyz)

    return frame
@classmethod
[docs]
def set_general_frame_from_xyz_dir(cls, path:str, bounds:list, delimiter:str=',', dxdy:tuple[float, float]= None, border_size:int=5, fillin:bool=True):
    """
    Read a directory of XYZ text files, initialise the base data and load the array.

    Returns a WolfArrayModel, or ``False`` if no file lies within the bounds.

    :param path: directory containing the xyz files
    :type path: str
    :param bounds: spatial extent limits
    :type bounds: list
    :param delimiter: delimiter of the xyz files
    :type delimiter: str
    :param dxdy: tuple ``(dx, dy)`` cell size; if ``None``, infer from data
    :param border_size: number of border cells added around the spatial extent of the files
    :type border_size: int
    :param fillin: if ``True``, fill the array with values from the XYZ files
    """
    Data_xyz = XYZFile('nothing.xyz', folder=path, bounds=bounds, delimiter=delimiter)

    if Data_xyz.nblines == 0:
        # No point inside the requested bounds -> nothing to build
        logging.info(f"File not in the boundaries for {path.split(os.sep)[-1]}")
        return False

    xlim, ylim = Data_xyz.get_extent()

    if dxdy is None:
        # We assume dx and dy are equals within xyz file
        dx = np.diff(Data_xyz.xyz[0:2,0])[0]
        logging.info(f"dx = {dx} ; dy = {dx}")
        dy = dx
    else:
        dx,dy = dxdy

    assert dx is not None and dy is not None, _('dx and dy must be defined')

    Array = cls()
    Array.dx = dx
    Array.dy = dy
    # Origin shifted by half a cell and by the border margin
    Array.origx = math.floor(xlim[0]) -dx/2. - float(border_size) * Array.dx
    Array.origy = math.floor(ylim[0]) -dy/2. - float(border_size) * Array.dy
    # Cell counts covering the extent, plus a border on each side
    Array.nbx = int((math.floor(xlim[1]) - math.ceil(xlim[0])) / Array.dx) +1 + 2*border_size
    Array.nby = int((math.floor(ylim[1]) - math.ceil(ylim[0])) / Array.dy) +1 + 2*border_size

    Array.array = np.ma.zeros((Array.nbx, Array.nby))

    if fillin:
        Array.fillin_from_xyz(Data_xyz.xyz)

    return Array