Source code for wolfhece.wolf_array._base

"""
Author: HECE - University of Liege, Pierre Archambeau
Date: 2024-2026

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

try:
    from osgeo import gdal, osr
    gdal.UseExceptions()
except ImportError as e:
    print(e)
    raise Exception(_('Error importing GDAL library'))

import os
import sys
import warnings

import numpy as np
import numpy.ma as ma

from typing import Union, Literal, TYPE_CHECKING
from matplotlib.axis import Axis
from matplotlib.figure import Figure
import matplotlib.pyplot as plt
from matplotlib.colors import Colormap
import logging
import json
import tempfile
from pathlib import Path
from tqdm import tqdm

import matplotlib.pyplot as plt
import matplotlib.path as mpltPath
from scipy.interpolate import interp2d, griddata
from scipy.ndimage import laplace, label, sum_labels
from shapely.geometry import Point, LineString, MultiLineString, Polygon, MultiPolygon, MultiPoint
from shapely.ops import linemerge, substring, polygonize_full
from shapely import contains, contains_properly, contains_xy, touches, prepare, destroy_prepared, is_prepared
from os.path import dirname,basename,join
import logging
from typing import Literal
from enum import Enum

try:
    from ..PyTranslate import _
except ImportError as e:
    print(e)
    raise Exception(_('Error importing modules'))

try:
    from ..Coordinates_operations import reproject_and_resample_raster
except ImportError as e:
    print(e)
    raise Exception(_('Error importing modules'))

try:
    from ..xyz_file import XYZFile
    from ..PyPalette import wolfpalette
    from ..PyVertexvectors import Zones, vector, wolfvertex, zone, Triangulation
    from ..PyVertex import cloud_vertices
except ImportError as e:
    print(e)
    raise Exception(_('Error importing modules'))

try:
    from ..pydownloader import download_file, toys_dataset
except ImportError as e:
    raise Exception(_('Error importing pydownloader module'))

# try:
#     from wolfhece_tools.wolf_array import header_wolf
# except ImportError as e:
#     logging.error(e)
#     raise Exception(_('Error importing modules wolfhece_tools - please install it via pip install wolfhece_tools'))

import math
from fractions import Fraction


[docs] def pgcd_decimal(a, b): # Convertir les décimaux en fractions exactes fa = Fraction(a).limit_denominator() fb = Fraction(b).limit_denominator() # Extraire les numérateurs et dénominateurs num_a, den_a = fa.numerator, fa.denominator num_b, den_b = fb.numerator, fb.denominator # Mettre au même dénominateur lcm_den = math.lcm(den_a, den_b) a_int = num_a * (lcm_den // den_a) b_int = num_b * (lcm_den // den_b) # PGCD des entiers pgcd_int = math.gcd(a_int, b_int) # Revenir à l’échelle décimale return pgcd_int / lcm_den
# Re-exported from wolfhece.header_wolf for backward compatibility from ._header_wolf import ( header_wolf, getkeyblock, decodekeyblock, WOLF_ARRAY_HILLSHAPE, WOLF_ARRAY_FULL_SINGLE, WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_SYM_DOUBLE, WOLF_ARRAY_FULL_LOGICAL, WOLF_ARRAY_CSR_DOUBLE, WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_SINGLE_3D, WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_UINTEGER8, WOLF_ARRAY_MB_SINGLE, WOLF_ARRAY_MB_INTEGER, WOLF_ARRAY_FULL_INTEGER16_2, WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_MNAP_INTEGER, WOLF_ARRAY_MB, WOLF_ARRAY_FULL_UINTEGER16, WOLF_ARRAY_FULL_UINTEGER32 )
[docs] class OGLRenderer(Enum):
[docs] LIST = 0
[docs] SHADER = 1
@classmethod
[docs] def get(cls, value:int): """ Return the OGLRenderer value corresponding to an integer value """ for e in cls: if e.value == value: return e
[docs] DEFAULT_OGLRENDERER = OGLRenderer.SHADER
[docs] VERSION_RGB = 3
from numba import jit @jit(nopython=True)
[docs] def custom_gradient(array: np.ndarray): """ Calculate the gradient manually """ grad_x = np.zeros_like(array) grad_y = np.zeros_like(array) for i in range(1, array.shape[0] - 1): for j in range(1, array.shape[1] - 1): grad_x[i, j] = (array[i + 1, j] - array[i - 1, j]) / 2.0 grad_y[i, j] = (array[i, j + 1] - array[i, j - 1]) / 2.0 return grad_x, grad_y
@jit(nopython=True)
[docs] def hillshade(array:np.ndarray, azimuth:float, angle_altitude:float) -> np.ndarray: """ Create a hillshade array """ azimuth = 360.0 - azimuth x, y = custom_gradient(array) slope = np.pi / 2. - np.arctan(np.sqrt(x * x + y * y)) aspect = np.arctan2(-x, y) azimuthrad = azimuth * np.pi / 180. altituderad = angle_altitude * np.pi / 180. shaded = np.sin(altituderad) * np.sin(slope) + np.cos(altituderad) * \ np.cos(slope) * np.cos((azimuthrad - np.pi / 2.) - aspect) shaded += 1. shaded *= .5 return shaded.astype(np.float32)
# Re-exported from wolfhece.ui.wolf_array_ui for backward compatibility # Lazy-loaded in methods that need them to avoid wx dependency at module level # Direct imports available from wolfhece.wolf_array.__init__
[docs] class Rebin_Ops(Enum): """Enum for rebin/downsampling operations (no wx dependency)."""
[docs] MIN = 0
[docs] MEAN = 1
[docs] MAX = 2
[docs] SUM = 3
[docs] MEDIAN = 4
@classmethod
[docs] def get_numpy_ops(cls): """ Return a list of numpy functions corresponding to the enum values """ # CAUTION : Order is important and must match the enum values return [np.ma.min, np.ma.mean, np.ma.max, np.ma.sum, np.ma.median]
@classmethod
[docs] def get_ops(cls, name:str): """ Return the numpy function corresponding to a string """ if isinstance(name, Rebin_Ops): return cls.get_numpy_ops()[name.value] elif isinstance(name, str): if name == 'min': return np.ma.min elif name == 'mean': return np.ma.mean elif name == 'max': return np.ma.max elif name == 'sum': return np.ma.sum elif name == 'median': return np.ma.median else: return None else: return None
# Re-exported from wolfhece.selection_data for backward compatibility from ._selection_data import ( ALL_SELECTED, StorageMode, SelectionData, SelectionDataMB, ) if TYPE_CHECKING: from ..ui.wolf_array_ops import Ops_Array from ..opengl.py3d import WolfArrayPlotShader from ..opengl.wolf_array_shader2d import WolfArrayShader2D
[docs] class WolfArrayModel(header_wolf): """ Data-only WolfArrayModel class (no GUI/OpenGL dependency at class level). For GUI support (OpenGL rendering, wx dialogs), use WolfArray from _base_gui.py. simple précision, double précision, entier... """
[docs] array: ma.masked_array
[docs] _cache_grid: dict # For OpenGL
[docs] linkedvec: vector # used in some operations
[docs] linkedarrays: list["WolfArrayModel"] # used in some operations
# Origin and translation of the coordinate-system # in which the array-data coordinates are expressed.
[docs] origx: float
[docs] origy: float
[docs] origz: float
[docs] translx: float
[docs] transly: float
[docs] translz: float
[docs] myops: "Ops_Array"
# ======================================================================== # Constructor, Destructor & Serialization # ======================================================================== def __init__(self, fname:str = None, mold:"WolfArrayModel" = None, masknull:bool = True, crop:list[list[float],list[float]]=None, whichtype = WOLF_ARRAY_FULL_SINGLE, preload:bool = True, nullvalue:float = 0., srcheader:header_wolf = None, idx:str = '', mask_source:np.ndarray = None, np_source:np.ndarray = None, ) -> None: """ Constructor of the WolfArrayModel class :param fname: filename/filepath - if provided, the file will be read on disk :param mold: initialize from a copy a the mold object --> must be a WolfArrayModel if not None - Do not impose the type of the array. The source array will be converted if necessary. Change "whichtype" to impose a type. :param masknull: mask data based on the nullvalue :param crop: crop data based on the spatial extent [[xmin, xmax], [ymin,ymax]] :param whichtype: type of the numpy array (float32 as default) :param preload: True = load data during initialization ; False = waits for the display to be required :param nullvalue: null value used to mask data :param srcheader: initialize dimension from header_wolf instance :param idx: indentity --> required by the mapviewer :param mask_source: mask to link to the data :param np_source: numpy array to link to the data """ header_wolf.__init__(self) # Defaults for attributes typically set by Element_To_Draw. # In WolfArray (GUI subclass), Element_To_Draw.__init__ is called first, # so these attributes already exist and the hasattr checks skip them. if not hasattr(self, 'mapviewer'): self.mapviewer = None if not hasattr(self, 'wx_exists'): self.wx_exists = False if not hasattr(self, 'idx'): self.idx = idx if not hasattr(self, 'plotted'): self.plotted = False if not hasattr(self, 'plotting'): self.plotting = False self.myops = None
[docs] self._rendering_machine = DEFAULT_OGLRENDERER
[docs] self.mngselection = None
[docs] self.myblocks = None
[docs] self._active_blocks = None
[docs] self.flipupd=False
self.array:ma.masked_array = None # numpy masked array to stored numerical data self.linkedvec = None self.linkedarrays = []
[docs] self.filename = ''
[docs] self.isblock = False
[docs] self.blockindex = 0
[docs] self.wolftype = whichtype
[docs] self.preload = preload
[docs] self.loaded = False
[docs] self.masknull = masknull
if VERSION_RGB==1 : self.rgb = None # numpy array with colorize values
[docs] self.alpha = 1. # transparency alpha value
[docs] self.shading = False # if True, rgb will be shaded - Not available in SHADER mode
[docs] self.azimuthhill = 315. # sun position - azimuth
[docs] self.altitudehill = 0. # sun position - altitude
[docs] self.shaded = None
[docs] self._nullvalue = nullvalue
[docs] self.nbnotnull = 99999 # number of non-null values in the entire aray
[docs] self.nbnotnullzoom = 99999 # number of non-null values in the current visible part in mapviwer
[docs] self.nbtoplot = 0
[docs] self._gridsize = 256 # virtual grid for plotting operations
[docs] self._lod_max_whole_array = -1 # maximum scale used
# colormap
[docs] self.mypal = wolfpalette(None, "Palette of colors")
self.mypal.default16() self.mypal.automatic = True self._cache_grid = {}
[docs] self._cache_grid_last_call = None
[docs] self._cache_grid_frequency_useconds = 1000
[docs] self._shader_2d: "WolfArrayShader2D | None" = None
[docs] self._array3d = None
[docs] self._array2d = None
[docs] self.viewers3d:list["WolfArrayPlotShader"] = []
[docs] self.cropini = crop
if isinstance(srcheader, header_wolf): header=srcheader self.origx = header.origx self.origy = header.origy self.origz = header.origz self.translx = header.translx self.transly = header.transly self.translz = header.translz self.dx = header.dx self.dy = header.dy self.dz = header.dz self.nbx = int(header.nbx) self.nby = int(header.nby) self.nbz = int(header.nbz) self.head_blocks = header.head_blocks.copy() if self.nb_blocks>0: self.myblocks = {} if np_source is None: self.allocate_ressources() else: assert np_source.shape == (self.nbx, self.nby), _('Shape of np_source is not compatible with header') if self.dtype != np_source.dtype: logging.warning(_(f"dtype of np_source is not compatible with header -- Conversion will be done to match wolf header's type {self.dtype}")) np_source = np_source.astype(self.dtype) self.array = ma.MaskedArray(np_source, mask= np_source[:,:] == self.nullvalue if self.masknull else np.zeros(np_source.shape, dtype=bool), copy=False, order='C') # # FIXME Why not initialize with nullvalue ? # self.array = ma.MaskedArray(np.ones((self.nbx, self.nby), order='F', dtype=self.dtype)) if fname is not None: # check if fname is an url if str(fname).startswith('http:') or str(fname).startswith('https:'): try: fname = download_file(fname) except Exception as e: logging.error(_('Error while downloading file: %s') % e) return self.filename = str(fname) logging.debug(_('Loading file : %s') % self.filename) self.read_all() if mask_source is not None: logging.debug(_('Applying mask from source')) self.copy_mask_log(mask_source) logging.debug(_('Data masked')) elif masknull and (self.preload or self.loaded): logging.debug(_('Masking data with nullvalue')) self.mask_data(self.nullvalue) logging.debug(_('Data masked')) elif mold is not None: if self.cropini is None: self.nbx = mold.nbx self.nby = mold.nby self.nbz = mold.nbz self.dx = mold.dx self.dy = mold.dy self.dz = mold.dz self.origx = mold.origx self.origy = mold.origy self.origz = mold.origz self.translx = mold.translx self.transly = mold.transly self.translz = mold.translz self.array = ma.copy(mold.array.astype(self.dtype)) if idx=='': self.idx = mold.idx else: imin, jmin = mold.get_ij_from_xy(self.cropini[0][0], self.cropini[1][0]) imax, jmax = mold.get_ij_from_xy(self.cropini[0][1], self.cropini[1][1]) imin = int(imin) jmin = int(jmin) imax = int(imax) jmax = int(jmax) self.nbx = imax - imin self.nby = jmax - jmin self.dx = mold.dx self.dy = mold.dy self.origx, self.origy = mold.get_xy_from_ij(imin, jmin) self.origx -= self.dx / 2. self.origy -= self.dy / 2. self.translx = mold.translx self.transly = mold.transly if idx=='': self.idx = mold.idx self.array = ma.copy(mold.array[imin:imax, jmin:jmax].astype(self.dtype)) # Selection manager (without GUI) if self.mngselection is None: from ._selection_data import SelectionData, SelectionDataMB if self.nb_blocks > 0: self.mngselection = SelectionDataMB(self) else: self.mngselection = SelectionData(self) def __del__(self): """ Destructeur de la classe """ try: # Perform cleanup tasks safely self.delete_lists() if hasattr(self, 'array'): del self.array if VERSION_RGB == 1 and hasattr(self, 'rgb'): del self.rgb if hasattr(self, '_array3d'): del self._array3d if hasattr(self, '_array2d'): del self._array2d if hasattr(self, 'mypal'): del self.mypal if hasattr(self, 'shaded'): del self.shaded try: if sys.meta_path is not None: # Perform garbage collection if gc is available import gc gc.collect() except Exception: # Try/except to avoid issues during interpreter shutdown pass except Exception as e: print(f"Exception in WolfArrayModel destructor: {e} -- Please report this issue") def __setstate__(self, state): """Restore the object state during deserialization (unpickling). :param state: dictionary of object attributes for deserialization """ self.__dict__.update(state) # ======================================================================== # Properties # ======================================================================== @property
[docs] def memory_usage(self): """ Return the memory usage of the header """ if self.nbz == 0: size = self.nbx * self.nby else: size = self.nbx * self.nby * self.nbz if self.wolftype == WOLF_ARRAY_FULL_SINGLE: return size * 4 elif self.wolftype == WOLF_ARRAY_FULL_DOUBLE: return size * 8 elif self.wolftype == WOLF_ARRAY_FULL_INTEGER: return size * 4 elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2]: return size * 2 elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_UINTEGER8]: return size else: return size * 4
@property
[docs] def memory_usage_mask(self): """ Return the memory usage of the mask """ if self.nbz == 0: size = self.nbx * self.nby else: size = self.nbx * self.nby * self.nbz return size * 1
@property
[docs] def nullvalue(self) -> float: """ Return the null value """ return self._nullvalue
@nullvalue.setter def nullvalue(self, value:float): """Set the null value :param value: new null (no-data) value """ self._nullvalue = value @property
[docs] def nodata(self) -> float: """ alias for nullvalue """ return self._nullvalue
@nodata.setter def nodata(self, value:float): """alias for nullvalue :param value: new null (no-data) value (alias for :attr:`nullvalue`) """ self._nullvalue = value @property
[docs] def SelectionData(self) -> SelectionData: """ Return the data of the selection """ return self.mngselection
@property
[docs] def active_blocks(self) -> list["WolfArrayModel"]: """ Return the active blocks """ if self.nb_blocks>0 and self._active_blocks is not None: if isinstance(self._active_blocks, list): return [self.myblocks[cur] for cur in self._active_blocks] elif self._active_blocks == 0: return [k for k in self.myblocks.values()] elif self._active_blocks in self.myblocks: return [self.myblocks[self._active_blocks]] else: return None else: return [self]
@active_blocks.setter def active_blocks(self, value:Union[str, int, list[int]]): """ Set the active blocks :param value: name of the block or index 1-based or list of index 1-based """ if isinstance(value, str): if value in self.myblocks: self._active_blocks = value logging.info(_(f'Block found - {value}')) else: self._active_blocks = None logging.info(_('Block not found')) elif isinstance(value, int): if value == 0: self._active_blocks = 0 logging.info(_('All blocks selected')) else: value = getkeyblock(value, addone=False) if value in self.myblocks: self._active_blocks = value logging.info(_(f'Block found - {value}')) else: self._active_blocks = None logging.info(_('Block not found')) elif isinstance(value, list): if 0 in value: self._active_blocks = 0 logging.info(_('All blocks selected')) else: value = [getkeyblock(cur, addone=False) for cur in value] value = [cur for cur in value if cur in self.myblocks] if len(value)>0: self._active_blocks = value logging.info(_('List of blocks selected')) else: self._active_blocks = None logging.info(_('No block found')) else: logging.error(_('Unknown type for active_blocks')) @property
[docs] def dtype(self): """ Return the numpy dtype corresponding to the WOLF type Pay ettention to the difference between : - LOGICAL : Fortran and VB6 - Bool : Python In VB6, logical is stored as int16 In Fortran, there are Logical*1, Logical*2, Logical*4, Logical*8 In Python, bool is one byte In Numpy, np.bool_ is one byte """ if self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_SYM_DOUBLE, WOLF_ARRAY_CSR_DOUBLE]: dtype = np.float64 elif self.wolftype in [WOLF_ARRAY_FULL_SINGLE, WOLF_ARRAY_FULL_SINGLE_3D, WOLF_ARRAY_MB_SINGLE]: dtype = np.float32 elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_MB_INTEGER, WOLF_ARRAY_MNAP_INTEGER]: dtype = np.int32 elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2]: dtype = np.int16 elif self.wolftype == WOLF_ARRAY_FULL_INTEGER8: dtype = np.int8 elif self.wolftype == WOLF_ARRAY_FULL_UINTEGER8: dtype = np.uint8 elif self.wolftype == WOLF_ARRAY_FULL_LOGICAL: dtype = np.int16 elif self.wolftype == WOLF_ARRAY_FULL_UINTEGER16: dtype = np.uint16 elif self.wolftype == WOLF_ARRAY_FULL_UINTEGER32: dtype = np.uint32 return dtype
@property
[docs] def dtype_gdal(self): """ Return the GDAL dtype corresponding to the WOLF type """ if self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_SYM_DOUBLE, WOLF_ARRAY_CSR_DOUBLE]: dtype = gdal.GDT_Float64 elif self.wolftype in [WOLF_ARRAY_FULL_SINGLE, WOLF_ARRAY_FULL_SINGLE_3D, WOLF_ARRAY_MB_SINGLE]: dtype = gdal.GDT_Float32 elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_MB_INTEGER, WOLF_ARRAY_MNAP_INTEGER]: dtype = gdal.GDT_Int32 elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2]: dtype = gdal.GDT_Int16 elif self.wolftype == WOLF_ARRAY_FULL_INTEGER8: dtype = gdal.GDT_Byte elif self.wolftype == WOLF_ARRAY_FULL_UINTEGER8: dtype = gdal.GDT_Byte elif self.wolftype == WOLF_ARRAY_FULL_LOGICAL: dtype = gdal.GDT_Int16 elif self.wolftype == WOLF_ARRAY_FULL_UINTEGER16: dtype = gdal.GDT_UInt16 elif self.wolftype == WOLF_ARRAY_FULL_UINTEGER32: dtype = gdal.GDT_UInt32 return dtype
@property
[docs] def dtype_str(self): """ Return the numpy dtype corresponding to the WOLF type, as a string Pay ettention to the difference between : - LOGICAL : Fortran and VB6 - Bool : Python In VB6, logical is stored as int16 In Fortran, there are Logical*1, Logical*2, Logical*4, Logical*8 In Python, bool is one byte In Numpy, np.bool_ is one byte """ if self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_SYM_DOUBLE, WOLF_ARRAY_CSR_DOUBLE]: dtype = _('float64 - 8 bytes per values') elif self.wolftype in [WOLF_ARRAY_FULL_SINGLE, WOLF_ARRAY_FULL_SINGLE_3D, WOLF_ARRAY_MB_SINGLE]: dtype = _('float32 - 4 bytes per values') elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_MB_INTEGER, WOLF_ARRAY_MNAP_INTEGER]: dtype = _('int32 - 4 bytes per values') elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2]: dtype = _('int16 - 2 bytes per values') elif self.wolftype == WOLF_ARRAY_FULL_INTEGER8: dtype = _('int8 - 1 byte per values') elif self.wolftype == WOLF_ARRAY_FULL_UINTEGER8: dtype = _('uint8 - 1 byte per values') elif self.wolftype == WOLF_ARRAY_FULL_UINTEGER16: dtype = _('uint16 - 2 bytes per values') elif self.wolftype == WOLF_ARRAY_FULL_UINTEGER32: dtype = _('uint32 - 4 bytes per values') elif self.wolftype == WOLF_ARRAY_FULL_LOGICAL: dtype = _('int16 - 2 bytes per values') return dtype
@property
[docs] def zmin(self): """ Return the minimum value of the masked array """ return np.ma.min(self.array)
@property
[docs] def zmax(self): """ Return the maximum value of the masked array """ return np.ma.max(self.array)
@property
[docs] def zmin_global(self): """ Return the minimum value of the array -- all data (masked or not) """ return np.min(self.array.data)
@property
[docs] def zmax_global(self): """ Return the maximum value of the array -- all data (masked or not) """ return np.max(self.array.data)
# ======================================================================== # Mapviewer & GUI Hooks # ========================================================================
[docs] def get_mapviewer(self): """Return the mapviewer""" return self.mapviewer
[docs] def set_mapviewer(self, newmapviewer=None): """Attach a (new) mapviewer to the object :param newmapviewer: new mapviewer instance to attach (``None`` to detach) """ self.mapviewer = newmapviewer
# --- Hook methods for GUI interaction --- # These are no-ops in data-only mode; overridden in WolfArray (GUI).
[docs] def _prompt_crop(self, dx:float, dy:float) -> list | None: """Prompt user for crop bounds. Returns [[xmin,xmax],[ymin,ymax]] or None. In data-only mode, returns None (no interaction possible). Overridden in WolfArray to show CropDialog. :param dx: cell size in x direction (used for pre-filling the dialog) :param dy: cell size in y direction (used for pre-filling the dialog) """ logging.error(_('Crop bounds required but no GUI available')) return None
[docs] def _prompt_band_selection(self, band_names:list[str]) -> int | None: """Prompt user for band selection. Returns 1-based band index or None. In data-only mode, returns 1 (first band). Overridden in WolfArray to show SingleChoiceDialog. :param band_names: list of band name strings to choose from """ return 1
[docs] def _prompt_save_file(self, message:str, wildcard:str) -> str | None: """Prompt user for a file path to save. Returns path or None. In data-only mode, returns None. Overridden in WolfArray to show FileDialog. :param message: dialog title / prompt message :param wildcard: file filter pattern (e.g. ``"TIFF files (*.tif)|*.tif"``) """ return None
[docs] def _show_error_dialog(self, message:str): """Show an error dialog to the user. In data-only mode, just logs. Overridden in WolfArray to show wx.MessageDialog. :param message: error message to display or log """ logging.error(message)
[docs] def reset_plot(self, whichpal=0, mimic=True): """Reset plot - no-op in data-only mode; overridden in WolfArray (GUI) :param whichpal: palette index (default: 0) :param mimic: if ``True``, propagate to linked arrays """ self.count()
[docs] def updatepalette(self, which:int=0, onzoom=[]): """Update palette - minimal version without OGL; overridden in WolfArray (GUI) :param which: palette index (default: 0) :param onzoom: zoom window ``[xmin, xmax, ymin, ymax]``; empty list for whole array """ if self.array is None: return if self.mypal.automatic: if onzoom != []: self.mypal.isopop(self.get_working_array(onzoom), self.nbnotnullzoom) else: self.mypal.isopop(self.get_working_array(), self.nbnotnull)
[docs] def delete_lists(self): """Delete OpenGL lists - no-op in data-only mode; overridden in WolfArray (GUI)""" pass
[docs] def find_minmax(self, update=False): """Find min/max spatial extent :param update: if ``True``, recalculate spatial bounds from array dimensions """ if update: [self.xmin, self.xmax], [self.ymin, self.ymax] = self.get_bounds()
# ======================================================================== # Memory Allocation, Header & Initialization # ========================================================================
[docs] def _new_like(self) -> "WolfArrayModel": """Create a new empty instance of the same concrete type, copying grid geometry but not array data. Uses ``type(self)(...)`` so that subclasses (e.g. ``WolfArray``) return an instance of the subclass without needing to override this method. Override in a subclass only if extra initialisation is required (e.g. attaching a *mapviewer*). :return: new instance with the same spatial properties and wolftype :rtype: same type as ``self`` """ obj = type(self)(whichtype=self.wolftype) obj.nbx = self.nbx obj.nby = self.nby obj.dx = self.dx obj.dy = self.dy obj.origx = self.origx obj.origy = self.origy obj.origz = self.origz obj.translx = self.translx obj.transly = self.transly obj.translz = self.translz if self.nbdims == 3: obj.nbz = self.nbz obj.dz = self.dz return obj
[docs] def allocate_ressources(self): """ Allocate the numpy masked array according to the current dtype and dimensions. """ if self.nbdims == 2: self.array = ma.ones([self.nbx, self.nby], order='F', dtype=self.dtype) elif self.nbdims == 3: self.array = ma.ones([self.nbx, self.nby, self.nbz], order='F', dtype=self.dtype) self.mask_reset()
# ************************************************************************************************************************* # END POSITION and VALUES associated to a vector/polygon/polyline # *************************************************************************************************************************
[docs] def reset(self): """ Reset the array to nullvalue """ if self.nbdims == 2: self.array[:, :] = self.nullvalue elif self.nbdims == 3: self.array[:, :, :] = self.nullvalue
[docs] def init_from_header(self, myhead: header_wolf, dtype:np.dtype = None, force_type_from_header:bool=False): """ Initialize the array properties from a header_wolf object :param myhead: header_wolf object :param dtype: numpy dtype :param force_type_from_header: force the type from the header passed as argument """ if force_type_from_header: self.wolftype = myhead.wolftype if dtype is None: if self.wolftype == WOLF_ARRAY_FULL_DOUBLE: dtype = np.float64 elif self.wolftype == WOLF_ARRAY_FULL_SINGLE: dtype = np.float32 elif self.wolftype == WOLF_ARRAY_FULL_INTEGER: dtype = np.int32 elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2]: dtype = np.int16 elif self.wolftype == WOLF_ARRAY_FULL_INTEGER8: dtype = np.int8 elif self.wolftype == WOLF_ARRAY_FULL_UINTEGER8: dtype = np.uint8 elif self.wolftype == WOLF_ARRAY_FULL_LOGICAL: dtype = np.int16 elif self.wolftype == WOLF_ARRAY_FULL_UINTEGER16: dtype = np.uint16 elif self.wolftype == WOLF_ARRAY_FULL_UINTEGER32: dtype = np.uint32 self.dx = myhead.dx self.dy = myhead.dy self.nbx = myhead.nbx self.nby = myhead.nby self.origx = myhead.origx self.origy = myhead.origy self.translx = myhead.translx self.transly = myhead.transly self.array = ma.MaskedArray(np.ones((self.nbx, self.nby), order='F', dtype=dtype)) self.mask_reset()
[docs] def get_header(self, abs:bool=True) -> header_wolf: """ Return a header_wolf object - different from the self object header :param abs: if True (default), return an absolute header (shifted origin) and translation set to 0. :return: new header_wolf with copied spatial properties :rtype: header_wolf """ curhead = header_wolf() curhead.origx = self.origx curhead.origy = self.origy curhead.origz = self.origz curhead.dx = self.dx curhead.dy = self.dy curhead.dz = self.dz curhead.nbx = self.nbx curhead.nby = self.nby curhead.nbz = self.nbz curhead.translx = self.translx curhead.transly = self.transly curhead.translz = self.translz curhead.head_blocks = self.head_blocks.copy() curhead.wolftype = self.wolftype if abs: curhead.origx += curhead.translx curhead.origy += curhead.transly curhead.origz += curhead.translz curhead.translx = 0. curhead.transly = 0. curhead.translz = 0. return curhead
[docs] def set_header(self, header: header_wolf): """Set the header from a header_wolf object :param header: :class:`header_wolf` instance to copy properties from """ self.origx = header.origx self.origy = header.origy self.origz = header.origz self.translx = header.translx self.transly = header.transly self.translz = header.translz self.dx = header.dx self.dy = header.dy self.dz = header.dz self.nbx = header.nbx self.nby = header.nby self.nbz = header.nbz self.head_blocks = header.head_blocks.copy() if hasattr(self, 'add_ops_sel'): self.add_ops_sel()
[docs] def set_array_from_numpy(self, array:np.ndarray, nullvalue:float = None): """ Set array from numpy array :param array: numpy array with shape ``(nbx, nby)`` :param nullvalue: null value to apply and mask; if ``None``, use current :attr:`nullvalue` """ if array.shape != (self.nbx, self.nby): logging.warning(f"Array shape {array.shape} is not compatible with WolfArrayModel shape {self.nbx, self.nby}") return def wolftype_from_npz(curarray:np.ndarray): if curarray.dtype == np.float64: return WOLF_ARRAY_FULL_DOUBLE elif curarray.dtype == np.float32: return WOLF_ARRAY_FULL_SINGLE elif curarray.dtype == np.int32: return WOLF_ARRAY_FULL_INTEGER elif curarray.dtype == np.int8: return WOLF_ARRAY_FULL_INTEGER8 elif curarray.dtype == np.uint8: return WOLF_ARRAY_FULL_UINTEGER8 elif curarray.dtype == np.int16: return WOLF_ARRAY_FULL_INTEGER16 elif curarray.dtype == np.uint16: return WOLF_ARRAY_FULL_UINTEGER16 elif curarray.dtype == np.uint32: return WOLF_ARRAY_FULL_UINTEGER32 self.array = np.ma.array(array.copy()) self.wolftype = wolftype_from_npz(array) if nullvalue is not None: self.nullvalue = nullvalue self.mask_data(self.nullvalue) self.reset_plot()
# ======================================================================== # File I/O # ========================================================================
[docs] def read_all(self, which_band = None): """Read the array from disk, auto-detecting the file format (WOLF binary, GeoTIFF, VRT, NumPy). :param which_band: band index (1-based) for multi-band rasters (GeoTIFF, VRT) """ THRESHOLD = 100_000_000 if not os.path.exists(self.filename): self._show_error_dialog(_('No data file : ')+self.filename) return def check_threshold(nbx, nby, THRESHOLD) -> bool: if nbx * nby > THRESHOLD: logging.info(_('The array is very large > 100M pixels')) logging.info(_('Preloading is not recommended for efficiency reasons')) logging.info(_('Maybe could you crop the array to a smaller size')) logging.info(_('Disabling automatic colormap update')) self.mypal.automatic = False return True else: return False if self.filename.endswith('.tif') or self.filename.endswith('.tiff'): self.read_txt_header() if self.preload: update_min_max = check_threshold(self.nbx, self.nby, THRESHOLD) self.import_geotif(which= which_band, crop = self.cropini) self.loaded = True if update_min_max: self.mypal.distribute_values(self.array.min(), self.array.max()) elif self.filename.endswith('.npy'): self.read_txt_header() if self.preload: update_min_max = check_threshold(self.nbx, self.nby, THRESHOLD) self._import_npy(crop = self.cropini) self.loaded = True if update_min_max: self.mypal.distribute_values(self.array.min(), self.array.max()) elif self.filename.endswith('.vrt'): self.read_txt_header() if self.preload: update_min_max = check_threshold(self.nbx, self.nby, THRESHOLD) self.import_vrt(which= which_band, crop = self.cropini) self.loaded = True if update_min_max: self.mypal.distribute_values(self.array.min(), self.array.max()) else: self.read_txt_header() if self.nb_blocks > 0: # At this point, we have the header, we know the number of blocks, if exists self.myblocks = {} if self.preload: update_min_max = check_threshold(self.nbx, self.nby, THRESHOLD) self.read_data() self.loaded = True if update_min_max: self.mypal.distribute_values(self.array.min(), self.array.max())
[docs] def read_data(self): """ Read binary array data from the current file, with optional cropping. """ if not os.path.exists(self.filename): logging.warning(_('No data file : ')+self.filename) return if self.cropini is None: with open(self.filename, 'rb') as f: self._read_binary_data(f) else: tmpdx = self.dx if type(self.cropini) is np.ndarray: pass elif type(self.cropini) is list: pass else: # cropini is not a valid list/array - ask user via hook crop_result = self._prompt_crop(self.dx, self.dy) if crop_result is None: logging.warning(_('No crop defined')) logging.info(_('Abort reading data !')) return self.cropini = crop_result self.cropini = [[self.cropini[0][0] + self.dx/2., self.cropini[0][1] + self.dx/2.], [self.cropini[1][0] + self.dy/2., self.cropini[1][1] + self.dy/2.]] with open(self.filename, 'rb') as f: if self.wolftype == WOLF_ARRAY_FULL_SINGLE or self.wolftype == WOLF_ARRAY_FULL_SINGLE_3D: imin, jmin = self.get_ij_from_xy(self.cropini[0][0], self.cropini[1][0]) imax, jmax = self.get_ij_from_xy(self.cropini[0][1], self.cropini[1][1]) imin = int(imin) jmin = int(jmin) imax = int(imax) jmax = int(jmax) oldnbx = self.nbx oldnby = self.nby self.nbx = imax - imin self.nby = jmax - jmin self.origx, self.origy = self.get_xy_from_ij(imin, jmin) self.origx -= self.dx / 2. self.origy -= self.dy / 2. locarray = np.zeros([self.nbx, self.nby]) # on boucle sur les 'j' nbi = imax - imin if self.filename.endswith('.flt'): f.seek(((oldnby - jmax) * oldnbx + imin) * 4) else: f.seek((imin + jmin * oldnbx) * 4) for j in range(jmin, jmax): locarray[0:imax - imin, j - jmin] = np.frombuffer(f.read(4 * nbi), dtype=np.float32) f.seek((oldnbx - nbi) * 4, 1) self.array = ma.masked_array(locarray, dtype=np.float32) else: logging.warning(_('Unsupported wolftype for cropping')) return if self.filename.endswith('.flt'): # fichier .flt --> miroir "horizontal" self.array = np.fliplr(self.array) if self.dx != tmpdx: self.rebin(tmpdx / self.dx) self.loaded = True
[docs] def _read_binary_data(self, f, seek=0): """Read binary data from file :param f: open file handle in binary read mode :param seek: byte offset to seek to before reading (0 = read from current position) """ if seek > 0: f.seek(0) if self.wolftype == WOLF_ARRAY_FULL_SINGLE or self.wolftype == WOLF_ARRAY_FULL_SINGLE_3D: locarray = np.frombuffer(f.read(self.nbx * self.nby * 4), dtype=np.float32) self.array = ma.masked_array(locarray.copy(), dtype=np.float32) elif self.wolftype == WOLF_ARRAY_FULL_LOGICAL: locarray = np.frombuffer(f.read(self.nbx * self.nby * 2), dtype=np.int16) self.array = ma.masked_array(locarray.copy(), dtype=np.int16) elif self.wolftype == WOLF_ARRAY_FULL_DOUBLE: locarray = np.frombuffer(f.read(self.nbx * self.nby * 8), dtype=np.float64) self.array = ma.masked_array(locarray.copy(), dtype=np.float64) elif self.wolftype == WOLF_ARRAY_FULL_INTEGER: locarray = np.frombuffer(f.read(self.nbx * self.nby * 4), dtype=np.int32) self.array = ma.masked_array(locarray.copy(), dtype=np.int32) elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2]: locarray = np.frombuffer(f.read(self.nbx * self.nby * 2), dtype=np.int16) self.array = ma.masked_array(locarray.copy(), dtype=np.int16) elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER8]: locarray = np.frombuffer(f.read(self.nbx * self.nby * 2), dtype=np.int8) self.array = ma.masked_array(locarray.copy(), dtype=np.int8) elif self.wolftype in [WOLF_ARRAY_FULL_UINTEGER8]: locarray = np.frombuffer(f.read(self.nbx * self.nby * 2), dtype=np.uint8) self.array = ma.masked_array(locarray.copy(), dtype=np.uint8) elif self.wolftype in [WOLF_ARRAY_FULL_UINTEGER16]: locarray = np.frombuffer(f.read(self.nbx * self.nby * 2), dtype=np.uint16) self.array = ma.masked_array(locarray.copy(), dtype=np.uint16) elif self.wolftype in [WOLF_ARRAY_FULL_UINTEGER32]: locarray = np.frombuffer(f.read(self.nbx * self.nby * 4), dtype=np.uint32) self.array = ma.masked_array(locarray.copy(), dtype=np.uint32) if self.nbdims == 2: self.array = self.array.reshape(self.nbx, self.nby, order='F') if self.flipupd: self.array=np.fliplr(self.array) elif self.nbdims == 3: self.array = self.array.reshape(self.nbx, self.nby, self.nbz, order='F')
[docs] def read_txt_header(self): """ Read header from txt file Supercharged by WolfArrayModel to avoid explicit call to read_txt_header with parameters """ super().read_txt_header(self.filename)
[docs] def _reload(self): """ Reload the data from the file """ self.read_all() self.mask_force_null()
[docs] def write_all(self, newpath:str | Path = None, EPSG:int = None): """ Ecriture de tous les fichiers d'un Wolf array :param newpath: new path and filename with extension -- if None, use the current filename :param EPSG: EPSG code for geotiff """ if isinstance(newpath, Path): newpath = str(newpath) if newpath is not None: self.filename = newpath self.filename = str(self.filename) if self.filename.endswith('.tif') or self.filename.endswith('.tiff'): self.export_geotif(EPSG=EPSG) elif self.filename.endswith('.npy'): writing_header = True if self.dtype != self.array.data.dtype: logging.warning(_('Data type changed -- Force conversion to internal numpy array')) locarray = self.array.data if locarray.dtype == np.float32: self.wolftype = WOLF_ARRAY_FULL_SINGLE logging.warning(_('Data type changed to float32')) elif locarray.dtype == np.float64: self.wolftype = WOLF_ARRAY_FULL_DOUBLE logging.warning(_('Data type changed to float64')) elif locarray.dtype == np.int32: self.wolftype = WOLF_ARRAY_FULL_INTEGER logging.warning(_('Data type changed to int32')) elif locarray.dtype == np.int16: self.wolftype = WOLF_ARRAY_FULL_INTEGER16 logging.warning(_('Data type changed to int16')) elif locarray.dtype == np.int8: self.wolftype = WOLF_ARRAY_FULL_INTEGER8 logging.warning(_('Data type changed to int8')) elif locarray.dtype == np.uint8: self.wolftype = WOLF_ARRAY_FULL_UINTEGER8 logging.warning(_('Data type changed to uint8')) elif locarray.dtype == np.uint16: self.wolftype = WOLF_ARRAY_FULL_UINTEGER16 logging.warning(_('Data type changed to uint16')) elif locarray.dtype == np.uint32: self.wolftype = WOLF_ARRAY_FULL_UINTEGER32 logging.warning(_('Data type changed to uint32')) else: logging.error(_('Unsupported type in numpy file -- Abort wrting header file')) writing_header = False np.save(self.filename, self.array.data) if writing_header: self.write_txt_header() else: self.write_txt_header() self.write_array()
[docs] def write_array(self): """ Write the raw binary array data to the current file. """ self.array.data.transpose().tofile(self.filename, "")
[docs] def write_txt_header(self): """ Write header to txt file Supercharged by WolfArrayModel to avoid explicit call to write_txt_header with parameters """ super().write_txt_header(self.filename+'.txt', self.wolftype, forceupdate=True)
[docs] def write_xyz(self, fname:str): """Write all unmasked cell data as an XYZ text file. :param fname: output file path for the XYZ file """ my_file = XYZFile(fname) my_file.fill_from_wolf_array(self) my_file.write_to_file()
[docs] def get_xyz(self, which='all') -> np.ndarray: """Return an array of xyz coordinates and values :param which: which values to export: ``'all'`` includes all unmasked cells :return: Nx3 array of ``[x, y, z]`` coordinates for all unmasked cells :rtype: numpy.ndarray """ x1, y1 = self.get_xy_from_ij(0, 0) x2, y2 = self.get_xy_from_ij(self.nbx, self.nby, aswolf=True) xloc = np.linspace(x1, x2, self.nbx) yloc = np.linspace(y1, y2, self.nby) xy = np.meshgrid(xloc, yloc, indexing='xy') xyz = np.column_stack([xy[0].flatten(), xy[1].flatten(), self.array.flatten()]) filter = np.invert(ma.getmaskarray(self.array).flatten()) return xyz[filter]
# ======================================================================== # Import (External Formats) # ========================================================================
[docs] def _import_npy(self, fn:str='', crop:list[float]=None): """ Import a numpy file. Must be called after the header is initialized, e.g. read_txt_header. :param fn: filename :param crop: crop the data - [xmin, xmax, ymin, ymax] """ if fn !='': pass elif self.filename !='': fn = self.filename else: return # Numpy format locarray = np.load(self.filename) assert locarray.shape[0] == self.nbx, _('Incompatible dimensions') assert locarray.shape[1] == self.nby, _('Incompatible dimensions') if crop is not None : # logging.error(_('Cropping not yet implemented for numpy files')) imin, jmin = self.get_ij_from_xy(crop[0][0], crop[1][0]) imax, jmax = self.get_ij_from_xy(crop[0][1], crop[1][1]) imin = int(imin) jmin = int(jmin) imax = int(imax) jmax = int(jmax) self.nbx = imax - imin self.nby = jmax - jmin self.dx = self.dx self.dy = self.dy self.origx, self.origy = self.get_xy_from_ij(imin, jmin) self.origx -= self.dx / 2. self.origy -= self.dy / 2. self.translx = self.translx self.transly = self.transly locarray = locarray[imin:imax, jmin:jmax] self.array = np.ma.asarray(locarray)
[docs] def import_vrt(self, fn:str='', which:int = None, crop:list[float]=None): """ Import a VRT file. A Virtual Raster Tile (VRT) is an XML-based format used by GDAL to describe a raster dataset. It allows users to create virtual datasets that reference other raster files without duplicating the actual data. VRT files can be used to mosaic multiple raster files, apply transformations, or define custom raster structures. :param fn: filename :param which: band to import :param crop: crop the data - [xmin, xmax, ymin, ymax] """ if fn !='': pass elif self.filename !='': fn = self.filename else: return if not os.path.exists(fn): logging.error(_('File not found')) return # convert vrt to tif and mport tif with tempfile.TemporaryDirectory() as tmpdirname: tmpfile = join(tmpdirname, 'tmp.tif') if crop is not None: tmpdx = self.dx fn_crop = fn + '_crop.tif' if type(crop) is np.ndarray: if crop.shape == (4,): logging.error(_('Crop must be a list or a numpy array with 4 values - [[xmin, xmax], [ymin, ymax]]')) crop = [[crop[0], crop[1]], [crop[2], crop[3]]] elif isinstance(crop, list | tuple): if len(crop) == 4: logging.error(_('Crop must be a list or a numpy array with 4 values - [[xmin, xmax], [ymin, ymax]]')) crop = [[crop[0], crop[1]], [crop[2], crop[3]]] else: # crop is not a valid list/array - ask user via hook raster_in:gdal.Dataset raster_in = gdal.Open(fn) geotr = raster_in.GetGeoTransform() self.dx = geotr[1] self.dy = abs(geotr[5]) crop = self._prompt_crop(self.dx, self.dy) if crop is None: return [xmin, xmax], [ymin, ymax] = crop gdal.Translate(tmpfile, fn, projWin=[xmin, ymax, xmax, ymin]) else: gdal.Translate(tmpfile, fn) self.import_geotif(tmpfile, which)
[docs] def import_geotif(self, fn:str='', which:int = None, crop:list[float]=None): """ Import Geotiff Raster file. A Geotiff (Geographic Tagged Image File Format) is a widely used raster file format that embeds geographic metadata within the TIFF file structure. It allows for the storage of georeferencing information, such as coordinate systems, map projections, and spatial extents, alongside the raster image data. This makes Geotiff files suitable for use in Geographic Information Systems (GIS) and remote sensing applications. Formats supportés : - Int8 - Int16 - Int32 - Float32 - Float64 - uInt8 - uInt16 - uInt32 :param fn: filename :param which: band to import :param crop: crop the data - [xmin, xmax, ymin, ymax] or [[xmin, xmax], [ymin, ymax]] """ # from osgeo import gdal, osr, gdalconst if fn !='': pass elif self.filename !='': fn = self.filename else: logging.error(_('No filename provided in import_geotif')) return need_wx = False raster:gdal.Dataset if crop is not None : if not os.path.exists(fn): logging.error(_('File not found')) return tmpdx = self.dx fn_crop = fn + '_crop.tif' raster_in:gdal.Dataset raster_in = gdal.Open(fn) geotr = raster_in.GetGeoTransform() self.dx = geotr[1] self.dy = abs(geotr[5]) ulx = geotr[0] uly = geotr[3] llx = ulx + geotr[1] * raster_in.RasterXSize lly = uly + geotr[5] * raster_in.RasterYSize if type(crop) is np.ndarray: if crop.shape == (4,): pass elif crop.shape == (2,2): crop = [crop[0][0], crop[0][1], crop[1][0], crop[1][1]] else: logging.error(_('Crop must be a list or a numpy array with 4 values - xmin, xmax, ymin, ymax')) elif type(crop) in [list, tuple]: if len(crop) == 2: if len(crop[0]) == 2 and len(crop[1]) == 2: crop = [crop[0][0], crop[0][1], crop[1][0], crop[1][1]] else: logging.error(_('Crop must be a list or a numpy array with 4 values - xmin, xmax, ymin, ymax')) return elif len(crop) ==4: pass else: # crop is not a valid list/array - ask user via hook crop = self._prompt_crop(self.dx, self.dy) if crop is None: return # _prompt_crop returns [[xmin,xmax],[ymin,ymax]], flatten to [xmin,xmax,ymin,ymax] if isinstance(crop, list) and len(crop) == 2: crop = [crop[0][0], crop[0][1], crop[1][0], crop[1][1]] need_wx = True xmin, xmax, ymin, ymax = crop if need_wx: fn_crop = self._prompt_save_file(_('Save the cropped file for later'), "Tiff files (*.tif)|*.tif") if fn_crop is None: from tempfile import NamedTemporaryFile fn_crop = str(NamedTemporaryFile(suffix='.tif').name) if uly > lly: raster = gdal.Translate(fn_crop, fn, projWin=[xmin, ymax, xmax, ymin]) else: raster = gdal.Translate(fn_crop, fn, projWin=[xmin, ymin, xmax, ymax]) fn = fn_crop else: fn_tmp = '__tmp_wolf.tif' if uly > lly: raster = gdal.Translate(fn_tmp, fn, projWin=[xmin, ymax, xmax, ymin]) else: raster = gdal.Translate(fn_tmp, fn, projWin=[xmin, ymin, xmax, ymax]) fn = fn_tmp else: raster = gdal.Open(fn) if raster is None: logging.error(_('Could not open the file {fn}')) return # Dimensions self.nbx = raster.RasterXSize self.nby = raster.RasterYSize # Number of bands nb = raster.RasterCount if which is None: names = [raster.GetRasterBand(which+1).GetDescription() for which in range(nb)] if nb>1: which = self._prompt_band_selection(names) if which is None: return else: which=1 else: if which > nb: which = nb if which < 1: which = 1 # Metadata for the raster dataset # meta = raster.GetMetadata() # Read the raster band as separate variable band = raster.GetRasterBand(which) # Data type of the values self.nullvalue = band.GetNoDataValue() if self.nullvalue is None: self.nullvalue = 0. geotr = raster.GetGeoTransform() self.origx = geotr[0] self.dx = geotr[1] self.dy = abs(geotr[5]) try: if geotr[5] <0.: self.origy = geotr[3]+geotr[5]*self.nby tmp_array = np.transpose(np.flipud(band.ReadAsArray())) else: self.origy = geotr[3] tmp_array = np.transpose(band.ReadAsArray()) self.array = np.ma.asarray(tmp_array.copy()) assert self.array.iscontiguous(), _('Array is not contiguous') del tmp_array if self.array.dtype == np.float64: self.wolftype = WOLF_ARRAY_FULL_DOUBLE elif self.array.dtype == np.float32: self.wolftype = WOLF_ARRAY_FULL_SINGLE elif self.array.dtype == np.int32: self.wolftype = WOLF_ARRAY_FULL_INTEGER # elif self.array.dtype == np.uint32: # logging.warning(_('***************************************************')) # logging.warning(_(' Conversion of uint32 to int32')) # logging.warning(_(' If you save this file, it will be converted to int32')) # logging.warning(_('***************************************************')) # self.array = self.array.astype(np.int32) # self.wolftype = WOLF_ARRAY_FULL_INTEGER # elif self.array.dtype == np.uint8: # logging.warning(_('***************************************************')) # logging.warning(_(' Conversion of uint8 to int8')) # logging.warning(_(' If you save this file, it will be converted to int8')) # logging.warning(_('***************************************************')) # self.array = self.array.astype(np.int8) # self.wolftype = WOLF_ARRAY_FULL_INTEGER8 elif self.array.dtype == np.uint32: self.wolftype = WOLF_ARRAY_FULL_UINTEGER32 elif self.array.dtype == np.uint8: self.wolftype = WOLF_ARRAY_FULL_UINTEGER8 elif self.array.dtype == np.uint16: self.wolftype = WOLF_ARRAY_FULL_UINTEGER16 elif self.array.dtype == np.int8: self.wolftype = WOLF_ARRAY_FULL_INTEGER8 elif self.array.dtype == np.int16: self.wolftype = WOLF_ARRAY_FULL_INTEGER16 except: logging.warning(_('Error during importing tif file')) # Find the EPSG code proj = raster.GetProjection() srs = osr.SpatialReference() srs.ImportFromWkt(proj) if srs.IsProjected: self.epsg = int(srs.GetAttrValue('AUTHORITY',1)) if "EPSG:" in proj: try: idx1 = proj.index("EPSG:") idx2 = proj.index('"', idx1) epsg2 = int(proj[idx1+5:idx2]) if epsg2 != self.epsg: logging.warning(_('Mismatch in EPSG code detection: {} and {}').format(self.epsg, epsg2)) logging.warning(_("Check your file's projection -- You can use gdalinfo command line tool")) self.epsg = epsg2 logging.warning(_('Using EPSG: {}').format(self.epsg)) except: logging.warning(_('Could not check EPSG code in the projection string')) logging.warning(_("Check your file's projection -- You can use gdalinfo command line tool")) else: self.epsg = self.epsg_parent # Close the raster raster.FlushCache() raster = None try: if fn_tmp == '__tmp_wolf.tif': os.remove(fn_tmp) except: pass
[docs] def import_from_gltf(self, fn:str='', fnpos:str='', interp_method:Literal['matplotlib','scipy'] = 'matplotlib'): """ Import from GLTF/GLB format :param fn: filename :param fnpos: filename for the position's information :param interp_method: method for the interpolation -- 'matplotlib' or 'scipy' """ #FIXME : add the possibility to use PyVista if fn == '' or fnpos == '': logging.info(_('Retry !! -- Bad files')) return if self.mapviewer is not None: if self.mapviewer.link_params is None: self.mapviewer.link_params = {} self.mapviewer.link_params['gltf file'] = fn self.mapviewer.link_params['gltf pos'] = fnpos mytri = Triangulation() mytri.import_from_gltf(fn) with open(fnpos, 'r') as f: mylines = f.read().splitlines() ox = float(mylines[0]) oy = float(mylines[1]) nbx = int(mylines[2]) nby = int(mylines[3]) i1 = int(mylines[4]) j1 = int(mylines[5]) i2 = int(mylines[6]) j2 = int(mylines[7]) xmin = float(mylines[8]) xmax = float(mylines[9]) ymin = float(mylines[10]) ymax = float(mylines[11]) try: znull = float(mylines[12]) except: znull=-99999. x = np.arange(self.dx / 2. + ox, float(nbx) * self.dx + self.dx / 2 + ox, self.dx) y = np.arange(self.dy / 2. + oy, float(nby) * self.dy + self.dy / 2 + oy, self.dy) if interp_method =='matplotlib': grid_x, grid_y = np.meshgrid(x, y, indexing='ij') self.interpolate_on_triangulation(np.asarray(mytri.pts),mytri.tri, grid_x, grid_y, mytri.get_mask()) else: grid_x, grid_y = np.meshgrid(x, y, sparse=True, indexing='xy') newvalues = griddata(np.asarray(mytri.pts)[:,0:2], np.asarray(mytri.pts)[:,2], (grid_x, grid_y), method='linear') locmask = np.logical_and(np.logical_not(self.array.mask[i1:i2, j1:j2]), np.logical_not(np.isnan(newvalues.transpose()))) self.array.data[i1:i2, j1:j2][locmask] = newvalues.transpose()[locmask] self.reset_plot()
# ======================================================================== # Export (External Formats) # ========================================================================
[docs] def export_geotif(self, outdir:str= '', extent:str= '', EPSG:int = None): """ Export de la matrice au format Geotiff Pour sauvegarder l'objet au format Geotiff, il est recommandé d'utiliser la fonction write_all plutôt que celle-ci directement. Formats supportés : - Int32 - Int16 - Int8 - Byte - Float32 - Float64 :param outdir: directory - If provided, the file will be savd as "outdir/idx+extent.tif" If not provided, we use the filename attribute :param extent: suffix to add to the filename before the extension '.tif' (only if outdir is provided) :param EPSG: EPSG code, by default 31370 (Lambert 72) """ if EPSG is None and self.epsg is None: EPSG = 31370 # Default EPSG code is Belgian Lambert 72 elif EPSG is None: EPSG = self.epsg outdir = str(outdir) extent = str(extent) srs = osr.SpatialReference() srs.ImportFromEPSG(EPSG) if outdir=='' and extent=='': filename = self.filename else: filename = join(outdir,self.idx)+extent+'.tif' arr=self.array if arr.dtype == np.float32: arr_type = gdal.GDT_Float32 nullvalue = self.nullvalue elif arr.dtype == np.float64: arr_type = gdal.GDT_Float64 nullvalue = self.nullvalue elif arr.dtype == np.int8: arr_type = gdal.GDT_Byte nullvalue = int(self.nullvalue) elif arr.dtype == np.int16: arr_type = gdal.GDT_Int16 nullvalue = int(self.nullvalue) elif arr.dtype == np.uint8: arr_type = gdal.GDT_Byte nullvalue = int(self.nullvalue) else: arr_type = gdal.GDT_Int32 nullvalue = int(self.nullvalue) driver: gdal.Driver out_ds: gdal.Dataset band: gdal.Band driver = gdal.GetDriverByName("GTiff") # bytes_per_pixel = arr.data.dtype.itemsize estimated_file_size = self.memory_usage #arr.shape[0] * arr.shape[1] * bytes_per_pixel # Check if estimated file size exceeds 4GB if (estimated_file_size > 4 * 1024**3): # 4GB in bytes options = ['COMPRESS=LZW', 'BIGTIFF=YES'] logging.info('BigTIFF format will be used!') else: options = ['COMPRESS=LZW'] out_ds = driver.Create(str(filename), arr.shape[0], arr.shape[1], 1, arr_type, options=options) if out_ds is None: logging.error(_('Could not create the file {filename}')) return out_ds.SetProjection(srs.ExportToWkt()) # On utilise le coin supérieur gauche de la matrice et la taille des pixels selon y est négative # !! gdalBuiltvrt ne supporte que cette convention !! out_ds.SetGeoTransform([self.origx+self.translx, self.dx, 0., self.origy+self.transly+float(self.nby)*self.dy, 0., -self.dy]) band = out_ds.GetRasterBand(1) band.SetNoDataValue(nullvalue) band.WriteArray(np.flipud(arr.data.transpose())) band.FlushCache() try: band.ComputeStatistics(True) except RuntimeError as e: # If the array is mainly composed of null values, ComputeStatistics(True) will fail because ComputeStatistics(True) takes a subset of the data to compute statistics (and it may only find null values resulting to an error while in the array, it is not the case) try: band.ComputeStatistics(0) # Using 0 for exact stats instead of True (which implies approximate) except RuntimeError as e: print("Warning: ComputeStatistics failed:", e)
[docs] def export_cityjson(self, outdir:str= '', extent:str= '', EPSG:int = None): """Export the array as a CityJSON file with one polygon per grid cell. :param outdir: output directory; empty string uses :attr:`filename` :param extent: suffix appended to filename before ``.json`` extension :param EPSG: EPSG code for the coordinate reference system (default: 31370) """ if EPSG is None and self.epsg is None: EPSG = 31370 # Default EPSG code is Belgian Lambert 72 elif EPSG is None: EPSG = self.epsg outdir = str(outdir) extent = str(extent) if outdir=='' and extent=='': filename = self.filename else: filename = join(outdir,self.idx)+extent+'.json' # Create a CityJSON object cityjson = { "type": "CityJSON", "version": "1.0", "metadata": { "referenceSystem": f"urn:ogc:def:crs:EPSG::{EPSG}" }, "CityObjects": {}, "vertices": [] } # Iterate through the array and create CityObjects for each cell for i in tqdm(range(self.nbx)): for j in range(self.nby): x, y = self.get_xy_from_ij(i, j) cityjson["CityObjects"][f"cell_{i}_{j}"] = { "type": "CityObject", "geometry": { "type": "Polygon", "coordinates": [ [ [x - self.dx / 2., y - self.dy / 2.], [x + self.dx / 2., y - self.dy / 2.], [x + self.dx / 2., y + self.dy / 2.], [x - self.dx / 2., y + self.dy / 2.], [x - self.dx / 2., y - self.dy / 2.] ] ] } } with open(filename, 'w') as f: json.dump(cityjson, f, indent=2)
[docs] def export_to_gltf(self, bounds:list[float]=None, fn:str=''): """ Export to GLTF/GLB format :param bounds: [[xmin,xmax],[ymin,ymax]] :param fn: filename """ mytri, znull = self.get_triangulation(bounds) mytri.export_to_gltf(fn) mytri.saveas(fn+'.tri') if bounds is None: ox = self.origx + self.translx oy = self.origy + self.transly nbx = self.nbx nby = self.nby i1 = 0 i2 = self.nbx j1 = 0 j2 = self.nby bounds = [[ox,ox+float(nbx)*self.dx],[oy,oy+float(nby)*self.dy]] else: ox = max(self.origx, bounds[0][0]) oy = max(self.origy, bounds[1][0]) i1, j1 = self.get_ij_from_xy(ox, oy) i2, j2 = self.get_ij_from_xy(bounds[0][1], bounds[1][1]) i1 = max(i1, 0) j1 = max(j1, 0) i2 = min(i2 + 1, self.nbx) j2 = min(j2 + 1, self.nby) nbx = i2 - i1 nby = j2 - j1 with open(fn + '.pos', 'w') as f: f.write(str(ox) + '\n') f.write(str(oy) + '\n') f.write(str(nbx) + '\n') f.write(str(nby) + '\n') f.write(str(i1) + '\n') f.write(str(j1) + '\n') f.write(str(i2) + '\n') f.write(str(j2) + '\n') f.write(str(bounds[0][0]) + '\n') f.write(str(bounds[0][1]) + '\n') f.write(str(bounds[1][0]) + '\n') f.write(str(bounds[1][1]) + '\n') f.write(str(znull))
# ======================================================================== # Coordinate, Bounds & Grid Utilities # ========================================================================
[docs] def check_bounds_ij(self, i:int, j:int): """Check if i and j are inside the array bounds :param i: column index (0-based) :param j: row index (0-based) :return: ``True`` if (i, j) is inside the array bounds :rtype: bool """ return i>=0 and j>=0 and i<self.nbx and j<self.nby
[docs] def check_bounds_xy(self, x:float, y:float): """Check if i and j are inside the array bounds :param x: x-coordinate in world space :param y: y-coordinate in world space :return: ``True`` if (x, y) falls inside the array spatial extent :rtype: bool """ i,j = self.get_ij_from_xy(x,y) return self.check_bounds_ij(i,j)
[docs] def get_centers(self, usenap:bool = True): """Get the centers of the cells :param usenap: if ``True``, return only unmasked cell centers; if ``False``, return all :return: flattened array of cell center coordinates ``[x1, y1, x2, y2, ...]`` :rtype: numpy.ndarray """ if usenap: ij = np.argwhere(self.array.mask==False,) xy = self.get_xy_from_ij_array(ij).copy().flatten() else: ij = np.meshgrid(np.arange(self.nbx), np.arange(self.nby)) ij = np.asarray([ij[0].flatten(), ij[1].flatten()]).T xy = self.get_xy_from_ij_array(ij).copy().flatten() return xy.astype(np.float32)
[docs] def get_dxdy_min(self): """ Return the minimal size Mainly useful in PyVertexVectors to get the minimal size of the cells and ensure compatibility with the 2D results (GPU and Multiblocks) """ return min(self.dx, self.dy)
[docs] def get_dxdy_max(self): """ Return the maximal size Mainly useful in PyVertexVectors to get the minimal size of the cells and ensure compatibility with the 2D results (GPU and Multiblocks) """ return max(self.dx, self.dy)
[docs] def meshgrid(self, mode:Literal['gc', 'borders']='gc'): """ Création d'un maillage 2D :param mode: 'gc' pour les centres de mailles, 'borders' pour les bords de mailles """ x_start = self.translx + self.origx y_start = self.transly + self.origy if mode == 'gc': x_discr = np.linspace(x_start + self.dx / 2, x_start + self.nbx * self.dx - self.dx / 2, self.nbx) y_discr = np.linspace(y_start + self.dy / 2, y_start + self.nby * self.dy - self.dy / 2, self.nby) elif mode == 'borders': x_discr = np.linspace(x_start, x_start + self.nbx * self.dx, self.nbx + 1) y_discr = np.linspace(y_start, y_start + self.nby * self.dy, self.nby + 1) y, x = np.meshgrid(y_discr, x_discr) return x, y
[docs] def get_value(self, x:float, y:float, z:float=0., nullvalue:float=-99999, convert_to_float:bool=True): """ Return the value at given coordinates :param x: x coordinate :param y: y coordinate :param z: z coordinate :param nullvalue: value to return if the point is outside the array :param convert_to_float: if ``True``, cast the result to ``float`` """ if isinstance(self.array.mask, np.bool_): logging.error(_('Mask is not an array - Please check your data')) return nullvalue if self.nbdims == 2: i, j = self.get_ij_from_xy(x, y) if i >= 0 and i < self.nbx and j >= 0 and j < self.nby: if self.array.mask[i, j]: value = nullvalue else: value = self.array[i, j] else: value = nullvalue elif self.nbdims == 3: i, j, k = self.get_ij_from_xy(x, y, z) if i >= 0 and i < self.nbx and j >= 0 and j < self.nby and k >= 0 and k < self.nbz: if self.array.mask[i, j, k]: value = nullvalue else: value = self.array[i, j, k] else: value = nullvalue if convert_to_float: return float(value) else: return value
[docs] def get_xlim(self, window_x:float, window_y:float): """ Return the limits in x for a given window size :param window_x: window size in x :param window_y: window size in y """ a_x = window_x / (float(self.nbx) * self.dx) a_y = window_y / (float(self.nby) * self.dy) if a_x < a_y: # C'est la mise à l'échelle selon x qui compte return (self.origx + self.translx, self.origx + self.translx + self.nbx * self.dx) else: # C'est la mise à l'échelle selon y qui compte l = (self.nby * self.dy) / window_y * window_x return (self.origx + self.translx + self.nbx * self.dx * 0.5 - l * 0.5, self.origx + self.translx + self.nbx * self.dx * 0.5 + l * 0.5)
[docs] def get_ylim(self, window_x:float, window_y:float): """ Retrun the limits in y for a given window size :param window_x: window size in x :param window_y: window size in y """ a_x = window_x / (float(self.nbx) * self.dx) a_y = window_y / (float(self.nby) * self.dy) if a_x < a_y: # C'est la mise à l'échelle selon x qui compte l = (self.nbx * self.dx) / window_x * window_y return (self.origy + self.transly + self.nby * self.dy * 0.5 - l * 0.5, self.origy + self.transly + self.nby * self.dy * 0.5 + l * 0.5) else: # C'est la mise à l'échelle selon y qui compte return (self.origy + self.transly, self.origy + self.transly + self.nby * self.dy)
[docs] def get_working_array(self, onzoom:list[float]=[]): """ Return the part of the array in the zoom window :param onzoom: zoom window -- [xmin, xmax, ymin, ymax] """ if onzoom != []: istart, jstart = self.get_ij_from_xy(onzoom[0], onzoom[2]) iend, jend = self.get_ij_from_xy(onzoom[1], onzoom[3]) istart = 0 if istart < 0 else istart jstart = 0 if jstart < 0 else jstart iend = self.nbx if iend > self.nbx else iend jend = self.nby if jend > self.nby else jend partarray = self.array[istart:iend, jstart:jend] self.nbnotnullzoom = partarray.count() return partarray[partarray.mask == False] else: return self.array[self.array.mask == False]
# ======================================================================== # Operator Overloading # ======================================================================== def __add__(self, other): """Addition operator (``+``). Return a new instance of the same type. :param other: right-hand operand -- either a scalar (``float`` / ``int``) or another :class:`WolfArrayModel` :return: new array with element-wise sum :rtype: same type as ``self`` """ newArray = self._new_like() if type(other) in [float, int]: if type(other) == int and self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_FULL_SINGLE]: other = float(other) elif type(other) == float and self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2, WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_UINTEGER8]: other = int(other) if other != 0.: newArray.array = np.ma.masked_array(self.array + other, self.array.mask, dtype=self.array.dtype) else: newArray.array = np.ma.masked_array(self.array + other.array, self.array.mask, dtype=self.array.dtype) newArray.count() assert newArray.array.dtype == self.array.dtype, _('Bad dtype') return newArray def __sub__(self, other): """Subtraction operator (``-``). Return a new instance of the same type. :param other: right-hand operand -- either a scalar (``float`` / ``int``) or another :class:`WolfArrayModel` :return: new array with element-wise difference :rtype: same type as ``self`` """ newArray = self._new_like() if type(other) in [float, int]: if type(other) == int and self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_FULL_SINGLE]: other = float(other) elif type(other) == float and self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2, WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_UINTEGER8]: other = int(other) if other != 0.: newArray.array = np.ma.masked_array(self.array - other, self.array.mask, dtype=self.array.dtype) else: newArray.array = np.ma.masked_array(self.array - other.array, self.array.mask, dtype=self.array.dtype) newArray.count() assert newArray.array.dtype == self.array.dtype, _('Bad dtype') return newArray def __mul__(self, other): """Multiplication operator (``*``). Return a new instance of the same type. :param other: right-hand operand -- either a scalar (``float`` / ``int``) or another :class:`WolfArrayModel` :return: new array with element-wise product :rtype: same type as ``self`` """ newArray = self._new_like() if type(other) in [float, int]: if type(other) == int and self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_FULL_SINGLE]: other = float(other) elif type(other) == float and self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2, WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_UINTEGER8]: other = int(other) if other != 0.: newArray.array = np.ma.masked_array(self.array * other, self.array.mask, dtype=self.array.dtype) else: logging.debug(_('Multiplication by 0')) newArray.array = np.ma.masked_array(self.array * other, self.array.mask, dtype=self.array.dtype) else: newArray.array = np.ma.masked_array(self.array * other.array, self.array.mask, dtype=self.array.dtype) newArray.count() assert newArray.array.dtype == self.array.dtype, _('Bad dtype') return newArray def __truediv__(self, other): """Division operator (``/``). Return a new instance of the same type. :param other: right-hand operand -- either a scalar (``float`` / ``int``) or another :class:`WolfArrayModel` :return: new array with element-wise quotient :rtype: same type as ``self`` """ newArray = self._new_like() if type(other) in [float, int]: if type(other) == int and self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_FULL_SINGLE]: other = float(other) elif type(other) == float and self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2, WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_UINTEGER8]: other = int(other) if other != 0.: newArray.array = np.ma.masked_array(self.array / other, self.array.mask, dtype=self.array.dtype) else: logging.error(_('Division by 0 -- Nothing to do !')) else: newArray.array = np.ma.masked_array(np.where(other == 0., 0., self.array / other.array), self.array.mask, dtype=self.array.dtype) newArray.count() assert newArray.array.dtype == self.array.dtype, _('Bad dtype') return newArray def __pow__(self, other): """Power operator (``**``). Return a new instance of the same type. :param other: right-hand operand -- either a scalar (``float`` / ``int``) or another :class:`WolfArrayModel` :return: new array with element-wise power :rtype: same type as ``self`` """ newArray = self._new_like() if type(other) in [float, int]: if type(other) == int and self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_FULL_SINGLE]: other = float(other) elif type(other) == float and self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2, WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_UINTEGER8]: other = int(other) newArray.array = np.ma.masked_array(self.array ** other, self.array.mask, dtype=self.array.dtype) else: logging.error(_('Power operator only implemented for scalars')) newArray.array[:,:] = 0 newArray.count() assert newArray.array.dtype == self.array.dtype, _('Bad dtype') return newArray # ---- Unary operators -------------------------------------------------- def __neg__(self): """Unary negation (``-array``). Return a new instance of the same type. :return: new array with element-wise negation :rtype: same type as ``self`` """ newArray = self._new_like() newArray.array = np.ma.masked_array(-self.array, self.array.mask, dtype=self.array.dtype) newArray.count() return newArray def __abs__(self): """Absolute value (``abs(array)``). Return a new instance of the same type. :return: new array with element-wise absolute values :rtype: same type as ``self`` """ newArray = self._new_like() newArray.array = np.ma.masked_array(np.abs(self.array), self.array.mask, dtype=self.array.dtype) newArray.count() return newArray # ---- Reflected (right-hand) operators --------------------------------- def __radd__(self, other): """Reflected addition (``scalar + array``). :param other: left-hand scalar operand :rtype: same type as ``self`` """ return self.__add__(other) def __rsub__(self, other): """Reflected subtraction (``scalar - array``). :param other: left-hand scalar operand :rtype: same type as ``self`` """ return (-self).__add__(other) def __rmul__(self, other): """Reflected multiplication (``scalar * array``). :param other: left-hand scalar operand :rtype: same type as ``self`` """ return self.__mul__(other) def __rtruediv__(self, other): """Reflected division (``scalar / array``). :param other: left-hand scalar operand :rtype: same type as ``self`` """ newArray = self._new_like() if type(other) in [float, int]: if type(other) == int and self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_FULL_SINGLE]: other = float(other) safe = np.where(self.array == 0, 1, self.array) result = np.where(self.array == 0, 0, other / safe) newArray.array = np.ma.masked_array(result, self.array.mask, dtype=self.array.dtype) else: logging.error(_('Reflected division only implemented for scalars')) newArray.array = np.ma.masked_array(np.zeros_like(self.array), self.array.mask, dtype=self.array.dtype) newArray.count() return newArray # ---- Floor division --------------------------------------------------- def __floordiv__(self, other): """Floor division operator (``//``). Return a new instance of the same type. :param other: right-hand operand -- either a scalar (``float`` / ``int``) or another :class:`WolfArrayModel` :return: new array with element-wise floor division :rtype: same type as ``self`` """ newArray = self._new_like() if type(other) in [float, int]: if type(other) == int and self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_FULL_SINGLE]: other = float(other) elif type(other) == float and self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2, WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_UINTEGER8]: other = int(other) if other != 0: newArray.array = np.ma.masked_array(self.array // other, self.array.mask, dtype=self.array.dtype) else: logging.error(_('Floor division by 0 -- Nothing to do !')) else: newArray.array = np.ma.masked_array(np.where(other.array == 0, 0, self.array // np.where(other.array == 0, 1, other.array)), self.array.mask, dtype=self.array.dtype) newArray.count() assert newArray.array.dtype == self.array.dtype, _('Bad dtype') return newArray def __rfloordiv__(self, other): """Reflected floor division (``scalar // array``). :param other: left-hand scalar operand :rtype: same type as ``self`` """ newArray = self._new_like() if type(other) in [float, int]: if type(other) == int and self.wolftype in [WOLF_ARRAY_FULL_DOUBLE, WOLF_ARRAY_FULL_SINGLE]: other = float(other) safe = np.where(self.array == 0, 1, self.array) result = np.where(self.array == 0, 0, other // safe) newArray.array = np.ma.masked_array(result, self.array.mask, dtype=self.array.dtype) else: logging.error(_('Reflected floor division only implemented for scalars')) newArray.array = np.ma.masked_array(np.zeros_like(self.array), self.array.mask, dtype=self.array.dtype) newArray.count() return newArray # ======================================================================== # Masking & Null Value Operations # ========================================================================
[docs] def loadnap_and_apply(self): """ Load a mask file (aka nap) and apply it to the array; The mask values are set to the nullvalue. The mask file must have the same name as the array file, with the extension .napbin. It is useful for 2D WOLF simulations. """ file_name, file_extension = os.path.splitext(self.filename) fnnap = file_name + '.napbin' if os.path.exists(fnnap): locnap = WolfArrayModel(fnnap) self.array.data[np.where(locnap.array.mask)] = self.nullvalue self.mask_data(self.nullvalue) self.reset_plot()
[docs] def mask_data(self, value): """Mask all cells whose value equals the given value. :param value: cells equal to this value will be masked """ if self.array is None: return try: if not (np.isnan(value) or math.isnan(value)): if self.wolftype in [WOLF_ARRAY_FULL_INTEGER]: value=np.int32(value) elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER16_2]: value=np.int16(value) elif self.wolftype in [WOLF_ARRAY_FULL_INTEGER8]: try: value = np.int8(value) except: value = 0 logging.warning(_('Value too high for int8 conversion')) logging.warning(_('Value set to 0')) elif self.wolftype in [WOLF_ARRAY_FULL_UINTEGER8]: try: value = np.uint8(value) except: value = 0 logging.warning(_('Value too high for int8 conversion')) logging.warning(_('Value set to 0')) except: logging.error(_('Type not supported : {} - {}'.format(value, type(value)))) logging.warning(_('Masking operation compromised')) if value is not None: if isinstance(self.array.mask, np.bool_): # mask is not an array, but a single boolean value # we must create a new mask array if np.isnan(value) or math.isnan(value): self.array.mask = np.isnan(self.array.data) else: self.array.mask = self.array.data == value else: # Copy to prevent unlinking the mask (see `mask_reset`) if np.isnan(value) or math.isnan(value): np.copyto(self.array.mask, np.isnan(self.array.data)) # self.array.mask[:,:] = np.isnan(self.array.data) else: np.copyto(self.array.mask, self.array.data == value) # self.array.mask[:,:] = self.array.data == value self.count()
[docs] def mask_lower(self, value): """Mask cell where values are strictly lower than `value` :param value: cells strictly below this value will be masked """ if self.array is None: return # Copy to prevent unlinking the mask (see `mask_reset`) np.copyto(self.array.mask, self.array.data < value) self.count()
[docs] def mask_lowerequal(self, value): """Mask cell where values are lower or equal than `value` :param value: cells below or equal to this value will be masked """ if self.array is None: return # Copy to prevent unlinking the mask (see `mask_reset`) np.copyto(self.array.mask, self.array.data <= value) self.count()
[docs] def mask_greater(self, value): """Mask cell where values are strictly greater than `value` :param value: cells strictly above this value will be masked """ if self.array is None: return # Copy to prevent unlinking the mask (see `mask_reset`) np.copyto(self.array.mask, self.array.data > value) self.count()
[docs] def mask_greaterequal(self, value): """Mask cell where values are greater or equal than `value` :param value: cells above or equal to this value will be masked """ if self.array is None: return # Copy to prevent unlinking the mask (see `mask_reset`) np.copyto(self.array.mask, self.array.data >= value) self.count()
[docs] def mask_allexceptdata(self, value): """Mask cell where values are different from `value` :param value: only cells with this exact value remain unmasked """ if self.array is None: return # Copy to prevent unlinking the mask (see `mask_reset`) np.copyto(self.array.mask, self.array.data != value) self.count()
[docs] def mask_invert(self): """ Invert the mask """ if self.array is None: return # Copy to prevent unlinking the mask (see `mask_reset`) np.copyto(self.array.mask, np.logical_not(self.array.mask)) self.count()
[docs] def mask_outsidepoly(self, myvect: vector, eps:float = 0., set_nullvalue:bool=True, method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict'): """ Mask all cells located **outside** a polygon. :param myvect: target vector in global coordinates :param eps: tolerance for boundary detection :param set_nullvalue: if ``True``, set masked cells to :attr:`nullvalue` :param method: spatial method: ``'mpl'``, ``'shapely_strict'``, ``'shapely_wboundary'``, ``'rasterio'`` """ # The polygon here is in world coordinates # (coord will be converted back with translation, origin and dx/dy) # (mesh coord, 0-based) # trouve les indices dans le polygone myij = self.get_ij_inside_polygon(myvect, usemask=True, eps=eps, method=method) self.array.mask.fill(True) # Mask everything # démasquage des mailles contenues self.array.mask[myij[:,0],myij[:,1]] = False if set_nullvalue: # annulation des valeurs en dehors du polygone self.set_nullvalue_in_mask() self.count()
[docs] def mask_insidepoly(self, myvect: vector, eps:float = 0., set_nullvalue:bool=True, method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='mpl'): """ Mask all cells located **inside** a polygon. :param myvect: target vector in global coordinates :param eps: tolerance for boundary detection :param set_nullvalue: if ``True``, set masked cells to :attr:`nullvalue` :param method: spatial method: ``'mpl'``, ``'shapely_strict'``, ``'shapely_wboundary'``, ``'rasterio'`` """ # The polygon here is in world coordinates # (coord will be converted back with translation, origin and dx/dy) # (mesh coord, 0-based) # trouve les indices dans le polygone myij = self.get_ij_inside_polygon(myvect, usemask=False, eps=eps, method=method) if set_nullvalue: # annulation des valeurs en dehors du polygone self.array.data[myij[:,0],myij[:,1]] = self.nullvalue # masquage des mailles contenues self.array.mask[myij[:,0],myij[:,1]] = True self.count()
[docs] def mask_insidepolys(self, myvects: list[vector], eps:float = 0., set_nullvalue:bool=True, method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='mpl'): """ Mask all cells located **inside** any of the given polygons. :param myvect: target vector in global coordinates :param myvects: list of polygon vectors to mask inside :param eps: tolerance for boundary detection :param set_nullvalue: if ``True``, set masked cells to :attr:`nullvalue` :param method: spatial method: ``'mpl'``, ``'shapely_strict'``, ``'shapely_wboundary'``, ``'rasterio'`` """ # The polygon here is in world coordinates # (coord will be converted back with translation, origin and dx/dy) # (mesh coord, 0-based) # trouve les indices dans le polygone for myvect in myvects: myij = self.get_ij_inside_polygon(myvect, usemask=False, eps=eps, method=method) if myij.shape[0] == 0: logging.warning(_("No indices found in the polygon {}").format(myvect.myname)) else: # masquage des mailles contenues self.array.mask[myij[:,0],myij[:,1]] = True if set_nullvalue: # annulation des valeurs en dehors du polygone self.array.data[myij[:,0],myij[:,1]] = self.nullvalue self.count()
[docs] def copy_mask(self, source:"WolfArrayModel", forcenullvalue:bool= False, link:bool=True): """ Copy/Link the mask from another WolfArrayModel :param source: WolfArrayModel source :param forcenullvalue: force nullvalue in the masked zone :param link: link the mask if True (default), copy it otherwise """ assert self.shape == source.shape, _('Bad shape') if forcenullvalue: self.array[np.where(source.array.mask)] = self.nullvalue if link: self.array.mask = source.array.mask else: self.array.mask = source.array.mask.copy() self.nbnotnull = source.nbnotnull self.reset_plot()
[docs] def mask_union(self, source:"WolfArrayModel", link:bool=True): """ Union of the mask with another WolfArrayModel :param source: WolfArrayModel source :param link: link the mask if True (default), copy it otherwise """ union = self.array.mask & source.array.mask self.array[(~union) & (self.array.mask)] = self.nullvalue source.array[(~union) & (source.array.mask)] = self.nullvalue if link: self.array.mask = union source.array.mask = union else: self.array.mask = union.copy() source.array.mask = union.copy() self.reset_plot() source.reset_plot()
[docs] def mask_unions(self, sources:list["WolfArrayModel"], link:bool=True): """ Compute the union of masks from this array and a list of source arrays. :param source: list of WolfArrayModel sourceq :param link: link the mask if True (default), copy it otherwise :param sources: list of :class:`WolfArrayModel` whose masks are unified """ for cursrc in sources: union = self.array.mask & cursrc.array.mask self.array[(~union) & (self.array.mask)] = self.nullvalue for cursrc in sources: cursrc.array[(~union) & (cursrc.array.mask)] = self.nullvalue if link: self.array.mask = union for cursrc in sources: cursrc.array.mask = union else: self.array.mask = union.copy() for cursrc in sources: cursrc.array.mask = union.copy() self.reset_plot() for cursrc in sources: cursrc.reset_plot()
[docs] def copy_mask_log(self, mask:np.ndarray, link:bool=True): """ Copy the mask from a numpy array :param mask: numpy array :param link: link the mask if True (default), copy it otherwise """ assert self.shape == mask.shape, _('Bad shape') if self.array is None: logging.debug(_('No array !!')) return if link: self.array.mask = mask else: self.array.mask = mask.copy() self.reset_plot()
[docs] def set_nullvalue_in_mask(self): """ Set nullvalue in masked cells """ if self.array is None: return self.array.data[self.array.mask] = self.nullvalue
[docs] def mask_force_null(self): """ Force to unmask all and mask null value """ self.mask_reset() self.mask_data(self.nullvalue) self.reset_plot()
[docs] def unmask(self): """ alias to mask_reset """ self.mask_reset()
[docs] def mask_clear(self): """ alias to mask_reset """ self.mask_reset()
[docs] def mask_reset(self): """ Unmask everything """ if self.nbdims == 2: # FIXME if mask linking should work # as expected, then we do: self.array.mask.fill(0.0) # to avoid replacing the linked mask by a (non linked) one. if isinstance(self.array.mask, np.bool_): # mask is not an array, but a single boolean value self.array.mask = np.zeros(self.array.shape) else: self.array.mask.fill(False) # False == not masked self.nbnotnull = self.nbx * self.nby elif self.nbdims == 3: if isinstance(self.array.mask, np.bool_): self.array.mask = np.zeros((self.nbx, self.nby, self.nbz)) else: self.array.mask.fill(False) # False == not masked self.nbnotnull = self.nbx * self.nby * self.nbz
[docs] def create_binary_mask(self, threshold:float = 0.) -> "WolfArrayModel": """Create a binary mask from the array :param threshold: values above this threshold become 1 (unmasked), others become 0 (masked) :return: new binary array (1 where value > threshold, 0 otherwise) :rtype: WolfArrayModel """ newarray = type(self)(mold=self) newarray.array.data[:,:] = self.array.data > threshold newarray.array.mask[:,:] = ~newarray.array.data.astype(bool) newarray.set_nullvalue_in_mask() return newarray
[docs] def count(self): """ Count the number of not masked values """ if self.array is None: self.nbnotnull = 0 else: self.nbnotnull = self.array.count() return self.nbnotnull
# ======================================================================== # Spatial Queries (Polygon & Polyline) # ======================================================================== # ************************************************************************************************************************* # POSITION and VALUES associated to a vector/polygon/polyline # These functions can not be stored in header_wolf, because wa can use the mask of the array to limit the search # These functions are also present in WolfResults_2D, but they are not exactly the same due to the structure of the results # *************************************************************************************************************************
[docs] def get_xy_inside_polygon(self, myvect: vector | Polygon, usemask:bool=True, method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict', epsilon:float = 0.): """ Return the coordinates inside a polygon. Main principle: - Get all the coordinates in the footprint of the polygon (taking into account the epsilon if provided) - Test each coordinate if it is inside the polygon or not (and along boundary if method allows it) - Apply the mask if requested Returned values are stored in an array of shape (N,2) with N the number of points found inside the polygon. :param myvect: target vector - vector or Shapely Polygon :param usemask: limit potential nodes to unmaksed nodes :param method: method to use ('mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio') - default is 'shapely_strict' :param epsilon: tolerance for the point-in-polygon test - default is 0. """ if method == 'mpl': return self.get_xy_inside_polygon_mpl(myvect, usemask= usemask, epsilon = epsilon) elif method == 'shapely_strict': return self.get_xy_inside_polygon_shapely(myvect, usemask, strictly=True, epsilon=epsilon) elif method == 'shapely_wboundary': return self.get_xy_inside_polygon_shapely(myvect, usemask, strictly=False, epsilon=epsilon) elif method == 'rasterio': return self.get_xy_inside_polygon_rasterio(myvect, usemask, epsilon=epsilon)
[docs] def get_xy_strictly_on_borders(self, myvect: vector | Polygon | LineString, usemask:bool=True, method:Literal['shapely']='shapely'): """ Return the coordinates strictly on the borders of a polygon. ATTENTION : Tests are performed numerically, so the results may not be exact in the sense of pure geometry. Do not confuse with "get_xy_under_polyline" which "rasterize" the polyline to find useful coordinates. :param myvect: target vector :param usemask: limit potential nodes to unmaksed nodes :param method: method to use ('shapely' for now) """ # As we will use Shapely(contains) to test if points are on the border, # we need to use a Linestring or a Polygon.boundary. if isinstance(myvect, vector): myvect.find_minmax() boundary = myvect.linestring elif isinstance(myvect, Polygon): boundary = myvect.boundary elif isinstance(myvect, LineString): boundary = myvect if not is_prepared(boundary): prepare(boundary) # Prepare the polygon for **faster** contains check -- VERY IMPORTANT to_destroy = True else: to_destroy = False mypointsxy, mypointsij = self.get_xy_infootprint_vect(myvect) points= np.array([Point(x,y) for x,y in mypointsxy]) on_border = list(map(lambda point: boundary.contains(point), points)) if to_destroy: destroy_prepared(boundary) # Destroy the prepared polygon to_keep = np.where(on_border) mypointsxy = mypointsxy[to_keep] if mypointsxy.shape[0] == 0: return mypointsxy if usemask: mypointsij = mypointsij[to_keep] mymask = np.logical_not(self.array.mask[mypointsij[:, 0], mypointsij[:, 1]]) mypointsxy = mypointsxy[np.where(mymask)] return mypointsxy
[docs] def get_xy_strictly_on_vertices(self, myvect: vector | Polygon | LineString, usemask:bool=True, method:Literal['shapely']='shapely', eps:float = 1.e-4): """ Return the coordinates strictly on the vertices of a polygon, Linestring or vector. ATTENTION : Tests are performed numerically, so the results may not be exact in the sense of pure geometry. :param myvect: target vector :param usemask: limit potential nodes to unmaksed nodes :param method: method to use ('shapely' for now) :param eps: distance tolerance for matching a vertex to a cell center """ # As we will use Shapely(contains) to test if points are on the border, # we need to use a Linestring or a Polygon.boundary. if isinstance(myvect, vector): myvect.find_minmax() boundary = myvect.linestring elif isinstance(myvect, Polygon): boundary = myvect.boundary elif isinstance(myvect, LineString): boundary = myvect if not is_prepared(boundary): prepare(boundary) # Prepare the polygon for **faster** contains check -- VERY IMPORTANT to_destroy = True else: to_destroy = False mypointsxy, mypointsij = self.get_xy_infootprint_vect(myvect) # Calculate distances to the vertices # We use the vertices of the polygon or linestring to check if points are on the # vertices. points = np.array([Point(x,y) for x,y in mypointsxy]) vertices = [Point(pt[0], pt[1], pt[2]) for pt in boundary.coords] on_vertices = list(map(lambda point: any(vert.distance(point) < eps for vert in vertices), points)) if to_destroy: destroy_prepared(boundary) # Destroy the prepared polygon to_keep = np.where(on_vertices) mypointsxy = mypointsxy[to_keep] if mypointsxy.shape[0] == 0: return mypointsxy if usemask: mypointsij = mypointsij[to_keep] mymask = np.logical_not(self.array.mask[mypointsij[:, 0], mypointsij[:, 1]]) mypointsxy = mypointsxy[np.where(mymask)] return mypointsxy
[docs] def get_xy_inside_polygon_mpl(self, myvect: vector | Polygon, usemask:bool=True, epsilon:float = 0.): """ Return the coordinates inside a polygon :param myvect: target vector :param usemask: limit potential nodes to unmaksed nodes :param epsilon: tolerance for path containment (matplotlib) """ if isinstance(myvect, vector): # force la mise à jour des min/max myvect.find_minmax() # Conversion des coordonnées en numpy pour plus d'efficacité (du moins on espère) myvert = myvect.asnparray() elif isinstance(myvect, Polygon): myvert = myvect.exterior.coords[:-1] mypointsxy, mypointsij = self.get_xy_infootprint_vect(myvect, eps = epsilon) path = mpltPath.Path(myvert) inside = path.contains_points(mypointsxy) mypointsxy = mypointsxy[np.where(inside)] if usemask: mypointsij = mypointsij[np.where(inside)] mymask = np.logical_not(self.array.mask[mypointsij[:, 0], mypointsij[:, 1]]) mypointsxy = mypointsxy[np.where(mymask)] return mypointsxy
[docs] def get_xy_inside_polygon_shapely(self, myvect: vector | Polygon, usemask:bool=True, strictly:bool=True, epsilon:float=0.): """ Return the coordinates inside a polygon :param myvect: target vector :param usemask: limit potential nodes to unmaksed nodes :param strictly: if ``True``, use strict containment; otherwise include boundary :param epsilon: buffer tolerance for boundary inclusion """ if isinstance(myvect, vector): myvect.find_minmax() polygon = myvect.polygon elif isinstance(myvect, Polygon): polygon = myvect if not is_prepared(polygon): prepare(polygon) # Prepare the polygon for **faster** contains check -- VERY IMPORTANT to_destroy = True else: to_destroy = False mypointsxy, mypointsij = self.get_xy_infootprint_vect(myvect, eps = epsilon) if strictly: inside = contains_xy(polygon, mypointsxy[:,0], mypointsxy[:,1]) else: points= np.array([Point(x,y) for x,y in mypointsxy]) boundary = polygon.boundary inside = contains(polygon, points) on_border = list(map(lambda point: boundary.contains(point), points)) inside = np.logical_or(inside, on_border) if to_destroy: destroy_prepared(polygon) # Destroy the prepared polygon to_keep = np.where(inside) mypointsxy = mypointsxy[to_keep] if mypointsxy.shape[0] == 0: return mypointsxy if usemask: mypointsij = mypointsij[to_keep] mymask = np.logical_not(self.array.mask[mypointsij[:, 0], mypointsij[:, 1]]) mypointsxy = mypointsxy[np.where(mymask)] return mypointsxy
[docs] def get_xy_inside_polygon_rasterio(self, myvect: vector | Polygon, usemask:bool=True, epsilon:float = 0.): """ Return the coordinates inside a polygon :param myvect: polygon vector in global coordinates :param usemask: if ``True``, consider only unmasked cells :param epsilon: tolerance (not used in rasterio method) """ # force la mise à jour des min/max myvect.find_minmax() mypointsxy, mypointsij = self.get_xy_infootprint_vect(myvect, eps = epsilon) from rasterio.features import geometry_mask poly = myvect.polygon mask = geometry_mask([poly], out_shape=(self.nbx, self.nby), transform=self._transform_gmrio(), invert=True, all_touched=False) # filter mypointsxy where mask is True # mypointsij is a vector of (i,j) indices mypointsxy = mypointsxy[mask[mypointsij[:, 0], mypointsij[:, 1]]] if usemask: mypointsij = mypointsij[mask[mypointsij[:, 0], mypointsij[:, 1]]] mymask = np.logical_not(self.array.mask[mypointsij[:, 0], mypointsij[:, 1]]) mypointsxy = mypointsxy[np.where(mymask)] return mypointsxy
[docs] def get_xy_under_polyline(self, myvect: vector, usemask:bool=True): """ Return the coordinates along a polyline :param myvect: target vector :param usemask: limit potential nodes to unmaksed nodes """ allij = self.get_ij_under_polyline(myvect, usemask) mypoints = [self.get_xy_from_ij(cur[0], cur[1]) for cur in allij] return mypoints
[docs] def get_ij_inside_polygon(self, myvect: vector | Polygon, usemask:bool=True, eps:float = 0., method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict'): """ Return the indices inside a polygon. Main principle: - First, get all indices in the footprint of the polygon (bounding box + epsilon) - Then, test each point if it is inside the polygon with the selected method - Filter with the mask if needed Returned indices are stored in an array of shape (N,2) with N the number of points found inside the polygon. :param myvect: target vector :param usemask: limit potential nodes to unmaksed nodes :param eps: epsilon for the intersection :param method: method to use ('mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio') - default is 'shapely_strict' """ if method == 'mpl': return self.get_ij_inside_polygon_mpl(myvect, usemask, eps) elif method == 'shapely_strict': return self.get_ij_inside_polygon_shapely(myvect, usemask, eps, strictly=True) elif method == 'shapely_wboundary': return self.get_ij_inside_polygon_shapely(myvect, usemask, eps, strictly=False) elif method == 'rasterio': return self._get_ij_inside_polygon_rasterio(myvect, usemask, eps)
[docs] def get_ij_inside_listofpolygons(self, myvects: list[vector | Polygon], usemask:bool=True, eps:float = 0., method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict'): """ Return the indices inside a list of polygons :param myvects: target vector :param usemask: limit potential nodes to unmaksed nodes :param eps: epsilon for the intersection :param method: method to use ('mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio') - default is 'shapely_strict' """ if isinstance(myvects, zone): myvects = myvects.myvectors alls = list(map(lambda vec: self.get_ij_inside_polygon(vec, usemask, method=method), myvects)) #remove empty arrays alls = [cur for cur in alls if cur.shape[0] > 0] if len(alls) == 0: return np.empty((0, 2), dtype=int) # Concatenate all arrays mypointsij = np.concatenate(alls, axis=0) # Remove duplicates mypointsij = np.unique(mypointsij, axis=0) if mypointsij.shape[0] == 0: return np.empty((0, 2), dtype=int) return mypointsij
[docs] def get_ij_inside_polygon_mpl(self, myvect: vector | Polygon, usemask:bool=True, eps:float = 0.): """ Return the indices inside a polygon :param myvect: target vector :param usemask: limit potential nodes to unmaksed nodes :param eps: epsilon for the intersection """ # force la mise à jour des min/max if isinstance(myvect, vector): myvect.find_minmax() mypointsxy, mypointsij = self.get_xy_infootprint_vect(myvect, eps=eps) # Conversion des coordonnées en numpy pour plus d'efficacité (du moins on espère) myvert = myvect.asnparray() path = mpltPath.Path(myvert) inside = path.contains_points(mypointsxy) mypointsij = mypointsij[np.where(inside)] if usemask: mymask = np.logical_not(self.array.mask[mypointsij[:, 0], mypointsij[:, 1]]) mypointsij = mypointsij[np.where(mymask)] return mypointsij
[docs] def get_ij_inside_polygon_shapely(self, myvect: vector | Polygon, usemask:bool=True, eps:float = 0., strictly:bool=True): """ Return the indices inside a polygon with the contains method of shapely :param myvect: target vector :param usemask: limit potential nodes to unmaksed nodes :param eps: epsilon for the intersection :param strictly: if True, the points on the border are not considered inside """ # force la mise à jour des min/max if isinstance(myvect, vector): myvect.find_minmax() poly = myvect.polygon elif isinstance(myvect, Polygon): poly = myvect mypointsxy, mypointsij = self.get_xy_infootprint_vect(myvect, eps=eps) if not is_prepared(poly): prepare(poly) # Prepare the polygon for **faster** contains check -- VERY IMPORTANT to_destroy = True else: to_destroy = False if strictly: inside = contains_xy(poly, mypointsxy[:,0], mypointsxy[:,1]) else: points= np.array([Point(x,y) for x,y in mypointsxy]) boundary = poly.boundary inside = contains(poly, points) on_border = list(map(lambda point: boundary.contains(point), points)) inside = np.logical_or(inside, on_border) if to_destroy: destroy_prepared(poly) # Destroy the prepared polygon mypointsij = mypointsij[np.where(inside)] if usemask: mymask = np.logical_not(self.array.mask[mypointsij[:, 0], mypointsij[:, 1]]) mypointsij = mypointsij[np.where(mymask)] return mypointsij
[docs] def _get_ij_inside_polygon_rasterio(self, myvect: vector | Polygon, usemask:bool=True, eps:float = 0.): """ Return the indices inside a polygon with the geometry_mask method of rasterio. :remark: get_ij_inside_polygon_shapely is more efficient FIXME : geometry_mask seems strange -- it does not work as documented -- RatsrIO 1.3.x :param myvect: target vector :param usemask: limit potential nodes to unmaksed nodes :param eps: epsilon for the intersection """ # force la mise à jour des min/max if isinstance(myvect, vector): myvect.find_minmax() poly = myvect.polygon elif isinstance(myvect, Polygon): poly = myvect mypointsxy, mypointsij = self.get_xy_infootprint_vect(myvect, eps=eps) from rasterio.features import geometry_mask mask = geometry_mask([poly], out_shape=(self.nbx, self.nby), transform=self._transform_gmrio(), invert=True, all_touched=False) # filter mypointsij where mask is True # mypointsij is a vector of (i,j) indices mypointsij = mypointsij[mask[mypointsij[:, 0], mypointsij[:, 1]]] if usemask: mymask = np.logical_not(self.array.mask[mypointsij[:, 0], mypointsij[:, 1]]) mypointsij = mypointsij[np.where(mymask)] return mypointsij
[docs] def intersects_polygon(self, myvect: vector | Polygon, usemask:bool=True, method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict', buffer:float= 0.) -> bool: """ Return True if the array intersects the polygon :param myvect: target vector :param usemask: limit potential nodes to unmaksed nodes :param method: spatial method: ``'mpl'``, ``'shapely_strict'``, ``'shapely_wboundary'``, or ``'rasterio'`` :param buffer: buffer distance around the polygon for intersection test """ if buffer > 0.: bufvect = myvect.buffer(buffer, inplace= False) return self.get_xy_inside_polygon(bufvect, usemask, method).shape[0] > 0 else: return self.get_xy_inside_polygon(myvect, usemask, method).shape[0] > 0
[docs] def intersects_polygon_shapely(self, myvect: vector | Polygon, eps:float = 0., usemask:bool=True, buffer:float= 0.) -> bool: """ Return True if the array intersects the polygon :param myvect: target vector :param usemask: limit potential nodes to unmaksed nodes :param eps: epsilon for the intersection :param buffer: buffer for the intersection - using buffer from Shapely [m] """ warnings.warn("intersects_polygon_shapely is deprecated, use intersects_polygon with method='shapely_strict' instead", DeprecationWarning) if buffer > 0.: poly = myvect.polygon.buffer(buffer) return self.get_xy_inside_polygon_shapely(poly, usemask, epsilon=eps).shape[0] > 0 else: return self.get_xy_inside_polygon_shapely(myvect, usemask, epsilon=eps).shape[0] > 0
[docs] def interects_listofpolygons(self, myvects:zone | list[vector], usemask:bool=True, method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict', buffer:float= 0.) -> list[bool]: """ Element-wise intersection with a list of polygons :param myvects: list of polygon vectors to test :param usemask: if ``True``, consider only unmasked cells :param method: spatial query method for intersection test :param buffer: buffer distance around each polygon """ if isinstance(myvects, zone): myvects = myvects.myvectors elif isinstance(myvects, list): pass else: logging.error("Bad type") return {} return list(map(lambda x: self.intersects_polygon(x, usemask, method, buffer), myvects))
[docs] def intersects_zones(self, zones:Zones | list[zone], usemask:bool=True, method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict', buffer:float = 0.) -> list[list[bool]]: """ Element-wise intersection with a list of zones :param zones: :class:`Zones` instance containing polygons to test :param usemask: if ``True``, consider only unmasked cells :param method: spatial query method for intersection test :param buffer: buffer distance around each polygon """ if isinstance(zones, Zones): zones = zones.myzones elif isinstance(zones, list): pass else: logging.error("Bad type") return {} return list(map(lambda x: self.interects_listofpolygons(x, usemask, method, buffer), zones))
[docs] def get_total_area_if_intersects(self, myvects:Zones | list[vector], usemask:bool=True, method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict', buffer:float= 0., coefficient:float = 1.) -> float | list[float]: """ Return the area of the intersection with a list of polygons :param myvects: list of polygon vectors :param usemask: if ``True``, consider only unmasked cells :param method: spatial query method for intersection test :param buffer: buffer distance around each polygon :param coefficient: area multiplier (e.g. for unit conversion) """ if isinstance(myvects, Zones): ret = [] for curzone in myvects.myzones: myvects = curzone.myvectors intersects = self.interects_listofpolygons(myvects, usemask, method, buffer) ret.append(sum([x.area for x in myvects if intersects[myvects.index(x)]]) * coefficient) return ret elif isinstance(myvects, list): intersects = self.interects_listofpolygons(myvects, usemask, method, buffer) return sum([x.area for x in myvects if intersects[myvects.index(x)]]) * coefficient
[docs] def get_ij_under_polyline(self, myvect: vector, usemask:bool=True, step_factor:float=1.): """ Return the indices along a polyline :param myvect: target vector :param usedmask: limit potential nodes to unmaksed nodes :param step_factor: step factor for the discretization of myvect (<1 for thinner, >1 for coarser) :param usemask: if ``True``, exclude masked cells from result :return: array of shape (N,2) with N the number of points found under the polyline """ if step_factor>1.0: logging.warning("Step_factor greater than 1.0 may lead to lots of missing (i,j) even if faster") else: def mod_fix(x, y, tol=1e-10): result = np.mod(x, y) return 0 if np.isclose(result, 0, atol=tol) else (y if np.isclose(result, y, atol=tol) else result) assert mod_fix(1.0,step_factor)==0, "1.0 should be a multiple of step_factor" # assert step_factor<=1., "Step factor must be <= 1" ds = step_factor*min(self.dx, self.dy) pts = myvect._refine2D(ds) allij = np.asarray([self.get_ij_from_xy(curpt.x, curpt.y) for curpt in pts]) allij = np.unique(allij, axis=0) # filter negative indexes allij = allij[np.where((allij[:, 0] >= 0) & (allij[:, 1] >= 0) & (allij[:, 0] < self.nbx) & (allij[:, 1] < self.nby))] if usemask: mymask = np.logical_not(self.array.mask[allij[:, 0], allij[:, 1]]) allij = allij[np.where(mymask)] return allij
[docs] def get_values_insidepoly(self, myvect: vector, usemask:bool=True, getxy:bool=False, method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict') -> tuple[np.ndarray, np.ndarray | None]: """ Return the cell values located inside a polygon. :param usemask: (optional) restreint les éléments aux éléments non masqués de la matrice :param getxy: (optional) retourne en plus les coordonnées des points :param myvect: polygon vector in global coordinates :param method: spatial method: ``'mpl'``, ``'shapely_strict'``, etc. """ mypoints = self.get_xy_inside_polygon(myvect, usemask, method= method) myvalues = np.asarray([self.get_value(cur[0], cur[1]) for cur in mypoints]) if getxy: return myvalues, mypoints else: return myvalues, None
[docs] def get_values_inside_listofpolygons(self, myvects:zone | list[vector], usemask:bool=True, getxy:bool=False) -> dict: """ Return the cell values located inside a list of polygons. :param myvects: list of polygon vectors :param usemask: if ``True``, consider only unmasked cells :param getxy: if ``True``, also return xy coordinates """ ret = {} if isinstance(myvects, zone): out = ret[myvects.myname] = {} myvects = myvects.myvectors elif isinstance(myvects, list): out = ret else: logging.error("Bad type") return {} for vec in myvects: out[vec.myname] = self.get_values_insidepoly(vec, usemask, getxy)
[docs] def get_values_inside_zones(self, zones:Zones | list[zone], usemask:bool=True, getxy:bool=False) -> dict: """ Return the cell values located inside all polygons of a :class:`Zones` object. :param zones: :class:`Zones` instance :param usemask: if ``True``, consider only unmasked cells :param getxy: if ``True``, also return xy coordinates """ ret = {} if isinstance(zones, Zones): out = ret[zones.idx] = {} myzones = zones.myzones elif isinstance(zones, list): myzones = zones out = ret else: logging.error("Bad type") return {} for zone in myzones: out[zone.myname] = self.get_values_inside_listofpolygons(zone, usemask, getxy)
[docs] def get_values_underpoly(self, myvect: vector, usemask:bool=True, getxy:bool=False): """ Return the cell values located under a polyline. :param usemask: (optional) restreint les éléments aux éléments non masqués de la matrice :param getxy: (optional) retourne en plus les coordonnées des points :param myvect: polyline vector in global coordinates """ mypoints = self.get_xy_under_polyline(myvect, usemask) myvalues = np.asarray([self.get_value(cur[0], cur[1]) for cur in mypoints]) if getxy: return myvalues, mypoints else: return myvalues, None
[docs] def get_all_values_insidepoly(self, myvect: vector, usemask:bool=True, getxy:bool=False): """ Return all cell values (including masked) inside a polygon. :param usemask: (optional) restreint les éléments aux éléments non masqués de la matrice :param getxy: (optional) retourne en plus les coordonnées des points :param myvect: polygon vector in global coordinates ICI on retourne le résultat de get_values_insidepoly, car une seule matrice, mais une autre classe pourrait vouloir faure autre chose C'est le cas notamment de Wolfresults_2D """ return self.get_values_insidepoly(myvect, usemask, getxy)
[docs] def get_all_values_underpoly(self, myvect: vector, usemask:bool=True, getxy:bool=False): """ Return all cell values (including masked) under a polyline. :param usemask: (optional) restreint les éléments aux éléments non masqués de la matrice :param getxy: (optional) retourne en plus les coordonnées des points :param myvect: polyline vector in global coordinates ICI on retourne le résultat de get_values_underpoly, car une seule matrice, mais une autre classe pourrait vouloir faure autre chose C'est le cas notamment de Wolfresults_2D """ return self.get_values_underpoly(myvect, usemask, getxy)
[docs] def count_unique_pixels(self) -> dict[int, int]: """ Count the number of pixels for each unique value in the array :return: dictionary with the code as key and the number of pixels as value """ if self.wolftype not in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_INTEGER16, WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_UINTEGER8]: logging.error("Bad wolftype") return {} counts = np.bincount(self.array[~self.array.mask].ravel()) ret = {i: int(val) for i, val in enumerate(counts) if val > 0} return ret
[docs] def get_unique_areas(self, format:Literal['m2', 'ha', 'km2'] = 'm2') -> dict[int, float]: """ Get the areas of each code in the array :param array: numpy array or WolfArrayModel :param format: output format: ``'dict'`` returns a dict, ``'list'`` returns a list :return: dictionary with the code as key and the area in m² as value """ ret = self.count_unique_pixels() fact = 1.0 if format in ['ha', 'Ha']: fact = 0.0001 elif format in ['km2', 'km²']: fact = 1e-6 elif format not in ['m2', 'm²']: logging.error("Bad format") return {} # Convert counts to areas in m² return {code: float(count) * self.dx * self.dy * fact for code, count in ret.items()}
[docs] def count_insidepoly(self, myvect: vector, usemask:bool=True, method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict', coefficient:float = 1.): """ Compte le nombre de valeurs contenues dans un polygone :param myvect: target vector :param usemask: (optional) restreint les éléments aux éléments non masqués de la matrice :param method: (optional) method to use :param coefficient: (optional) coefficient to apply to the count (default 1.) """ ij = self.get_ij_inside_polygon(myvect, usemask, method=method) return ij.shape[0] * coefficient
[docs] def count_inside_listofpolygons(self, myvects:zone | list[vector], usemask:bool=True, method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict', coefficient:float = 1.): """ Compte le nombre de valeurs contenues dans une instance Zones ou une liste de vecteurs :param myvects: target vectors or zone :param usemask: (optional) restreint les éléments aux éléments non masqués de la matrice :param method: (optional) method to use :param coefficient: (optional) coefficient to apply to the count (default 1.) """ if isinstance(myvects, zone): myvects = myvects.myvectors else: logging.error("Bad type") return {} return list(map(lambda vec: self.count_insidepoly(vec, usemask, method=method, coefficient=coefficient), myvects))
[docs] def count_inside_zones(self, zones:Zones | list[zone], usemask:bool=True, method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']='shapely_strict', coefficient:float = 1.): """ Compte le nombre de valeurs contenues dans une instance Zones ou une liste de "zone" :param zones: target zones or list of zones :param usemask: (optional) restreint les éléments aux éléments non masqués de la matrice :param method: (optional) method to use :param coefficient: (optional) coefficient to apply to the count (default 1.) """ if isinstance(zones, Zones): zones = zones.myzones else: logging.error("Bad type") return list(map(lambda loczone: self.count_inside_listofpolygons(loczone, usemask, method=method, coefficient=coefficient), zones))
# ======================================================================== # Interpolation & Rasterization # ========================================================================
[docs] def interpolate_on_polygon(self, working_vector: vector, method:Literal["nearest", "linear", "cubic"]="linear", keep:Literal['all', 'below', 'above'] = 'all', rescale:bool = False, method_selection:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']="shapely_strict", usemask:bool = True, epsilon:float = 0.0): """ Interpolation sous un polygone. L'interpolation a lieu : - uniquement dans les mailles sélectionnées si elles existent - dans les mailles contenues dans le polygone sinon On utilise ensuite "griddata" de Scipy pour interpoler les altitudes des mailles depuis les vertices 3D du polygone. :ATTENTION: l'interpolation est réalisée en réels flottants 64 bits. Le résultat sera ensuite transformé dans le type de la matrice. :param working_vector: vector - polygon with z values :param method: interpolation method - 'nearest', 'linear' or 'cubic' :param keep: 'all' to keep all interpolated values 'below' to keep only values below the current value in the array 'above' to keep only values above the current value in the array :param rescale: rescale the input data to [0, 1] for better numerical stability (only for 'linear' and 'cubic' methods) :param method_selection: cell selection method: ``'mpl'``, ``'shapely_strict'``, ``'shapely_wboundary'``, or ``'rasterio'`` :param usemask: if ``True``, interpolate only within unmasked cells :param epsilon: tolerance for boundary detection :return: None """ if self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_INTEGER16]: logging.info(_('Your array contains integer data. The interpolation will be done in float and then converted to integer using "np.ceil" operator.')) if vector.area == 0.: logging.error(_('The polygon has no area')) return if keep not in ['all', 'below', 'above']: logging.error(_('keep must be "all", "below" or "above"')) raise ValueError if method not in ['nearest', 'linear', 'cubic']: logging.error(_('method must be "nearest", "linear" or "cubic"')) raise ValueError if self.mngselection is None: destxy = self.get_xy_inside_polygon(working_vector, method = method_selection, usemask = usemask, epsilon = epsilon) if len(destxy)==0: logging.debug(_('No points to interpolate')) return destij = self.xy2ij_np(destxy) elif self.mngselection.myselection == 'all': destij = np.where(self.array.mask == False) destij = np.array(destij).transpose() if len(destij)==0: logging.debug(_('No points to interpolate')) return destxy = self.ij2xy_np(destij) else: if self.SelectionData.nb == 0: destxy = self.get_xy_inside_polygon(working_vector, method= method_selection, usemask= usemask, epsilon= epsilon) if destxy.shape[0] == 0: logging.debug(_('No points to interpolate')) return else: destxy = self.SelectionData.myselection if len(destxy)==0: logging.debug(_('No points to interpolate')) return destij = self.xy2ij_np(destxy) xyz = working_vector.asnparray3d() newvalues = griddata(xyz[:, :2], xyz[:, 2], destxy, method=method, fill_value=-99999., rescale = rescale) if keep == 'all': locmask = np.where(newvalues != -99999.) elif keep == 'below': locmask = np.where((newvalues != -99999.) & (newvalues < self.array.data[destij[:, 0], destij[:, 1]])) elif keep == 'above': locmask = np.where((newvalues != -99999.) & (newvalues > self.array.data[destij[:, 0], destij[:, 1]])) if self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_INTEGER16]: logging.debug(_('Converting interpolated values to integer using "np.ceil" operator')) newvalues = np.ceil(newvalues) newvalues = newvalues.astype(self.array.dtype) self.array.data[destij[locmask][:, 0], destij[locmask][:, 1]] = newvalues[locmask]
[docs] def interpolate_on_polygons(self, working_zone:zone, method:Literal["nearest", "linear", "cubic"]="linear", keep:Literal['all', 'below', 'above'] = 'all'): """ Interpolate values inside each polygon of a zone using vertex elevations. :param working_zone: zone containing multiple polygons :param method: interpolation method: ``'nearest'``, ``'linear'``, or ``'cubic'`` :param keep: value-keeping strategy: ``'all'``, ``'below'``, or ``'above'`` """ for curvec in working_zone.myvectors: self.interpolate_on_polygon(curvec, method, keep)
[docs] def interpolate_strictly_on_polyline(self, working_vector:vector, usemask=True, keep:Literal['all', 'below', 'above'] = 'all'): """ Set cell values along a polyline using the z-coordinates of the polyline vertices. On utilise ensuite "interpolate" de shapely pour interpoler les altitudes des mailles depuis les vertices 3D de la polyligne :param working_vector: polyline vector with 3D vertices :param usemask: if ``True``, only update unmasked cells :param keep: value-keeping strategy: ``'all'``, ``'below'``, or ``'above'`` """ vecls = working_vector.linestring xy = self.get_xy_strictly_on_borders(working_vector, usemask=usemask) if xy.shape[0] == 0: logging.debug(_('No points to interpolate')) return ij = np.asarray([self.get_ij_from_xy(x, y) for x, y in xy]) newz = np.asarray([vecls.interpolate(vecls.project(Point(x, y))).z for x, y in xy]) if keep == 'all': self.array.data[ij[:, 0], ij[:, 1]] = newz elif keep == 'below': locmask = np.where((newz != -99999.) & (newz < self.array.data[ij[:, 0], ij[:, 1]])) self.array.data[ij[locmask][:, 0], ij[locmask][:, 1]] = newz[locmask] elif keep == 'above': locmask = np.where((newz != -99999.) & (newz > self.array.data[ij[:, 0], ij[:, 1]])) self.array.data[ij[locmask][:, 0], ij[locmask][:, 1]] = newz[locmask]
[docs] def interpolate_strictly_on_vertices(self, working_vector:vector, usemask=True, keep:Literal['all', 'below', 'above'] = 'all'): """ Set cell values at the exact positions of the vector vertices. On utilise ensuite "interpolate" de shapely pour interpoler les altitudes des mailles depuis les vertices 3D de la polyligne :param working_vector: vector with 3D vertices :param usemask: if ``True``, only update unmasked cells :param keep: value-keeping strategy: ``'all'``, ``'below'``, or ``'above'`` """ vecls = working_vector.linestring xy = self.get_xy_strictly_on_vertices(working_vector, usemask=usemask) if xy.shape[0] == 0: logging.debug(_('No points to interpolate')) return ij = np.asarray([self.get_ij_from_xy(x, y) for x, y in xy]) newz = np.asarray([vecls.interpolate(vecls.project(Point(x, y))).z for x, y in xy]) if keep == 'all': self.array.data[ij[:, 0], ij[:, 1]] = newz elif keep == 'below': locmask = np.where((newz != -99999.) & (newz < self.array.data[ij[:, 0], ij[:, 1]])) self.array.data[ij[locmask][:, 0], ij[locmask][:, 1]] = newz[locmask] elif keep == 'above': locmask = np.where((newz != -99999.) & (newz > self.array.data[ij[:, 0], ij[:, 1]])) self.array.data[ij[locmask][:, 0], ij[locmask][:, 1]] = newz[locmask]
[docs] def interpolate_on_polyline(self, working_vector:vector, usemask=True): """ Interpolate cell values along a polyline based on its 3D vertices. L'interpolation a lieu : - uniquement dans les mailles sélectionnées si elles existent - dans les mailles sous la polyligne sinon On utilise ensuite "interpolate" de shapely pour interpoler les altitudes des mailles depuis les vertices 3D de la polyligne :param working_vector: polyline vector with 3D vertices :param usemask: if ``True``, only update unmasked cells """ vecls = working_vector.asshapely_ls() if self.SelectionData is None: allij = self.get_ij_under_polyline(working_vector, usemask) allxy = [self.get_xy_from_ij(cur[0], cur[1]) for cur in allij] else: if self.SelectionData.nb == 0: allij = self.get_ij_under_polyline(working_vector, usemask) allxy = [self.get_xy_from_ij(cur[0], cur[1]) for cur in allij] else: allxy = self.SelectionData.myselection allij = np.asarray([self.get_ij_from_xy(x,y) for x,y in allxy]) newz = np.asarray([vecls.interpolate(vecls.project(Point(x, y))).z for x, y in allxy]) self.array.data[allij[:, 0], allij[:, 1]] = newz
[docs] def interpolate_on_polylines(self, working_zone:zone, usemask=True): """Interpolate cell values along all polylines in a zone. :param working_zone: zone containing multiple polylines :param usemask: if ``True``, only update unmasked cells """ for curvec in working_zone.myvectors: self.interpolate_on_polyline(curvec, usemask)
[docs] def interpolate_on_cloud(self, xy:np.ndarray, z:np.ndarray, method:Literal['linear', 'nearest', 'cubic']= 'linear'): """ Interpolation sur un nuage de points. L'interpolation a lieu : - uniquement dans les mailles sélectionnées si elles existent - dans les mailles contenues dans le polygone convexe contenant les points sinon Using griddata from Scipy. :param xy: numpy.array of vertices - shape (n,2) :param z: numpy.array of values - shape (n,) :param method: method for the interpolation -- 'nearest', 'linear' or 'cubic' See : https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.griddata.html """ if self.mngselection.myselection == [] or self.mngselection.myselection == ALL_SELECTED: decalx = self.origx + self.translx decaly = self.origy + self.transly x = np.arange(self.dx / 2. + decalx, float(self.nbx) * self.dx + self.dx / 2 + decalx, self.dx) y = np.arange(self.dy / 2. + decaly, float(self.nby) * self.dy + self.dy / 2 + decaly, self.dy) grid_x, grid_y = np.meshgrid(x, y, sparse=True, indexing='xy') newvalues = griddata(xy, z, (grid_x, grid_y), method=method, fill_value=-99999.).transpose() self.array.data[np.where(newvalues != -99999.)] = newvalues[np.where(newvalues != -99999.)] else: ij = np.asarray([self.get_ij_from_xy(x, y) for x, y in self.mngselection.myselection]) newvalues = griddata(xy, z, self.mngselection.myselection, method=method, fill_value=-99999.) ij = ij[np.where(newvalues != -99999.)] newvalues = newvalues[np.where(newvalues != -99999.)] self.array.data[ij[:, 0], ij[:, 1]] = newvalues self.reset_plot()
[docs] def interpolate_on_triangulation(self, coords, triangles, grid_x=None, grid_y = None, mask_tri=None, interp_method:Literal['matplotlib','scipy'] = 'scipy', keep:Literal['all', 'below', 'above'] = 'all', points_along_edges:bool = True): """ Interpolation sur une triangulation. L'interpolation a lieu : - uniquement dans les mailles sélectionnées si elles existent - dans les mailles contenues dans la triangulation sinon Scipy(griddata) is used by default (more robust), but Matplotlib can be used as well (more strict). If Matplotlib crashes, try with Scipy. **Matplotlib is more strict on the quality of the triangulation.** :param coords: numpy.array of vertices - shape (n,3) :param triangles: numpy.array of triangles - shape (m,3) :param grid_x: numpy.array of x values where the interpolation will be done -- if None, the grid is created from the array :param grid_y: numpy.array of y values where the interpolation will be done -- if None, the grid is created from the array :param mask_tri: numpy.array of mask for the triangles :param interp_method: method for the interpolation -- 'matplotlib' or 'scipy' :param keep: 'all' to keep all values, 'below' to keep only values below the current array, 'above' to keep only values above the current array :param points_along_edges: number of extra points to add along triangle edges for refinement For matplotlib algo, see : https://matplotlib.org/stable/gallery/images_contours_and_fields/triinterp_demo.html For scipy algo, see : https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.griddata.html """ if interp_method =='matplotlib': # FIXME : This part could be optimized, I think import matplotlib.tri as mtri use_scipy=False try: if self.mngselection is not None: if self.mngselection.myselection != [] and self.mngselection.myselection != ALL_SELECTED: # interpolation only in the selected cells # Convert coordinates to indices ij = np.asarray([self.get_ij_from_xy(x, y) for x, y in self.mngselection.myselection]) try: # Opérateur d'interpolation linéaire triang = mtri.Triangulation(coords[:,0],coords[:,1],triangles) if mask_tri is not None: triang.set_mask(mask_tri) interplin = mtri.LinearTriInterpolator(triang, coords[:,2]) # Interpolation et récupération dans le numpy.array de l'objet Wolf except: raise Warning(_('Bad triangulation - try with another method like Scipy')) newvalues = np.ma.masked_array([interplin(x, y) for x, y in self.mngselection.myselection]) if newvalues.mask.shape!=(): # We have a mask nonmasked_values = np.where(~newvalues.mask) ij = ij[nonmasked_values] newvalues = newvalues[nonmasked_values] if keep == 'below': newvalues = np.ma.masked_array(newvalues, mask=(newvalues > self.array.data[ij[:, 0], ij[:, 1]])) nonmasked_values = np.where(~newvalues.mask) ij = ij[np.where(~newvalues.mask)] newvalues = newvalues[nonmasked_values] elif keep == 'above': newvalues = np.ma.masked_array(newvalues[nonmasked_values], mask=(newvalues[nonmasked_values] < self.array.data[ij[:, 0], ij[:, 1]])) nonmasked_values = np.where(~newvalues.mask) ij = ij[np.where(~newvalues.mask)] newvalues = newvalues[nonmasked_values] self.array.data[ij[:, 0], ij[:, 1]] = newvalues else: # No mask --> boolean if keep == 'below': newvalues = np.ma.masked_array(newvalues, mask=(newvalues > self.array.data[ij[:, 0], ij[:, 1]])) nonmasked_values = np.where(~newvalues.mask) ij = ij[np.where(~newvalues.mask)] newvalues = newvalues[nonmasked_values] elif keep == 'above': newvalues = np.ma.masked_array(newvalues, mask=(newvalues < self.array.data[ij[:, 0], ij[:, 1]])) nonmasked_values = np.where(~newvalues.mask) ij = ij[np.where(~newvalues.mask)] newvalues = newvalues[nonmasked_values] self.array.data[ij[:, 0], ij[:, 1]] = newvalues elif self.mngselection.myselection == ALL_SELECTED and (grid_x is None and grid_y is None): # interpolation in all the cells # Creating a grid for all cells decalx = self.origx + self.translx decaly = self.origy + self.transly x = np.arange(self.dx / 2. + decalx, float(self.nbx) * self.dx + self.dx / 2 + decalx, self.dx) y = np.arange(self.dy / 2. + decaly, float(self.nby) * self.dy + self.dy / 2 + decaly, self.dy) grid_x, grid_y = np.meshgrid(x, y, indexing='ij') grid_ij = self.xy2ij_np(np.concatenate([grid_x.ravel()[:, np.newaxis], grid_y.ravel()[:, np.newaxis]], axis=1)) try: # Opérateur d'interpolation linéaire triang = mtri.Triangulation(coords[:,0],coords[:,1],triangles) if mask_tri is not None: triang.set_mask(mask_tri) interplin = mtri.LinearTriInterpolator(triang, coords[:,2]) except: raise Warning(_('Bad triangulation - try with another method like Scipy')) # Interpolation et récupération dans le numpy.array de l'objet Wolf newvalues = interplin(grid_x,grid_y) nonmasked_values = np.where(~newvalues.mask) ij = grid_ij[nonmasked_values] newvalues = newvalues[nonmasked_values] if keep == 'below': newvalues = np.ma.masked_array(newvalues, mask=(newvalues > self.array.data[grid_ij[:, 0], grid_ij[:, 1]])) nonmasked_values = np.where(~newvalues.mask) ij = grid_ij[nonmasked_values] newvalues = newvalues[nonmasked_values] elif keep == 'above': newvalues = np.ma.masked_array(newvalues, mask=(newvalues < self.array.data[grid_ij[:, 0], grid_ij[:, 1]])) nonmasked_values = np.where(~newvalues.mask) ij = grid_ij[nonmasked_values] newvalues = newvalues[nonmasked_values] self.array.data[ij[:, 0], ij[:, 1]] = newvalues elif (grid_x is not None and grid_y is not None): # Working on an imposed grid # Opérateur d'interpolation linéaire try: triang = mtri.Triangulation(coords[:,0],coords[:,1],triangles) if mask_tri is not None: triang.set_mask(mask_tri) interplin = mtri.LinearTriInterpolator(triang, coords[:,2]) # Interpolation et récupération dans le numpy.array de l'objet Wolf newvalues = np.ma.masked_array([interplin(x, y) for x, y in zip(grid_x.ravel(),grid_y.ravel())]) except: raise Warning(_('Bad triangulation - try with another method like Scipy')) # newvalues is a vector of values # ij is a vector of indices ij = np.asarray([self.get_ij_from_xy(x, y) for x, y in zip(grid_x.ravel(),grid_y.ravel())]) if newvalues.mask.shape!=(): nonmasked_values = np.where(~newvalues.mask) ij = ij[nonmasked_values] newvalues = newvalues[nonmasked_values] if keep == 'below': newvalues = np.ma.masked_array(newvalues, mask=(newvalues > self.array.data[ij[:, 0], ij[:, 1]])) nonmasked_values = np.where(~newvalues.mask) ij = ij[nonmasked_values] newvalues = newvalues[nonmasked_values] elif keep == 'above': newvalues = np.ma.masked_array(newvalues, mask=(newvalues < self.array.data[ij[:, 0], ij[:, 1]])) nonmasked_values = np.where(~newvalues.mask) ij = ij[nonmasked_values] newvalues = newvalues[nonmasked_values] self.array.data[ij[:, 0], ij[:, 1]] = newvalues else: if keep == 'below': newvalues = np.ma.masked_array(newvalues, mask=(newvalues > self.array.data[ij[:, 0], ij[:, 1]])) nonmasked_values = np.where(~newvalues.mask) ij = ij[nonmasked_values] newvalues = newvalues[nonmasked_values] elif keep == 'above': newvalues = np.ma.masked_array(newvalues, mask=(newvalues < self.array.data[ij[:, 0], ij[:, 1]])) nonmasked_values = np.where(~newvalues.mask) ij = ij[nonmasked_values] newvalues = newvalues[nonmasked_values] self.array.data[ij[:, 0], ij[:, 1]] = newvalues else: # Working on the whole array # Creating a grid for all cells decalx = self.origx + self.translx decaly = self.origy + self.transly x = np.arange(self.dx / 2. + decalx, float(self.nbx) * self.dx + self.dx / 2 + decalx, self.dx) y = np.arange(self.dy / 2. + decaly, float(self.nby) * self.dy + self.dy / 2 + decaly, self.dy) grid_x, grid_y = np.meshgrid(x, y, indexing='ij') try: # Opérateur d'interpolation linéaire triang = mtri.Triangulation(coords[:,0],coords[:,1],triangles) if mask_tri is not None: triang.set_mask(mask_tri) interplin = mtri.LinearTriInterpolator(triang, coords[:,2]) # Interpolation et récupération dans le numpy.array de l'objet Wolf # newvalues is an array newvalues = interplin(grid_x,grid_y).astype(np.float32) assert newvalues.shape == (self.nbx, self.nby), _('Bad shape for newvalues') if keep == 'below': newvalues = np.ma.masked_array(newvalues, mask=(newvalues > self.array.data)) elif keep == 'above': newvalues = np.ma.masked_array(newvalues, mask=(newvalues < self.array.data)) nonmasked_values = np.where(~newvalues.mask) self.array.data[nonmasked_values] = newvalues[nonmasked_values] except: raise Warning(_('Bad triangulation - try with another method like Scipy')) else: if grid_x is None and grid_y is None: decalx = self.origx + self.translx decaly = self.origy + self.transly x = np.arange(self.dx / 2. + decalx, float(self.nbx) * self.dx + self.dx / 2 + decalx, self.dx) y = np.arange(self.dy / 2. + decaly, float(self.nby) * self.dy + self.dy / 2 + decaly, self.dy) grid_x, grid_y = np.meshgrid(x, y, indexing='ij') # Opérateur d'interpolation linéaire triang = mtri.Triangulation(coords[:,0],coords[:,1],triangles) interplin = mtri.LinearTriInterpolator(triang, coords[:,2]) # Interpolation et récupération dans le numpy.array de l'objet Wolf newvalues = np.ma.masked_array([interplin(x, y) for x, y in zip(grid_x.ravel(), grid_y.ravel())]) # newvalues = interplin(grid_x,grid_y).astype(np.float32) nonmasked_values = np.where(~newvalues.mask) grid_x = grid_x.ravel()[nonmasked_values] grid_y = grid_y.ravel()[nonmasked_values] newvalues = newvalues[nonmasked_values] # make a shape (n, 2) from grid_x and grid_y grid_x = grid_x[:, np.newaxis] grid_y = grid_y[:, np.newaxis] # concatenate grid_x and grid_y grid_ij = self.xy2ij_np(np.concatenate([grid_x, grid_y], axis=1)) if keep == 'below': newvalues = np.ma.masked_array(newvalues, mask=(newvalues > self.array.data[grid_ij[:,0], grid_ij[:,1]])) nonmasked_values = np.where(~newvalues.mask) grid_ij = grid_ij[nonmasked_values] newvalues = newvalues[nonmasked_values] elif keep == 'above': newvalues = np.ma.masked_array(newvalues, mask=(newvalues < self.array.data[grid_ij[:,0], grid_ij[:,1]])) nonmasked_values = np.where(~newvalues.mask) grid_ij = grid_ij[nonmasked_values] newvalues = newvalues[nonmasked_values] self.array.data[grid_ij[:,0], grid_ij[:,1]] = newvalues else: ij = np.asarray([self.get_ij_from_xy(x, y) for x, y in zip(grid_x.ravel(),grid_y.ravel())]) # Opérateur d'interpolation linéaire triang = mtri.Triangulation(coords[:,0],coords[:,1],triangles) interplin = mtri.LinearTriInterpolator(triang, coords[:,2]) # Interpolation et récupération dans le numpy.array de l'objet Wolf newvalues = np.ma.masked_array([interplin(x, y) for x, y in zip(grid_x.ravel(), grid_y.ravel())]) if newvalues.mask.shape!=(): nonmasked_values = np.where(~newvalues.mask) ij = ij[nonmasked_values] newvalues = newvalues[nonmasked_values] if keep == 'below': newvalues = np.ma.masked_array(newvalues, mask=(newvalues > self.array.data[ij[:, 0], ij[:, 1]])) nonmasked_values = np.where(~newvalues.mask) ij = ij[nonmasked_values] newvalues = newvalues[nonmasked_values] elif keep == 'above': newvalues = np.ma.masked_array(newvalues, mask=(newvalues < self.array.data[ij[:, 0], ij[:, 1]])) nonmasked_values = np.where(~newvalues.mask) ij = ij[nonmasked_values] newvalues = newvalues[nonmasked_values] self.array.data[ij[:, 0], ij[:, 1]] = newvalues else: if keep == 'below': newvalues = np.ma.masked_array(newvalues, mask=(newvalues > self.array.data[ij[:, 0], ij[:, 1]])) nonmasked_values = np.where(~newvalues.mask) ij = ij[nonmasked_values] newvalues = newvalues[nonmasked_values] elif keep == 'above': newvalues = np.ma.masked_array(newvalues, mask=(newvalues < self.array.data[ij[:, 0], ij[:, 1]])) nonmasked_values = np.where(~newvalues.mask) ij = ij[nonmasked_values] newvalues = newvalues[nonmasked_values] self.array.data[ij[:, 0], ij[:, 1]] = newvalues #on force les valeurs masquées à nullvalue afin que l'interpolation n'applique pas ses effets dans cette zone self.array.data[self.array.mask]= self.nullvalue except: logging.error(_('Bad triangulation - We will try with Scipy')) use_scipy=True if interp_method != 'matplotlib' or use_scipy: # We need to convert the triangle to a vector to call the interpolation for curtri in triangles: curvec = vector(is2D=False) for curpt in curtri: curvec.add_vertex(wolfvertex(coords[curpt,0], coords[curpt,1], coords[curpt,2])) curvec.close_force() # if bool(np.isnan(curvec.x).any() or np.isnan(curvec.y).any()): # continue self.interpolate_on_polygon(curvec, "linear", keep) if points_along_edges: # Test points along edges self.interpolate_strictly_on_polyline(curvec, usemask=True, keep=keep) # self.interpolate_strictly_on_vertices(curvec, usemask=True, keep=keep) self.reset_plot() return
[docs] def interpolate_on_triangulations(self, multiple_triangulations:list[Triangulation], interp_method:Literal['matplotlib','scipy'] = 'scipy', keep:Literal['all', 'below', 'above'] | list[str] = 'all'): """ Interpolation sur plusieurs triangulations. :param multiple_triangulations: list of Triangulation objects :param interp_method: method for the interpolation -- 'matplotlib' or 'scipy' :param keep: 'all' to keep all values, 'below' to keep only values below the current array, 'above' to keep only values above the current array or a list of such values for each triangulation """ for idx, curtri in enumerate(multiple_triangulations): if isinstance(keep, list): curkeep = keep[idx] else: curkeep = keep self.interpolate_on_triangulation(curtri.pts, curtri.tri, interp_method=interp_method, keep=curkeep)
[docs] def interpolation2D(self, key:str='1'): """Perform a 2D interpolation over the selected cells using key reference points. :param key: interpolation key identifying the source data or method """ #FIXME : auhtorize interpolation on other keys key = str(key) if key in self.mngselection.selections.keys(): if len(self.mngselection.myselection)>0: curlist = self.mngselection.selections[key]['select'] cursel = self.mngselection.myselection if len(curlist) > 0: ij = [self.get_ij_from_xy(cur[0], cur[1]) for cur in curlist] z = [self.array.data[curij[0], curij[1]] for curij in ij] if cursel == ALL_SELECTED: xall = np.linspace(self.origx + self.dx / 2., self.origx + (float(self.nbx) - .5) * self.dx, self.nbx) yall = np.linspace(self.origy + self.dy / 2., self.origy + (float(self.nby) - .5) * self.dy, self.nby) cursel = [(x, y) for x in xall for y in yall] z = griddata(curlist, z, cursel, fill_value=np.nan) for cur, curz in zip(cursel, z): if not np.isnan(curz): i, j = self.get_ij_from_xy(cur[0], cur[1]) self.array.data[i, j] = curz self.reset_plot()
[docs] def rasterize_vector_valuebyid(self, working_vector: vector, id, method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']="shapely_strict", usemask:bool = True, epsilon:float = 0.0): """ Rasterize a vector using the value of the id :param working_vector: vector to rasterize :param id: attribute id whose value will fill the rasterized cells :param method: spatial method: ``'mpl'``, ``'shapely_strict'``, ``'shapely_wboundary'``, ``'rasterio'`` :param usemask: if ``True``, only rasterize unmasked cells :param epsilon: boundary tolerance """ if not isinstance(working_vector, vector): logging.error(_('working_vector must be a vector. You provided a {}').format(type(working_vector))) return if working_vector.get_value(id) is None: logging.error(_('No value for id {}').format(id)) return val = working_vector.get_value(id) if val is None: logging.error(_('No value for id {}').format(id)) return self.fillin_vector_with_value(working_vector, val, method=method, usemask=usemask, epsilon=epsilon)
[docs] def rasterize_zone_valuebyid(self, working_zone: zone, id, method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']="shapely_strict", usemask:bool = True, epsilon:float = 0.0): """ Rasterize each vector of a zone using the value of the given attribute id. :param working_zone: zone containing vectors to rasterize :param id: attribute id whose value will fill rasterized cells :param method: spatial method for cell selection :param usemask: if ``True``, only rasterize unmasked cells :param epsilon: boundary tolerance """ if not isinstance(working_zone, zone): logging.error(_('working_zone must be a zone. You provided a {}').format(type(working_zone))) return for curvec in working_zone.myvectors: self.rasterize_vector_valuebyid(curvec, id, method, usemask, epsilon)
[docs] def rasterize_zones_valuebyid(self, working_zones: Zones, id, method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']="shapely_strict", usemask:bool = True, epsilon:float = 0.0): """ Rasterize each vector of a :class:`Zones` collection using the value of the given attribute id. :param working_zones: :class:`Zones` containing vectors to rasterize :param id: attribute id whose value will fill rasterized cells :param method: spatial method for cell selection :param usemask: if ``True``, only rasterize unmasked cells :param epsilon: boundary tolerance """ if not isinstance(working_zones, Zones): logging.error(_('working_zone must be a zone. You provided a {}').format(type(working_zones))) return for curzone in working_zones.myzones: self.rasterize_zone_valuebyid(curzone, id, method, usemask, epsilon)
[docs] def fillin_vector_with_value(self, working_vector: vector, value: int | float | str, method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']="shapely_strict", usemask:bool = True, epsilon:float = 0.0): """ Fill in a vector using the provided value :param working_vector: vector :param value: value to fill in (float or int depending on the array type) :param method: method to get points inside the polygon - 'mpl' : matplotlib.path.Path.contains_points (fast for simple polygons) - 'shapely_strict' : shapely contains (strictly inside the polygon) - 'shapely_wboundary' : shapely contains or on the boundary (inside or on the polygon) - 'rasterio' : rasterio.features.rasterize (fast for complex polygons with holes) :param usemask: if True, only fill in the masked points :param epsilon: tolerance for shapely methods (to consider points on the boundary) """ if not isinstance(working_vector, vector): logging.error(_('working_vector must be a vector. You provided a {}').format(type(working_vector))) return try: if self.wolftype in [WOLF_ARRAY_FULL_INTEGER, WOLF_ARRAY_FULL_INTEGER8, WOLF_ARRAY_FULL_INTEGER16]: val = int(float(value)) else: val = float(value) except: logging.error(_('Value {} is not a number').format(value)) return ij = self.get_ij_inside_polygon(working_vector, method = method, usemask = usemask, eps = epsilon) if len(ij) == 0: logging.debug(_('No points to rasterize')) return self.array.data[ij[:, 0], ij[:, 1]] = val
[docs] def fillin_zone_with_value(self, working_zone: zone, value, method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']="shapely_strict", usemask:bool = True, epsilon:float = 0.0): """ Fill all cells inside each vector of a zone with the given value. :param working_zone: zone whose vectors define the fill area :param value: value to assign to cells inside the zone vectors :param method: spatial method for cell selection :param usemask: if ``True``, only fill unmasked cells :param epsilon: boundary tolerance """ if not isinstance(working_zone, zone): logging.error(_('working_zone must be a zone. You provided a {}').format(type(working_zone))) return for curvec in working_zone.myvectors: self.fillin_vector_with_value(curvec, value, method, usemask, epsilon)
[docs] def fillin_zones_with_value(self, working_zones: Zones, value, method:Literal['mpl', 'shapely_strict', 'shapely_wboundary', 'rasterio']="shapely_strict", usemask:bool = True, epsilon:float = 0.0): """ Fill all cells inside each vector of a :class:`Zones` collection with the given value. :param working_zones: :class:`Zones` whose vectors define the fill areas :param value: value to assign to cells inside the zone vectors :param method: spatial method for cell selection :param usemask: if ``True``, only fill unmasked cells :param epsilon: boundary tolerance """ if not isinstance(working_zones, Zones): logging.error(_('working_zones must be a Zones. You provided a {}').format(type(working_zones))) return for curzone in working_zones.myzones: self.fillin_zone_with_value(curzone, value, method, usemask, epsilon)
[docs] def fillin_from_xyz(self, xyz:np.ndarray): """ Remplissage du tableau à partir d'un tableau xyz. :param xyz: Numpy array - shape (n, 3) """ ij = self.get_ij_from_xy(xyz[:, 0], xyz[:, 1]) # filter points outside the array filter = (ij[0] >= 0) & (ij[0] < self.nbx) & (ij[1] >= 0) & (ij[1] < self.nby) xyz = xyz[filter] ij = (ij[0][filter], ij[1][filter]) if self.dtype == np.float32: self.array.data[ij] = np.float32(xyz[:, 2]) elif self.dtype == np.float64: self.array.data[ij] = np.float64(xyz[:, 2]) elif self.dtype == np.int32: self.array.data[ij] = np.int32(xyz[:, 2]) elif self.dtype == np.int16: self.array.data[ij] = np.int16(xyz[:, 2]) elif self.dtype == np.int8: self.array.data[ij] = np.int8(xyz[:, 2]) else: logging.warning(_('Type not supported : ')+str(self.dtype))
[docs] def fillin_from_ijz(self, ijz:np.ndarray): """ Remplissage du tableau à partir d'un tableau ijz :param ijz: Numpy array - shape (n, 3) """ try: i = ijz[:, 0].astype(int) j = ijz[:, 1].astype(int) except Exception as e: logging.error(_('Error in conversion of ijz to int : ')+str(e)) return if self.dtype == np.float32: self.array.data[i, j] = np.float32(ijz[:, 2]) elif self.dtype == np.float64: self.array.data[i, j] = np.float64(ijz[:, 2]) elif self.dtype == np.int32: self.array.data[i, j] = np.int32(ijz[:, 2]) elif self.dtype == np.int16: self.array.data[i, j] = np.int16(ijz[:, 2]) elif self.dtype == np.int8: self.array.data[i, j] = np.int8(ijz[:, 2]) else: logging.warning(_('Type not supported : ')+str(self.dtype))
# ======================================================================== # Filtering & Labelling # ========================================================================
[docs] def filter_inundation(self, epsilon:float = None, mask:np.ndarray = None): """ Apply filter on array : - mask data below eps - mask data outisde linkedvec :param epsilon: value under which data are masked :param mask: mask to apply if eps is None If all params are None, the function will mask NaN values """ if epsilon is not None: self.array[np.where(self.array<epsilon)] = self.nullvalue elif mask is not None: self.array.data[mask] = self.nullvalue idx_nan = np.where(np.isnan(self.array)) if len(idx_nan[0])>0: self.array[idx_nan] = self.nullvalue self.array.mask[idx_nan] = True logging.warning(_('NaN values found in the array')) if len(idx_nan[0])<10: for i,j in zip(idx_nan[0],idx_nan[1]): logging.warning(f'NaN at {i+1},{j+1} -- 1-based') else: for i,j in zip(idx_nan[0][:10],idx_nan[1][:10]): logging.warning(f'NaN at {i+1},{j+1} -- 1-based') logging.warning(f'... and {len(idx_nan[0])-10} more') self.mask_data(self.nullvalue) if self.linkedvec is not None: self.mask_outsidepoly(self.linkedvec) self.reset_plot()
[docs] def filter_independent_zones(self, n_largest:int = 1, reset_plot:bool = True): """ Label independent connected regions and keep only the *n* largest. :param n_largest: number of largest zones to keep :param reset_plot: if ``True``, reset the plot after filtering """ # labellisation labeled_array = self.array.data.copy() labeled_array[np.where(self.array.mask)] = 0 labeled_array, num_features = label(labeled_array) # convertion en masked array labeled_array = ma.asarray(labeled_array) labeled_array.mask = np.zeros(labeled_array.shape, dtype=bool) # application du masque labeled_array.mask[:,:] = self.array.mask[:,:] longueur = [] labeled_array[labeled_array.mask] = 0 longueur = list(sum_labels(np.ones(labeled_array.shape, dtype=np.int32), labeled_array, range(1, num_features+1))) longueur = [[longueur[j], j+1] for j in range(0, num_features)] # longueur = [[np.sum(labeled_array[labeled_array == j]) // j, j] for j in range(1, num_features+1)] longueur.sort(key=lambda x: x[0], reverse=True) self.array.mask[:,:] = True for j in range(0, n_largest): self.array.mask[labeled_array == longueur[j][1]] = False self.set_nullvalue_in_mask() if reset_plot: self.reset_plot()
[docs] def filter_zone(self, set_null:bool = False, reset_plot:bool = True): """ Keep only the labeled regions that contain at least one selected cell. mailles sont sélectionnées :param set_null: if ``True``, set filtered-out cells to :attr:`nullvalue` :param reset_plot: if ``True``, reset the plot after filtering """ if self.SelectionData.nb == 0: logging.info(_('No selection -- no filtering')) return if self.SelectionData.myselection == ALL_SELECTED: logging.info(_('All nodes selected -- no filtering')) return # labellisation labeled_array = self.array.data.copy() labeled_array[np.where(self.array.mask)] = 0 labeled_array, num_features = label(labeled_array) # récupération des zones utiles vals_ij = [self.get_ij_from_xy(cur[0], cur[1]) for cur in self.SelectionData.myselection] vals = list(set([labeled_array[int(cur[0]), int(cur[1])] for cur in vals_ij])) self.array.mask[:,:] = True for j in vals: self.array.mask[labeled_array == j] = False if set_null: self.set_nullvalue_in_mask() if reset_plot: self.reset_plot()
[docs] def clean_small_patches(self, min_size:int = 1, set_null:bool = False, reset_plot:bool = True): """Clean small patches in the array :param min_size: minimum patch size in cells; smaller patches are removed :param set_null: if ``True``, set removed cells to :attr:`nullvalue` :param reset_plot: if ``True``, reset the plot after cleaning """ if min_size < 1: logging.error(_('Minimum size must be greater than 1')) return # labellisation labeled_array = self.array.data.copy() labeled_array[np.where(self.array.mask)] = 0 labeled_array, num_features = label(labeled_array,) # count the number of elements in each patch patch_size = np.bincount(labeled_array.ravel()) # mask the small patches mask_small_patches = patch_size < min_size labeled_array[mask_small_patches[labeled_array]] = 0 # mask data in the array based on the labeled array self.array.mask[labeled_array == 0] = True if set_null: self.set_nullvalue_in_mask() if reset_plot: self.reset_plot()
[docs] def labelling(self, reset_plot:bool = True): """ Labelling of the array using Scipy :param reset_plot: if ``True``, reset the plot after labelling """ # labellisation labeled_array = self.array.data.copy() labeled_array[np.where(self.array.mask)] = 0 labeled_array, num_features = label(labeled_array) self.array.data[:,:] = labeled_array[:,:].astype(self.dtype) if reset_plot: self.reset_plot()
# ======================================================================== # Statistics & Analysis # ========================================================================
[docs] def statistics(self, inside_polygon:vector | Polygon | np.ndarray = None) -> dict[str, float | np.ndarray]: """ Statistics on Selected data or the whole array if no selection :param inside_polygon: vector or Polygon to select data inside the polygon :return: mean, std, median, sum, volume (sum*dx*dy), values :rtype: dict[str, float | np.ndarray] Translated Keys are: - _('Mean'): mean value of the selected data - _('Std'): standard deviation of the selected data - _('Median'): median value of the selected data - _('Sum'): sum of the selected data - _('Volume'): volume of the selected data (sum * dx * dy) - _('Values'): values of the selected data as a numpy array - _('Min'): minimum value of the selected data - _('Max'): maximum value of the selected data """ if inside_polygon is not None: if isinstance(inside_polygon, vector | Polygon): ij = self.get_ij_inside_polygon(inside_polygon) elif isinstance(inside_polygon, np.ndarray): if inside_polygon.ndim == 2 and inside_polygon.shape[1] == 2: ij = self.get_ij_from_xy_array(inside_polygon) else: logging.error(_('Invalid shape for inside_polygon numpy array')) return {} vals = self.array[ij[:,0], ij[:,1]] elif self.SelectionData.nb == 0 or self.SelectionData.myselection == ALL_SELECTED: logging.info(_('No selection -- statistics on the whole array')) vals = self.array[~self.array.mask].ravel().data # all values else: vals = self.SelectionData.get_values_sel() mean = np.ma.mean(vals) std = np.ma.std(vals) median = np.ma.median(vals) sum = np.ma.sum(vals) vol = sum * self.dx * self.dy min_val = np.ma.min(vals) max_val = np.ma.max(vals) return {_('Mean'): mean, _('Std'): std, _('Median'): median, _('Sum'): sum, _('Volume'): vol, _('Values'): vals, _('Min'): min_val, _('Max'): max_val}
[docs] def get_unique_values(self): """ Return unique values in the array """ unique = np.ma.unique(self.array) while unique[-1] is np.ma.masked and len(unique) > 1: unique = unique[:-1] return unique
[docs] def map_values(self, keys_vals:dict, default:float=None): """ Mapping array values to new values defined by a dictionnary. First, check if all values are in keys_vals. If not, set to default. If default is None, set to nullvalue. :param keys_vals: dictionary of values to map :param default: default value if key not found """ vals = self.get_unique_values() def_keys = [] for val in vals: if val not in keys_vals: logging.warning(_(f"Value {val} not in keys_vals -- Will be set to default or NullValue")) def_keys.append(val) continue for key, val in keys_vals.items(): self.array.data[self.array.data == key] = val if default is None: default = self.nullvalue for key in def_keys: self.array.data[self.array.data == key] = default self.mask_data(self.nullvalue) self.reset_plot()
# ======================================================================== # Linked Arrays & Palette # ========================================================================
[docs] def add_crosslinked_array(self, newlink:"WolfArrayModel"): """Add a bidirectional cross-link between this array and another. :param newlink: :class:`WolfArrayModel` to cross-link with this array """ self.linkedarrays.append(newlink) newlink.linkedarrays.append(self)
[docs] def share_palette(self): """ Share the color palette object with all cross-linked arrays. """ for cur in self.linkedarrays: if id(cur.mypal) != id(self.mypal): cur.mypal = self.mypal
# ======================================================================== # Array Manipulation (Crop, Paste, Extend, ...) # ========================================================================
[docs] def paste_all(self, fromarray:"WolfArrayModel", mask_after:bool=True): """Paste the data from another array into this one where geometries overlap. :param fromarray: source :class:`WolfArrayModel` from which to paste values :param mask_after: if ``True``, mask cells equal to :attr:`nullvalue` after pasting """ fromarray: WolfArrayModel # Récupération des bornes de la matrice source dans la matrice de destination i1, j1 = self.get_ij_from_xy(fromarray.origx, fromarray.origy) i2, j2 = self.get_ij_from_xy(fromarray.origx + fromarray.nbx * fromarray.dx, fromarray.origy + fromarray.nby * fromarray.dy) # Limitation des bornes à la matrice de destination i1 = max(0, i1) j1 = max(0, j1) i2 = min(self.nbx, i2) j2 = min(self.nby, j2) # Conversion des bornes utiles en coordonnées x1, y1 = self.get_xy_from_ij(i1, j1) x2, y2 = self.get_xy_from_ij(i2, j2) # Récupération des bornes utiles dans la matrice source i3, j3 = fromarray.get_ij_from_xy(x1, y1) i4, j4 = fromarray.get_ij_from_xy(x2, y2) # Sélection des valeurs non masquées # Attention : le résultat est en indices relatifs à [i3,j3] --> demande une conversion en indices absolus pour retrouver les valeurs dans la matrice complète usefulij = np.where(np.logical_not(fromarray.array.mask[i3:i4, j3:j4])) i5, j5 = self.get_ij_from_xy(x1, y1) # Décalage des indices pour la matrice de destination usefulij_dest = (usefulij[0] + i5, usefulij[1] + j5) usefulij[0][:] += i3 usefulij[1][:] += j3 self.array.data[usefulij_dest] = fromarray.array.data[usefulij] if mask_after: self.mask_data(self.nullvalue) self.reset_plot()
[docs] def set_values_sel(self, xy:list[float], z:list[float], update:bool=True): """ Set values at the selected positions :param xy: [[x1,y1],[x2,y2],...] :param z: [z1,z2,...] :param update: update the plot """ sel = np.asarray(xy) z = np.asarray(z) if len(sel) == 1: ijall = self.xy2ij_np(sel) i, j = ijall[0, 0], ijall[0, 1] if i < 0 or i >= self.nbx or j < 0 or j >= self.nby: logging.error(_('Selected point is out of bounds')) return else: self.array[i, j] = z else: # ijall = np.asarray(self.get_ij_from_xy(sel[:, 0], sel[:, 1])).transpose() ijall = self.xy2ij_np(sel) useful = np.where((ijall[:, 0] >= 0) & (ijall[:, 0] < self.nbx) & (ijall[:, 1] >= 0) & (ijall[:, 1] < self.nby)) self.array[ijall[useful, 0], ijall[useful, 1]] = z[useful] self.mask_data(self.nullvalue) if update: self.reset_plot()
[docs] def crop_array(self, bbox:list[list[float],list[float]], setnull_trx_try:bool = False) -> "WolfArrayModel": """ Crop the data based on the bounding box. Beware of the grid: - If the cropped region is smaller than a cell, then the cropped array will be empty. - If the cropped region doesn't align with the source array grid, it will be forced to do so. :param bbox: bounding box [[xmin, xmax], [ymin, ymax]]. :param setnull_trx_try: set the translation to 0 if True, origx and origy will be set to the lower left corner of the bbox. Default is `False`. """ xmin, xmax = bbox[0] ymin, ymax = bbox[1] # Make sure the bounding box can be used. if xmin > xmax: xmin, xmax = xmax, xmin if ymin > ymax: ymin, ymax = ymax, ymin imin, jmin = self.get_ij_from_xy(xmin, ymin) imax, jmax = self.get_ij_from_xy(xmax, ymax) imin = int(imin) jmin = int(jmin) imax = int(imax) jmax = int(jmax) newheader = header_wolf() newheader.nbx = imax-imin newheader.nby = jmax-jmin newheader.dx = self.dx newheader.dy = self.dy newheader.origx, newheader.origy = self.get_xy_from_ij(imin, jmin, abs = setnull_trx_try) newheader.origx -= self.dx / 2. newheader.origy -= self.dy / 2. newheader.translx = self.translx newheader.transly = self.transly if setnull_trx_try: newheader.translx = 0. newheader.transly = 0. newarray = type(self)(srcheader=newheader) if imin < imax and jmin < jmax: newarray.array[:,:] = self.array[imin:imax, jmin:jmax] else: # One of the dimensions has 0 size => the array is empty pass return newarray
[docs] def crop_masked_at_edges(self): """ Crop the array to remove masked cells at the edges of the array :return: cropped array, WolfArrayModel instance """ # Get max indexes Existing_indexes = np.argwhere(self.array.mask!=True) Max_index = np.max(Existing_indexes, 0) Min_index = np.min(Existing_indexes, 0) # convert index in location xMax, yMax = self.convert_ij2xy_np(Max_index.reshape((1,2))) xMin, yMin = self.convert_ij2xy_np(Min_index.reshape((1,2))) # crop nbx=np.ceil((xMax[0]-xMin[0])/self.dx).astype(int)+1 #+1 otherwise you remove one line nby=np.ceil((yMax[0]-yMin[0])/self.dy).astype(int)+1 #+1 otherwise you remove one column return self.crop(int(Min_index[0]),int(Min_index[1]),int(nbx),int(nby))
[docs] def crop(self, i_start:int, j_start:int, nbx:int, nby:int, k_start:int=1, nbz:int=1): """ Crop the array :param i_start: start index in x :param j_start: start index in y :param nbx: number of cells in x :param nby: number of cells in y :param k_start: start index in z :param nbz: number of cells in z :return: cropped array, WolfArrayModel instance """ assert type(i_start) == int, "i_start must be an integer" assert type(j_start) == int, "j_start must be an integer" assert type(nbx) == int, "nbx must be an integer" assert type(nby) == int, "nby must be an integer" assert type(k_start) == int, "k_start must be an integer" assert type(nbz) == int, "nbz must be an integer" newWolfArray = type(self)(nullvalue=self.nullvalue) newWolfArray.nbx = nbx newWolfArray.nby = nby newWolfArray.dx = self.dx newWolfArray.dy = self.dy newWolfArray.origx = self.origx + float(i_start) * self.dx newWolfArray.origy = self.origy + float(j_start) * self.dy newWolfArray.translx = self.translx newWolfArray.transly = self.transly if self.nbdims == 3: newWolfArray.nbz = nbz newWolfArray.dz = self.dz newWolfArray.origz = self.origz + float(k_start) * self.dz newWolfArray.translz = self.translz newWolfArray.array = self.array[i_start:i_start + nbx, j_start:j_start + nby, k_start:k_start + nbz].copy() elif self.nbdims == 2: newWolfArray.array = self.array[i_start:i_start + nbx, j_start:j_start + nby].copy() return newWolfArray
[docs] def extend(self, x_ext:int, y_ext:int): """ Extend the array Crop is the opposite :param x_ext: number of cells to add/remove in the x direction (positive = extend, negative = shrink) :param y_ext: number of cells to add/remove in the y direction (positive = extend, negative = shrink) """ assert x_ext >= 0 and y_ext >= 0 assert self.nbdims == 2, "Only 2D arrays are supported" # Remember WolfArrays are masked. Therefore # we need to extend mask. In this case, not specifying # anything will expand the mask with "dont mask" # values. # extend vertically ex = self.array if x_ext > 0: # dtype is important: it allows to keep a Fortran friendly # type I think. ex = ma.append( ex, np.array([0] * ex.shape[1] * x_ext, dtype=ex.dtype).reshape((x_ext, -1)), axis=0, ) self.nbx += x_ext # extend horizontally if y_ext > 0: ex = ma.append( ex, np.array([0] * ex.shape[0] * y_ext, dtype=ex.dtype).reshape((-1, y_ext)), axis=1, ) self.nby += y_ext self.array = ex self.mask_data(self.nullvalue)
[docs] def extremum(self, which:Literal['min','max']='min'): """Return the extremum value :param which: which extremum to find: ``'min'`` or ``'max'`` """ if which == 'min': my_extr = np.amin(self.array) elif which == 'max': my_extr = np.amax(self.array) else: logging.warning(_('Extremum not supported : ')+which) my_extr = -99999. return my_extr
[docs] def nullify_border(self, width:int = 1): """ Set border to nullvalue :param width: number of border cells to set to :attr:`nullvalue` """ self.array.data[:width,:] = self.nullvalue self.array.data[-width:,:] = self.nullvalue self.array.data[:,:width] = self.nullvalue self.array.data[:,-width:] = self.nullvalue self.array.mask[:width,:] = True self.array.mask[-width:,:] = True self.array.mask[:,:width] = True self.array.mask[:,-width:] = True
[docs] def convolve(self, filter, method:Literal['scipyfft', 'jaxfft', 'classic']='scipyfft', inplace:bool=True) -> "WolfArrayModel": """ Convolve the array with a filter. The array and the filter should have the same dtype. If not, a warning is issued and the array dtype is used. Thus: - An int8 array convolved with float32 filter will be converted to int8. - A float32 array convolved with int8 filter will be converted to float32. **User should be careful with the dtype of both the array AND the filter.** :param filter: filter to convolve with :param method: method to use for convolution ('scipyfft', 'jaxfft', 'classic') :param inplace: if True, the array is modified in place, otherwise a new array is returned -- same type as the original one :return: convolved array, WolfArrayModel instance """ if isinstance(filter, WolfArrayModel): _filter = filter.array.data.copy() _filter[filter.array.mask] = 0.0 elif isinstance(filter, np.ndarray): _filter = filter.copy() _filter[np.isnan(_filter)] = 0.0 else: logging.error("Filter must be a WolfArrayModel or a numpy array") return None if self.dtype != _filter.dtype: logging.warning(_("Filter and array should have the same dtype - {} - {}").format(self.dtype, _filter.dtype)) logging.warning(_("Convolution result will have the dtype - {}").format(self.dtype)) if method == 'classic': from scipy.signal import convolve2d convolved = convolve2d(self.array.data, _filter, mode='same', fillvalue=0.0) elif method == 'scipyfft': from scipy.signal import fftconvolve convolved = fftconvolve(self.array.data, _filter, mode='same') elif method == 'jaxfft': import jax.numpy as jnp from jax.scipy.signal import fftconvolve as jax_fftconvolve jax_filter = jnp.array(_filter) jax_array = jnp.array(self.array.data) convolved = jax_fftconvolve(jax_array, jax_filter, mode='same') convolved = jnp.asarray(convolved) else: logging.error("Method not supported") return None if inplace: self.array.data[:,:] = convolved return self else: newWolfArray = type(self)(mold=self, whichtype=self.wolftype) newWolfArray.array.data[:,:] = convolved newWolfArray.array.mask[:,:] = self.array.mask[:,:] newWolfArray.count() return newWolfArray
[docs] def concatenate(self, list_arr:list["WolfArrayModel"], nullvalue:float = 0.): """ Concatenate the values from another WolfArrays into a new one :param list_arr: list of WolfArrayModel objects :param nullvalue: null value for the output array :return: a new WolfArrayModel :return_type: WolfArrayModel """ list_arr:list[WolfArrayModel] for curarray in list_arr: assert isinstance(curarray, WolfArrayModel), "The list must contain WolfArrayModel objects" assert curarray.nbdims == self.nbdims, "The arrays must have the same number of dimensions" assert curarray.dx == self.dx and curarray.dy == self.dy, "The arrays must have the same dx and dy" assert curarray.translx == 0 and curarray.transly == 0, "The translations must be zero" assert (np.abs(curarray.origx-self.origx)%int(self.dx) == 0)and(np.abs(curarray.origy-self.origy)%int(self.dy) == 0), "The origins are not compatible! You need to do some interpolation stuff" assert self.translx == 0 and self.transly == 0, "The translations must be zero" assert self.wolftype == curarray.wolftype, "The arrays must have the same wolftype" # create an array newArray = type(self)(nullvalue=nullvalue, whichtype=self.wolftype) Xlim,Ylim = self.find_union(list_arr) newArray.origx = Xlim[0] newArray.origy = Ylim[0] newArray.dx = self.dx newArray.dy = self.dy newArray.nbx = int(np.diff(Xlim)[0]/newArray.dx) newArray.nby = int(np.diff(Ylim)[0]/newArray.dy) newArray.translx = 0. newArray.transly = 0. newArray.array = np.ma.masked_array(np.ones((newArray.nbx, newArray.nby), dtype=self.dtype) * nullvalue, mask=True, dtype=self.dtype) newArray.paste_all(self, mask_after=False) for curarray in list_arr: Array_intersect = curarray.find_intersection(self, ij=True) if Array_intersect is not None: logging.info(_("There is intersection. By default, the array {} overlaps the first one.".format(curarray.filename))) newArray.paste_all(curarray, mask_after=False) newArray.mask_data(nullvalue) return newArray
[docs] def as_WolfArray(self, abs:bool=True) -> "WolfArrayModel": """ Return a WolfArrayModel object from this WolfArrayModel :param abs: if ``True``, use absolute coordinates """ NewArray = type(self)(mold=self) if abs: NewArray.origx += self.translx NewArray.origy += self.transly NewArray.translx = 0. NewArray.transly = 0. return NewArray
# ======================================================================== # Volume & Surface Estimation # ========================================================================
[docs] def volume_estimation(self, zmax:float = None, nb:int = 10, labeled:bool = False, use_memory:bool = False, memory_key:str = None, output_fn:str = None, axs:plt.Axes = None): """Estimation of the volume of the selected zone. :param zmax: maximum elevation (if None, uses array max) :param nb: number of elevation steps :param labeled: if True, use labeling to identify connected areas :param use_memory: if True, use memory selection to identify areas :param memory_key: key of the memory selection to use (if use_memory) :param output_fn: filename for zoning result (if None, no file is written) :param axs: matplotlib Axes to plot the results :return: (fig, axs) or None if cancelled """ vect = self.array[np.logical_not(self.array.mask)].flatten() zmin = np.amin(vect) if zmax is None: zmax = np.amax(vect) deltaz = (zmax - zmin) / nb curz = zmin labeled_areas = [] stockage = [] z = [] keys_select = list(self.SelectionData.selections.keys()) if use_memory: if memory_key is not None: key = memory_key elif len(keys_select) == 1: key = keys_select[0] elif len(keys_select) > 1: key = keys_select[0] logging.warning(_('Multiple memory selections available, using first one: %s') % key) else: logging.error(_('No memory selection available')) return xy_key = self.SelectionData.selections[key]['select'] if len(xy_key) == 0: logging.error(_('Empty selection')) return ij_key = self.xy2ij_np(xy_key) extensionmax = type(self)(mold=self) extensionmax.array[:, :] = 0. if labeled: for i in tqdm(range(nb + 1)): z.append(curz) # Compute difference between the array and the current elevation if i == 0: diff = self.array - (curz + 1.e-3) else: diff = self.array - curz # Keep only the negative values diff[diff > 0] = 0. diff.data[diff.mask] = 0. if np.count_nonzero(diff < 0.) > 1: # More than one cell below the elevation # Labeling of the cells below the elevation labeled_array, num_features = label(diff.data) # Applying the same mask as the original array labeled_array = ma.asarray(labeled_array) labeled_array.mask = self.array.mask if use_memory: # Use only the labeled areas containing the cells stored in memory selection labeled_areas = [] for curij in ij_key: labeled_areas.append(labeled_array[curij[0], curij[1]]) # Remove masked value labeled_areas = [x for x in labeled_areas if x is not ma.masked] # Remove duplicates labeled_areas = list(set(labeled_areas)) for curarea in labeled_areas: if curarea ==0: volume = 0. surface = 0. continue # Search mask = labeled_array == curarea area = np.where(mask) volume = -self.dx * self.dy * np.sum(diff[area]) surface = self.dx * self.dy * len(area[0]) extensionmax.array[np.logical_and(mask, extensionmax.array[:, :] == 0.)] = float(i + 1) else: labeled_areas = list(sum_labels(np.ones(labeled_array.shape, dtype=np.int32), labeled_array, range(1, num_features+1))) labeled_areas = [[x, y] for x, y in zip(labeled_areas, range(1, num_features+1))] labeled_areas.sort(key=lambda x: x[0], reverse=True) jmax = labeled_areas[0][1] nbmax = labeled_areas[0][0] volume = -self.dx * self.dy * np.sum(diff[labeled_array == jmax]) surface = self.dx * self.dy * nbmax extensionmax.array[np.logical_and(labeled_array == jmax, extensionmax.array[:, :] == 0.)] = float(i + 1) else: # Only one cell below the elevation volume = -self.dx * self.dy * np.sum(diff) surface = self.dx * self.dy * np.count_nonzero(diff<0.) extensionmax.array[np.logical_and(diff[:,:]<0., extensionmax.array[:, :] == 0.)] = float(i + 1) stockage.append([volume, surface]) curz += deltaz else: for i in tqdm(range(nb + 1)): z.append(curz) if i == 0: diff = self.array - (curz + 1.e-3) else: diff = self.array - curz diff[diff > 0] = 0. diff.data[diff.mask] = 0. volume = -self.dx * self.dy * np.sum(diff) surface = self.dx * self.dy * np.count_nonzero(diff<0.) stockage.append([volume, surface]) curz += deltaz extensionmax.array[np.logical_and(diff[:,:]<0., extensionmax.array[:, :] == 0.)] = float(i + 1) if output_fn is None: output_fn = self._prompt_save_file( _('Choose filename for zoning result'), 'bin (*.bin)|*.bin|tif (*.tif)|*.tif|All (*.*)|*.*') if output_fn is not None: extensionmax.filename = output_fn extensionmax.write_all() if axs is None: fig, axs = plt.subplots(1, 2, tight_layout=True) else: fig = axs[0].get_figure() axs[0].plot(z, [x[0] for x in stockage]) axs[0].scatter(z, [x[0] for x in stockage]) axs[0].set_xlabel(_("Elevation [m]"), size=15) axs[0].set_ylabel(_("Volume [$m^3$]"), size=15) axs[1].step(z, [x[1] for x in stockage], where='post') axs[1].scatter(z, [x[1] for x in stockage]) axs[1].set_xlabel(_("Elevation [m]"), size=15) axs[1].set_ylabel(_("Surface [$m^2$]"), size=15) fig.suptitle(_("Retention capacity of the selected zone"), fontsize=20) if output_fn is not None: with open(output_fn[:-4] + '_hvs.txt', 'w') as f: f.write('H [m]\tZ [m DNG]\tVolume [$m^3$]\tSurface [$m^2$]\n') for curz, (curv, curs) in zip(z, stockage): f.write('{}\t{}\t{}\t{}\n'.format(curz - zmin, curz, curv, curs)) return fig, axs
[docs] def surface_volume_estimation_from_elevation(self, axs:plt.Axes= None, desired_zmin:float = None, desired_zmax:float = None, nb:int = 100, method:Literal['all below', 'largest area', 'selected'] = 'largest area', selected_cells:list[tuple[float, float]] = None, dirout:str | Path = None, invert_z:bool = False, array_to_integrate:"WolfArrayModel" = None ): """ Estimation of the surface and volume from an elevation array. :param axs: Axes to plot the results :param desired_zmin: Minimum elevation to consider :param desired_zmax: Maximum elevation to consider :param nb: Number of elevation steps :param method: Method to use for the estimation ('all below', 'largest area', 'selected') :param selected_cells: List of kernel cells (if method is 'selected') -- Consider only the areas containing these cells :param dirout: Directory to save the results :param invert_z: if ``True``, invert the elevation axis (useful for bathymetry) :param array_to_integrate: optional array whose values are integrated instead of pure surface area :return: Figure and Axes with the results :rtype: tuple[plt.Figure, plt.Axes] """ assert method in ['all below', 'largest area', 'selected'], _('Method must be one of "all below", "largest area", or "selected"') if array_to_integrate is not None: assert isinstance(array_to_integrate, WolfArrayModel), _('array_to_integrate must be a WolfArrayModel instance') assert array_to_integrate.nbx == self.nbx and array_to_integrate.nby == self.nby, _('array_to_integrate must have the same dimensions as the current WolfArrayModel') vect = self.array[np.logical_not(self.array.mask)].flatten() zmin = np.amin(vect) zmax = np.amax(vect) if desired_zmin is not None: zmin = desired_zmin if desired_zmax is not None: zmax = desired_zmax deltaz = (zmax - zmin) / nb curz = zmin labeled_areas = [] stockage = [] z = [] if method == 'all below': labeled = False else: labeled = True if method == 'largest area': use_memory = False else: xy_key = np.asarray(selected_cells, dtype=np.float64) if xy_key is None: logging.error(_('Empty selection')) return if len(xy_key) == 0: logging.error(_('Empty selection')) return ij_key = self.xy2ij_np(xy_key) use_memory = True extensionmax = type(self)(mold=self) extensionmax.array[:, :] = 0. if labeled: for i in tqdm(range(nb + 1)): z.append(curz) # Compute difference between the array and the current elevation if i == 0: diff = self.array - (curz + 1.e-3) else: diff = self.array - curz # Keep only the negative values diff[diff > 0] = 0. diff.data[diff.mask] = 0. if np.count_nonzero(diff < 0.) > 1: # More than one cell below the elevation # Labeling of the cells below the elevation labeled_array, num_features = label(diff.data) # Applying the same mask as the original array labeled_array = ma.asarray(labeled_array) labeled_array.mask = self.array.mask if use_memory: # Use only the labeled areas containing the cells stored in selected_cells labeled_areas = [] for curij in ij_key: labeled_areas.append(labeled_array[curij[0], curij[1]]) # Remove masked value labeled_areas = [x for x in labeled_areas if x is not ma.masked] # Remove duplicates labeled_areas = list(set(labeled_areas)) for curarea in labeled_areas: if curarea == 0: volume = 0. surface = 0. continue # Search mask = labeled_array == curarea area = np.argwhere(mask) if array_to_integrate is None: volume = -self.dx * self.dy * np.ma.sum(diff[area]) else: volume = self.dx * self.dy * np.ma.sum(array_to_integrate.array[area[:,0], area[:,1]]) surface = self.dx * self.dy * area.shape[0] extensionmax.array[np.logical_and(mask, extensionmax.array == 0.)] = float(i + 1) else: labeled_areas = list(sum_labels(np.ones(labeled_array.shape, dtype=np.int32), labeled_array, range(1, num_features+1))) labeled_areas = [[x, y] for x, y in zip(labeled_areas, range(1, num_features+1))] labeled_areas.sort(key=lambda x: x[0], reverse=True) jmax = labeled_areas[0][1] nbmax = labeled_areas[0][0] if array_to_integrate is None: volume = -self.dx * self.dy * np.ma.sum(diff[labeled_array == jmax]) else: volume = self.dx * self.dy * np.ma.sum(array_to_integrate.array[labeled_array == jmax]) surface = self.dx * self.dy * nbmax extensionmax.array[np.logical_and(labeled_array == jmax, extensionmax.array == 0.)] = float(i + 1) else: # Only one cell below the elevation if array_to_integrate is None: volume = -self.dx * self.dy * np.ma.sum(diff) else: volume = self.dx * self.dy * np.ma.sum(array_to_integrate.array[diff < 0.]) surface = self.dx * self.dy * np.count_nonzero(diff<0.) extensionmax.array[np.logical_and(diff[:,:]<0., extensionmax.array[:, :] == 0.)] = float(i + 1) stockage.append([abs(volume), abs(surface)]) curz += deltaz else: for i in tqdm(range(nb + 1)): z.append(curz) if i == 0: diff = self.array - (curz + 1.e-3) else: diff = self.array - curz diff[diff > 0] = 0. diff.data[diff.mask] = 0. if array_to_integrate is None: volume = -self.dx * self.dy * np.ma.sum(diff) surface = self.dx * self.dy * np.count_nonzero(diff<0.) stockage.append([abs(volume), abs(surface)]) else: ij = np.argwhere(diff < 0.) volume = self.dx * self.dy * np.ma.sum(array_to_integrate.array[ij[:,0], ij[:,1]]) surface = self.dx * self.dy * ij.shape[0] stockage.append([volume, abs(surface)]) curz += deltaz extensionmax.array[np.logical_and(diff[:,:]<0., extensionmax.array[:, :] == 0.)] = float(i + 1) if dirout is None: dirout = Path(self.filename).parent / 'surface_volume' if isinstance(dirout, str): dirout = Path(dirout) if not dirout.exists(): dirout.mkdir(parents=True, exist_ok=True) extensionmax.filename = str(dirout / 'surface_volume_extension.tif') extensionmax.write_all() if axs is None: fig, axs = plt.subplots(1, 2, tight_layout=True) else: fig = axs[0].get_figure() if invert_z: z = [abs(zmin - x) for x in z] labelx = _("Water depth [m]") else: labelx = _("Elevation [m]") axs[0].plot(z, [x[0] for x in stockage]) axs[0].scatter(z, [x[0] for x in stockage]) axs[0].set_xlabel(labelx, size=10) axs[0].set_ylabel(_("Volume [$m^3$]"), size=10) axs[1].step(z, [x[1] for x in stockage], where='post') axs[1].scatter(z, [x[1] for x in stockage]) axs[1].set_xlabel(labelx, size=10) axs[1].set_ylabel(_("Surface [$m^2$]"), size=10) fig.suptitle(_("Retention capacity"), fontsize=12) fig.savefig(dirout / 'surface_volume.png', dpi=300) fn = dirout / 'surface_volume_hvs.txt' with open(fn, 'w') as f: f.write('H [m]\tZ [m DNG]\tVolume [$m^3$]\tSurface [$m^2$]\n') for curz, (curv, curs) in zip(z, stockage): f.write('{}\t{}\t{}\t{}\n'.format(curz - zmin, curz, curv, curs)) return fig, axs
[docs] def surface_volume_estimation_from_waterdepth(self, axs:plt.Axes=None, desired_zmin:float = None, desired_zmax:float = None, nb:int = 100, method:Literal['all below', 'largest area', 'selected'] = 'largest', selected_cells:list[tuple[float, float]] = None, dirout:str | Path = None): """ Estimation of the surface and volume from a water depth array. :param axs: Axes to plot the results :param desired_zmin: Minimum water depth to consider :param desired_zmax: Maximum water depth to consider :param nb: Number of water depth steps :param method: Method to use for the estimation ('all below', 'largest area', 'selected') :param selected_cells: List of kernel cells (if method is 'selected') -- Consider only the areas containing these cells :param dirout: Directory to save the results :return: Figure and Axes with the results :rtype: tuple[plt.Figure, plt.Axes] """ neg_array = type(self)(mold=self) neg_array.array[:,:] = -self.array return neg_array.surface_volume_estimation_from_elevation(desired_zmin=-desired_zmax if desired_zmax is not None else None, desired_zmax=-desired_zmin if desired_zmin is not None else None, nb=nb, method=method, selected_cells=selected_cells, dirout=dirout if dirout is not None else Path(self.filename).parent / 'surface_volume', axs=axs, invert_z=True)
# ======================================================================== # Gradient, Hillshade & Contours # ========================================================================
[docs] def hillshade(self, azimuth:float, angle_altitude:float): """Create a hillshade array -- see "hillshade" function accelerated by JIT :param azimuth: sun azimuth angle in degrees (default: 315°) :param angle_altitude: sun altitude angle in degrees above the horizon """ if self.shaded is None: logging.error(_('No shaded array')) return self.shaded.set_header(self.get_header()) self.shaded.array = hillshade(self.array.data, azimuth, angle_altitude) self.shaded.delete_lists()
[docs] def get_gradient_norm(self): """ Compute and return the norm of the gradient """ mygradient = type(self)(mold=self) x, y = np.gradient(self.array, self.dx, self.dy) mygradient.array = ma.asarray(np.pi / 2. - np.arctan(np.sqrt(x * x + y * y))) mygradient.array.mask = self.array.mask return mygradient
[docs] def get_laplace(self): """ Compute and return the laplacian """ mylap = type(self)(mold=self) mylap.array = ma.asarray(laplace(self.array) / self.dx ** 2.) mylap.array.mask = self.array.mask return mylap
[docs] def suxsuy_contour(self, filename:str='', abs:bool=False, one_vec_if_ml:bool = True) -> tuple[list[int,int], list[int,int], vector | zone, bool]: """ The borders are computed on basis of the current *mask* :param filename: if provided, write 'sux', 'sux' and 'xy' files :param abs: add translation coordinates (Global World Coordinates) :param one_vec_if_ml: if ``True``, merge multi-level contour segments into a single vector :return: indicesX, indicesY, contourgen, interior indicesX : list of coupled indices along X - vertical border - 1-based like Fortran indicesY : list of coupled indices along Y - horizontal border - 1-based like Fortran contourgen : external contour interior : if False, contour is unique ; if True, interior contours exist -> interior parts are merged """ # Calcul des bords libres SUX, SUY indicesX=[] indicesY=[] locls=[] dx = self.dx dy = self.dy translx = self.origx transly = self.origy if abs: translx += self.translx transly += self.transly horiz = np.where(self.array.mask[0:self.nbx-1,0:self.nby-1] ^ self.array.mask[1:self.nbx,0:self.nby-1]) vert = np.where(self.array.mask[0:self.nbx-1,0:self.nby-1] ^ self.array.mask[0:self.nbx-1,1:self.nby]) for i,j in zip(horiz[0],horiz[1]): x1 = float(i+1) * dx + translx y1 = float(j+1) * dy + transly indicesX.append([i+2, j+1]) locls.append(LineString([[x1,y1-dy],[x1,y1]])) for i,j in zip(vert[0],vert[1]): x1 = float(i+1) * dx + translx y1 = float(j+1) * dy + transly indicesY.append([i+1, j+2]) locls.append(LineString([[x1-dx,y1],[x1,y1]])) if not locls: raise Exception(_("I can't detect any contour. Is this right ?")) interior=False # generate contour from partial linestring --> using Shapely to do that ! contour = linemerge(locls) if contour.geom_type == 'LineString': # All is fine - only one vector xy = np.asarray(contour.coords) nb = len(xy) contourgen = vector(name='external border') for x,y in xy: contourgen.add_vertex(wolfvertex(x,y)) elif contour.geom_type == 'MultiLineString': if one_vec_if_ml: interior=True # Multiple vectors --> combine # searching the longest LineString -> external contour contour:MultiLineString lenghts=[mygeom.length for mygeom in contour.geoms] ind = np.argmax(lenghts) xyall=[np.column_stack([np.asarray(mygeom.coords),np.zeros(len(mygeom.coords))]) for mygeom in contour.geoms] # coordinates of the longest LineString xy = xyall[ind] for i in range(len(xyall)): if i!=ind: # Concatenate local LineString to the external contour + 2 connection segments # Z coordinate is set to 1. -> will be used to check it after and change "in_use" property xy=np.concatenate([xy, np.asarray([xyall[i][0,0],xyall[i][0,1],1.]).reshape([1,3]), xyall[i][1:], np.asarray([xy[0,0],xy[0,1],1.]).reshape([1,3])]) nb = len(xy) contourgen = vector(name='external border') for x,y,z in xy: contourgen.add_vertex(wolfvertex(x,y,z)) contourgen.myvertices[-1].in_use = z == 0. # the new vertex is related to a connection segment --> ignore for numerical precision in intersection operations/calculations else: contourgen = zone(name = 'contour') for cur_ls in contour.geoms: xy = np.asarray(cur_ls.coords) nb = len(xy) cur_vec = vector(name='external border') for x,y in xy: cur_vec.add_vertex(wolfvertex(x,y)) contourgen.add_vector(cur_vec, forceparent=True) else: contourgen = None err = _(f"Unsupported Shapely contour result: {contour.geom_type}") logging.warning(err) if filename != '': # There is some knowledge about SUX, SUY, XY files there: # https://gitlab.uliege.be/HECE/wolf-interface/-/blob/master/InterfaceVB6/FrmAffichage.frm with open(filename+'.sux','w') as f: np.savetxt(f,np.asarray(indicesX), delimiter=',', fmt='%u,%u') with open(filename + '.suy', 'w') as f: np.savetxt(f, np.asarray(indicesY), delimiter=',', fmt='%u,%u') with open(filename + '.xy', 'w') as f: f.write('{}\n'.format(nb)) # FIXME Stef commented that. Maybe wrong !!! # xy[:,0]-=translx-self.origx # xy[:,1]-=transly-self.origy np.savetxt(f, xy[:,:2], delimiter='\t') return indicesX,indicesY,contourgen,interior
[docs] def contour(self, levels:Union[int, list[float]] = 10) -> Zones: """Compute contour lines :param levels: list of contour levels to extract """ if isinstance(levels, int): levels = np.linspace(self.array.min(), self.array.max(), levels) x, y = self.meshgrid() cs = plt.contour(x, y, self.array, levels=levels) zones = Zones(idx = self.idx + '_contour', mapviewer = self.get_mapviewer()) if hasattr(cs, 'collections'): # MATPLOTLIB <=3.8 logging.debug('MATPLOTLIB <=3.8') logging.debug(_('DEPRECATED: MATPLOTLIB <=3.8 -- You should consider upgrading to a more recent version of Matplotlib')) logging.debug(_('Take care Basemap may not be available in recent versions of mpl_toolkits')) for collection, level in zip(cs.collections, cs.levels): zone_level = zone(name=f'Contour {level}') for idx, path in enumerate(collection.get_paths()): vector_level = vector(name=f'Contour {level} - {idx}') zone_level.add_vector(vector_level, forceparent=True) for vertices in path.to_polygons(closed_only=False): for vertex in vertices: vector_level.add_vertex(wolfvertex(vertex[0], vertex[1])) zones.add_zone(zone_level, forceparent=True) else: # MATPLOTLIB >3.8 logging.debug('MATPLOTLIB >3.8') for path, level in zip(cs.get_paths(), cs.levels): zone_level = zone(name=f'Contour {level}') for idx, vertices in enumerate(path.to_polygons(closed_only=False)): vector_level = vector(name=f'Contour {level} - {idx}') zone_level.add_vector(vector_level, forceparent=True) for vertex in vertices: vector_level.add_vertex(wolfvertex(vertex[0], vertex[1])) zones.add_zone(zone_level, forceparent=True) return zones
# ======================================================================== # Hole Filling & Inpainting # ========================================================================
[docs] def inpaint(self, mask_array:"WolfArrayModel" = None, test_array:"WolfArrayModel" = None, ignore_last:int = 1, multiprocess:bool = True): """ InPaintaing holes in the array :param mask_array: where computation is done :param test_array: used in test -- interpolation is accepted if new value is over test_array :param ignore_last: number of last patches to ignore :param multiprocess: use multiprocess for inpainting """ from ..eikonal import inpaint_array if mask_array is None: mask_array = self.array.mask elif isinstance(mask_array, WolfArrayModel): mask_array = mask_array.array.data if test_array is None: test_array = np.ones(self.array.data.shape) * self.array.data.min() elif isinstance(test_array, WolfArrayModel): test_array = test_array.array.data time, wl_np, wd_np = inpaint_array(self.array.data, mask_array, test_array, ignore_last_patches= ignore_last, dx = self.dx, dy = self.dy, NoDataValue = self.nullvalue, inplace=True, multiprocess= multiprocess) # Create water depth array # Force the same type as the current array # (in case of calling it from a child class with a different type like GUI array) wd = type(self)(mold=self) wd.array[:,:] = wd_np[:,:] wd.mask_data(self.nullvalue) wl = self self.mask_data(self.nullvalue) self.reset_plot() return time, wl, wd
[docs] def _inpaint_waterlevel_dem_dtm(self, dem:"WolfArrayModel", dtm:"WolfArrayModel", ignore_last:int = 1, use_fortran:bool = False, multiprocess:bool = True): """ InPaintaing waterlevel holes in the array. We use DEM and DTM to mask and constraint the inpainting process. :param dem: Digital Elevation Model (same as simulation model) :param dtm: Digital Terrain Model :param ignore_last: number of last patches to ignore :param use_fortran: use Fortran inpainting code :param multiprocess: use multiprocess for inpainting """ if use_fortran: import shutil from tempfile import TemporaryDirectory # check if hoels.exe is available in PATH if shutil.which('holes.exe') is None: logging.error(_('holes.exe not found in PATH')) logging.info(_('We use the Python version of inpainting')) use_fortran = False else: with TemporaryDirectory() as tmpdirname: # create mask from DEM, DTM and WL mask = self._create_building_holes_dem_dtm(dem, dtm, ignore_last) # save mask and dtm to temporary files locdir = Path(tmpdirname) wl_name = locdir / 'array.bin' mask_name = locdir / 'mask.bin' dtm_name = locdir / 'dtm.bin' mask.write_all(mask_name) dtm.write_all(dtm_name) oldname = self.filename self.write_all(wl_name) self.filename = oldname # call holes.exe olddir = os.getcwd() # change to temporary directory os.chdir(locdir) shell_command = f'holes.exe inpaint in={wl_name} mask={mask_name} dem={dtm_name} avoid_last={ignore_last} out=temp' logging.info(shell_command) logging.info('Inpainting holes using holes.exe') os.system(shell_command) logging.info('Done - reading inpainted array') # read inpainted array wl = type(self)(locdir / 'temp_combl.tif') wd = type(self)(locdir / 'temp_h.tif') wd.array[wd.array < 0.] = 0. wd.mask_data(0.) os.chdir(olddir) self.array.data[:,:] = wl.array.data[:,:] time = None if not use_fortran: from ..eikonal import inpaint_waterlevel time, wl_np, wd_np = inpaint_waterlevel(self.array.data, dem.array.data, dtm.array.data, ignore_last_patches= ignore_last, dx = self.dx, dy = self.dy, NoDataValue = self.nullvalue, inplace=True, multiprocess= multiprocess) wd = type(self)(mold=self) wd.array[:,:] = wd_np[:,:] wd.mask_data(self.nullvalue) wl = self self.mask_data(self.nullvalue) self.reset_plot() return time, wl, wd
[docs] def count_holes(self, mask:"WolfArrayModel" = None): """Count holes in the array :param mask: boolean mask array; if ``None``, use :attr:`array.mask` """ from ..eikonal import count_holes if mask is None: mask = self.array.mask elif isinstance(mask, WolfArrayModel): mask = mask.array.data return count_holes(mask)
[docs] def select_holes(self, mask:"WolfArrayModel" = None, ignore_last:int = 1): """Select holes in the array :param mask: boolean mask array; if ``None``, use :attr:`array.mask` :param ignore_last: if ``True``, ignore the largest (background) hole """ if mask is None: mask = self.array.mask elif isinstance(mask, WolfArrayModel): mask = mask.array.data labels, numfeatures = label(mask) # count cells in holes nb_cells = np.bincount(labels.ravel()) # sort by number of cells idx = np.argsort(nb_cells) for i in range(ignore_last): labels[labels == idx[-1-i]] = 0 # select holes but ignoring last ones ij = np.argwhere(labels) # Convert i,j to x,y xy = self.ij2xy_np(ij) self.SelectionData.myselection = list(xy) self.SelectionData.update_nb_nodes_selection()
[docs] def create_mask_holes(self, ignore_last:int = 1) -> "WolfArrayModel": """Select holes in the array and create a new aray :param ignore_last: if ``True``, ignore the largest (background) hole """ labels, numfeatures = label(self.array.mask) # count cells in holes nb_cells = np.bincount(labels.ravel()) # sort by number of cells idx = np.argsort(nb_cells) for i in range(ignore_last): labels[labels == idx[-1-i]] = 0 newarray = type(self)(mold=self) # newarray.allocate_ressources() newarray.array.data[:,:] = labels newarray.array.mask[:,:] = ~labels.astype(bool) newarray.set_nullvalue_in_mask() return newarray
[docs] def fill_holes_with_value(self, value:float, mask:"WolfArrayModel" = None, ignore_last:int = 1): """Fill holes in the array with a value :param value: value to fill holes with :param mask: boolean mask defining the holes; if ``None``, auto-detect :param ignore_last: if ``True``, ignore the largest (background) hole """ if mask is None: mask = self.array.mask labels, numfeatures = label(mask) # count cells in holes nb_cells = np.bincount(labels.ravel()) # sort by number of cells idx = np.argsort(nb_cells) for i in range(ignore_last): labels[labels == idx[-1-i]] = 0 self.array.data[labels > 0] = value self.array.mask[labels > 0] = False # unmask filled data
[docs] def fill_holes_with_value_if_intersects(self, value:float, vects:list[vector] | Zones, method:Literal['matplotlib','shapely_strict', 'shapely'] = 'shapely_strict', buffer:float = 0.): """Fill holes in the array with a value if it intersects with the mask :param value: value to fill holes with :param vects: list of polygon vectors for intersection test :param method: spatial method for intersection check :param buffer: buffer distance for intersection test """ if isinstance(vects, Zones): vects = vects.concatenate_all_vectors() for vec in vects: self._fill_holes_with_value_if_intersects(value, vec, method, buffer)
# list(map(self._fill_holes_with_value_if_intersects, [value]*len(vects), vects, [method]*len(vects), [buffer]*len(vects))) # parallel version
[docs] def _fill_holes_with_value_if_intersects(self, value:float, vect:vector, method:Literal['matplotlib','shapely_strict', 'shapely'] = 'shapely_strict', buffer:float = 0.) -> "WolfArrayModel": """Internal helper: fill a single labelled hole if it intersects the given polygon. :param value: value to fill holes with :param vect: polygon vector for intersection test :param method: spatial method for intersection check :param buffer: buffer distance for intersection test """ if buffer > 0.: # As we use a buffer, we check the intersction with the buffer... if self.intersects_polygon(vect, method=method, buffer=buffer): # ...but we fill the original polygon, not the buffered one ij = self.get_ij_inside_polygon(vect, method=method, usemask=False) self.array.data[ij[:,0], ij[:,1]] = value self.array.mask[ij[:,0], ij[:,1]] = False else: ij = self.get_ij_inside_polygon(vect, method=method, buffer=buffer, usemask=False) if ij.size[0] > 0: self.array.data[ij[:,0], ij[:,1]] = value self.array.mask[ij[:,0], ij[:,1]] = False
[docs] def _create_building_holes_dem_dtm(self, dem:"WolfArrayModel", dtm:"WolfArrayModel", ignore_last:int = 1) -> "WolfArrayModel": """Create a mask identifying building footprints from the difference between DEM and DTM. :param dem: Digital Elevation Model array :param dtm: Digital Terrain Model array :param ignore_last: if ``True``, ignore the largest (background) hole """ buildings = dem - dtm buildings.array[buildings.array < 0.] = 0. buildings.array[buildings.array > 0.] = dtm.array[buildings.array > 0.] buildings.array[buildings.array == 0.] = dem.array.max() + 1. return buildings
# ======================================================================== # Triangulation & Mesh # ========================================================================
[docs] def get_triangulation(self, bounds:list[float]=None): """ Traingulation of the array :param bounds: [[xmin,xmax],[ymin,ymax]] """ all = bounds is None if bounds is None: ox = self.origx + self.translx oy = self.origy + self.transly nbx = self.nbx nby = self.nby i1 = 0 i2 = self.nbx j1 = 0 j2 = self.nby bounds = [[ox,ox+float(nbx)*self.dx],[oy,oy+float(nby)*self.dy]] else: ox = max(self.origx, bounds[0][0]) oy = max(self.origy, bounds[1][0]) i1, j1 = self.get_ij_from_xy(ox, oy) i2, j2 = self.get_ij_from_xy(bounds[0][1], bounds[1][1]) i1 = max(i1, 0) j1 = max(j1, 0) i2 = min(i2 + 1, self.nbx) j2 = min(j2 + 1, self.nby) nbx = i2 - i1 nby = j2 - j1 refx = ox refy = oy x = np.arange(self.dx / 2. + refx, float(nbx) * self.dx + self.dx / 2 + refx, self.dx) y = np.arange(self.dy / 2. + refy, float(nby) * self.dy + self.dy / 2 + refy, self.dy) znull = np.min(self.array[i1:i2, j1:j2])-1. if all: locarr = np.copy(self.array.data) locarr[self.array.mask] = znull points = np.meshgrid(x,y) points = np.concatenate((points[0].flatten(), points[1].flatten(),locarr.flatten())).reshape([3,len(x)*len(y)]).transpose() else: points = np.asarray( [[xx, yy, self.get_value(xx + ox - refx, yy + oy - refy, nullvalue=znull)] for xx in x for yy in y], dtype=np.float32) decal = 0 triangles = [] triangles.append([[i + decal, i + decal + 1, i + decal + nby] for i in range(nby - 1)]) triangles.append([[i + decal + nby, i + decal + 1, i + decal + nby + 1] for i in range(nby - 1)]) for k in tqdm(range(1, nbx - 1)): decal = k * nby triangles.append([[i + decal, i + decal + 1, i + decal + nby] for i in range(nby - 1)]) triangles.append([[i + decal + nby, i + decal + 1, i + decal + nby + 1] for i in range(nby - 1)]) triangles = np.asarray(triangles, dtype=np.uint32).reshape([(2 * nby - 2) * (nbx - 1), 3]) mytri = Triangulation(pts = points, tri = triangles) return mytri, znull
# ======================================================================== # Visualization (Matplotlib) # ========================================================================
[docs] def compare_cloud(self, mycloud:cloud_vertices, delta:list[float] = [.15, .5, 1.]): """ Graphique de comparaison des valeurs d'un nuage de points et des valeurs de la matrice sous les mêmes positions :param mycloud: cloud_vertices :param delta: list of tolerance for the comparison """ # Get the values of the cloud xyz_cloud = mycloud.get_xyz() # Get values of the array at the same positions zarray = np.array([self.get_value(curxy[0],curxy[1]) for curxy in xyz_cloud]) # count the number of points outside the array nbout = np.count_nonzero(zarray==-99999) # Get the values of the cloud that are not outside the array # Separate XY and Z values (cloud and array) # - z values z_cloud = xyz_cloud[zarray!=-99999][:,2] # - xy values xy_cloud = xyz_cloud[zarray!=-99999][:,:2] # - array values zarray = zarray[zarray!=-99999] # concatenate all z values zall = np.concatenate([z_cloud,zarray]) # find the min and max values zmin = np.min(zall) zmax = np.max(zall) # compute differences diffz = zarray-z_cloud # choose a colormap cmap = plt.cm.get_cmap('RdYlBu') mindiff = np.min(diffz) maxdiff = np.max(diffz) # Plot the differences [0] and the position [1] fig,ax = plt.subplots(2,1) ax[0].set_title(_('Comparison Z - ') + str(nbout) + _(' outside points on ') + str(len(xyz_cloud))) sc0 = ax[0].scatter(z_cloud,zarray,s=10,c=diffz,cmap = cmap, vmin=mindiff, vmax=maxdiff) ax[0].set_xlabel(_('Scatter values')) ax[0].set_ylabel(_('Array values')) ax[0].set_xlim([zmin,zmax]) ax[0].set_ylim([zmin,zmax]) ax[0].plot([zmin,zmax],[zmin,zmax], color='black') if delta is not None: if isinstance(delta, list): for idx, curdelta in enumerate(delta): curdelta = abs(float(curdelta)) ax[0].plot([zmin,zmax],[zmin+delta,zmax+delta], 'k--', alpha=1.-1./(idx+1)) ax[0].plot([zmin,zmax],[zmin-delta,zmax-delta], 'k--', alpha=1.-1./(idx+1)) ax[0].axis('equal') sc1 = ax[1].scatter(xy_cloud[:,0],xy_cloud[:,1],s=10,c=diffz,cmap = cmap, vmin=mindiff, vmax=maxdiff) fig.colorbar(sc1) ax[1].axis('equal') plt.show()
[docs] def compare_tri(self,mytri:Triangulation): """Plot a comparison between array values and triangulation vertex elevations. :param mytri: :class:`Triangulation` object to compare against the array """ xyz_cloud = mytri.pts zarray = np.array([self.get_value(curxy[0],curxy[1]) for curxy in xyz_cloud]) nbout = np.count_nonzero(zarray==-99999) z_cloud = xyz_cloud[zarray!=-99999][:,2] xy_cloud = xyz_cloud[zarray!=-99999][:,:2] zarray = zarray[zarray!=-99999] zall = np.concatenate([z_cloud,zarray]) zmin = np.min(zall) zmax = np.max(zall) diffz = zarray-z_cloud cmap = plt.cm.get_cmap('RdYlBu') mindiff = np.min(diffz) maxdiff = np.max(diffz) fig,ax = plt.subplots(2,1) ax[0].set_title(_('Comparison Z - ') + str(nbout) + _(' outside points on ') + str(len(xyz_cloud))) sc0 = ax[0].scatter(z_cloud,zarray,s=10,c=diffz,cmap = cmap, vmin=mindiff, vmax=maxdiff) ax[0].set_xlabel(_('Scatter values')) ax[0].set_ylabel(_('Array values')) ax[0].set_xlim([zmin,zmax]) ax[0].set_ylim([zmin,zmax]) ax[0].plot([zmin,zmax],[zmin,zmax]) ax[0].axis('equal') sc1 = ax[1].scatter(xy_cloud[:,0],xy_cloud[:,1],s=10,c=diffz,cmap = cmap, vmin=mindiff, vmax=maxdiff) fig.colorbar(sc1) ax[1].axis('equal') plt.show()
[docs] def plot_matplotlib(self, figax:tuple=None, getdata_im:bool=False, update_palette:bool=True, vmin:float=None, vmax:float=None, figsize:tuple=None, Walonmap:bool=False, cat:Literal['IMAGERIE/ORTHO_1971', 'IMAGERIE/ORTHO_1994_2000', 'IMAGERIE/ORTHO_2006_2007', 'IMAGERIE/ORTHO_2009_2010', 'IMAGERIE/ORTHO_2012_2013', 'IMAGERIE/ORTHO_2015', 'IMAGERIE/ORTHO_2016', 'IMAGERIE/ORTHO_2017', 'IMAGERIE/ORTHO_2018', 'IMAGERIE/ORTHO_2019', 'IMAGERIE/ORTHO_2020', 'IMAGERIE/ORTHO_2021', 'IMAGERIE/ORTHO_2022_PRINTEMPS', 'IMAGERIE/ORTHO_2022_ETE', 'IMAGERIE/ORTHO_2023_ETE', 'IMAGERIE/ORTHO_LAST', 'orthoimage_coverage', 'orthoimage_coverage_2016', 'orthoimage_coverage_2017', 'orthoimage_coverage_2018', 'orthoimage_coverage_2019', 'orthoimage_coverage_2020', 'orthoimage_coverage_2021', 'orthoimage_coverage_2022', 'crossborder', 'crossborder_grey', 'overlay', 'topo', 'topo_grey'] = 'IMAGERIE/ORTHO_LAST', first_mask_data:bool=True, with_legend:bool=False, IGN:bool=False, Cartoweb:bool=False): """ Plot the array - Matplotlib version Using imshow and RGB array Plot the array using Matplotlib. This method visualizes the array data using Matplotlib's `imshow` function. It supports optional overlays, custom palettes, and value range adjustments. Notes: ------ - The method applies a mask to the data using the `nullvalue` attribute before plotting. - If `Walonmap` is True, the method fetches and overlays a map image using the WalOnMap service. - The aspect ratio of the plot is set to 'equal'. :param figax: A tuple containing a Matplotlib figure and axis (fig, ax). If None, a new figure and axis are created. :type figax: tuple, optional (Default value = None) :param getdata_im: If True, returns the image object along with the figure and axis. Default is False, then it only returns (fig, ax). :type getdata_im: bool, optional (Default value = False) :param update_palette: If True, updates the color palette before plotting. Default is True. :type update_palette: bool, optional (Default value = True) :param vmin: Minimum value for color scaling. If None, the minimum value is determined automatically. Default is None. :type vmin: float, optional (Default value = None) :param vmax: Maximum value for color scaling. If None, the maximum value is determined automatically. Default is None. :type vmax: float, optional (Default value = None) :param figsize: Size of the figure in inches (width, height). Only used if `figax` is None. Default is None. :type figsize: tuple, optional (Default value = None) :param Walonmap: If True, overlays a map image using the WalOnMap service. Default is False. :type Walonmap: bool, optional (Default value = False) :param cat: The category of the map to fetch from the WalOnMap service. Default is 'IMAGERIE/ORTHO_2022_ETE'. Available orthos: - `'IMAGERIE/ORTHO_1971'` - `'IMAGERIE/ORTHO_1994_2000'` - `'IMAGERIE/ORTHO_2006_2007'` - `'IMAGERIE/ORTHO_2009_2010'` - `'IMAGERIE/ORTHO_2012_2013'` - `'IMAGERIE/ORTHO_2015'` - `'IMAGERIE/ORTHO_2016'` - `'IMAGERIE/ORTHO_2017'` - `'IMAGERIE/ORTHO_2018'` - `'IMAGERIE/ORTHO_2019'` - `'IMAGERIE/ORTHO_2020'` - `'IMAGERIE/ORTHO_2021'` - `'IMAGERIE/ORTHO_2022_PRINTEMPS'` - `'IMAGERIE/ORTHO_2022_ETE'` - `'IMAGERIE/ORTHO_2023_ETE'` - `'IMAGERIE/ORTHO_LAST'` - 'orthoimage_coverage' - 'orthoimage_coverage_2016', - 'orthoimage_coverage_2017', - 'orthoimage_coverage_2018', - 'orthoimage_coverage_2019', - 'orthoimage_coverage_2020', - 'orthoimage_coverage_2021', - 'orthoimage_coverage_2022' - 'crossborder', - 'crossborder_grey', - 'overlay', - 'topo', - 'topo_grey' :type cat: str, optional (Default value = `'IMAGERIE/ORTHO_2022_ETE'`) :param first_mask_data: If True, applies the mask to the data before plotting. Default is True. :type first_mask_data: bool, optional (Default value = True) :param with_legend: If True, adds a color legend to the plot. Default is False. :type with_legend: bool, optional (Default value = False) :param IGN: if ``True``, overlay the IGN basemap :param Cartoweb: if ``True``, overlay the Cartoweb basemap :return: If `getdata_im` is False, returns (fig, ax), where `fig` is the Matplotlib figure and `ax` is the axis. If `getdata_im` is True, returns (fig, ax, im), where `im` is the image object created by `imshow`. :rtype: tuple """ available_Walonmap = ['IMAGERIE/ORTHO_1971', 'IMAGERIE/ORTHO_1994_2000', 'IMAGERIE/ORTHO_2006_2007', 'IMAGERIE/ORTHO_2009_2010', 'IMAGERIE/ORTHO_2012_2013', 'IMAGERIE/ORTHO_2015', 'IMAGERIE/ORTHO_2016', 'IMAGERIE/ORTHO_2017', 'IMAGERIE/ORTHO_2018', 'IMAGERIE/ORTHO_2019', 'IMAGERIE/ORTHO_2020', 'IMAGERIE/ORTHO_2021', 'IMAGERIE/ORTHO_2022_PRINTEMPS', 'IMAGERIE/ORTHO_2022_ETE', 'IMAGERIE/ORTHO_2023_ETE', 'IMAGERIE/ORTHO_LAST'] available_IGN = ['orthoimage_coverage', 'orthoimage_coverage_2016', 'orthoimage_coverage_2017', 'orthoimage_coverage_2018', 'orthoimage_coverage_2019', 'orthoimage_coverage_2020', 'orthoimage_coverage_2021', 'orthoimage_coverage_2022'] available_Cartoweb = ['crossborder', 'crossborder_grey', 'overlay', 'topo', 'topo_grey'] if first_mask_data: self.mask_data(self.nullvalue) if update_palette: self.updatepalette(0) if figax is None: fig, ax = plt.subplots(figsize=figsize) else: fig, ax = figax if Walonmap: from ..PyWMS import getWalonmap from PIL import Image if cat not in available_Walonmap: logging.error(_('The category {} is not available in WalOnMap').format(cat)) logging.error(_('Available categories are: {}').format(', '.join(available_Walonmap))) else: try: bounds = self.get_bounds() scale_covery = (bounds[0][1] - bounds[0][0]) / (bounds[1][1] - bounds[1][0]) if scale_covery > 1.: # x is larger than y w = 2000 h = int(2000 / scale_covery) else: # y is larger than x h = 2000 w = int(2000 * scale_covery) IO_image = getWalonmap(cat, bounds[0][0], bounds[1][0], bounds[0][1], bounds[1][1], w=w, h=h, tofile=False) # w=self.nbx, h=self.nby image = Image.open(IO_image) ax.imshow(image.transpose(Image.Transpose.FLIP_TOP_BOTTOM), extent=(bounds[0][0], bounds[0][1], bounds[1][0], bounds[1][1]), alpha=1, origin='lower') except Exception as e: logging.error(_('Error while fetching the map image from WalOnMap')) logging.error(e) elif IGN: from ..PyWMS import getNGI from PIL import Image if cat not in available_IGN: logging.error(_('The category {} is not available in IGN').format(cat)) logging.error(_('Available categories are: {}').format(', '.join(available_IGN))) else: try: bounds = self.get_bounds() scale_covery = (bounds[0][1] - bounds[0][0]) / (bounds[1][1] - bounds[1][0]) if scale_covery > 1.: # x is larger than y w = 2000 h = int(2000 / scale_covery) else: # y is larger than x h = 2000 w = int(2000 * scale_covery) IO_image = getNGI(cat, bounds[0][0], bounds[1][0], bounds[0][1], bounds[1][1], w=w, h=h, tofile=False) # w=self.nbx, h=self.nby image = Image.open(IO_image) ax.imshow(image.transpose(Image.Transpose.FLIP_TOP_BOTTOM), extent=(bounds[0][0], bounds[0][1], bounds[1][0], bounds[1][1]), alpha=1, origin='lower') except Exception as e: logging.error(_('Error while fetching the map image from IGN')) logging.error(e) elif Cartoweb: from ..PyWMS import getCartoweb from PIL import Image if cat not in available_Cartoweb: logging.error(_('The category {} is not available in Cartoweb').format(cat)) logging.error(_('Available categories are: {}').format(', '.join(available_Cartoweb))) else: try: bounds = self.get_bounds() scale_covery = (bounds[0][1] - bounds[0][0]) / (bounds[1][1] - bounds[1][0]) if scale_covery > 1.: # x is larger than y w = 2000 h = int(2000 / scale_covery) else: # y is larger than x h = 2000 w = int(2000 * scale_covery) IO_image = getCartoweb(cat, bounds[0][0], bounds[1][0], bounds[0][1], bounds[1][1], w=w, h=h, tofile=False) # w=self.nbx, h=self.nby image = Image.open(IO_image) ax.imshow(image.transpose(Image.Transpose.FLIP_TOP_BOTTOM), extent=(bounds[0][0], bounds[0][1], bounds[1][0], bounds[1][1]), alpha=1, origin='lower') except Exception as e: logging.error(_('Error while fetching the map image from Cartoweb')) logging.error(e) if vmin is None and vmax is not None: vmin = self.mypal.values[0] elif vmax is None and vmin is not None: vmax = self.mypal.values[-1] if (vmin is None) and (vmax is None): # im = ax.imshow(self.array.transpose(), origin='lower', cmap=self.mypal, # extent=(self.origx, self.origx + self.dx * self.nbx, self.origy, self.origy + self.dy * self.nby)) im = ax.imshow(self.array.transpose(), origin='lower', norm = self.mypal.norm, cmap=self.mypal.cmap, interpolation='none', extent=(self.origx, self.origx + self.dx * self.nbx, self.origy, self.origy + self.dy * self.nby), alpha=np.select([self.array.mask.T, ~self.array.mask.T], [np.zeros(self.shape).T, np.ones(self.shape).T * self.alpha])) if with_legend: # add a legend in a new axis ax_leg = fig.add_axes([0.92, 0.12, 0.04, 0.8]) from matplotlib.colorbar import ColorbarBase from matplotlib import colors cbar = ColorbarBase(ax_leg, cmap=self.mypal.cmap, norm=self.mypal.norm, orientation='vertical') cbar.set_ticks(self.mypal.values) # ticks labels with 3 decimal places labels = [f"{val:.3f}" for val in self.mypal.values] cbar.set_ticklabels(labels) cbar.ax.tick_params(labelsize=8) cbar.ax.yaxis.set_label_position('left') cbar.ax.yaxis.set_ticks_position('right') cbar.ax.yaxis.set_tick_params(width=0.5, size=2, direction='in', color='black') else: im = ax.imshow(self.array.transpose(), origin='lower', cmap=self.mypal, extent=(self.origx, self.origx + self.dx * self.nbx, self.origy, self.origy + self.dy * self.nby), vmin=vmin, vmax=vmax, alpha=np.select([self.array.mask.T, ~self.array.mask.T], [np.zeros(self.shape).T, np.ones(self.shape).T * self.alpha])) if with_legend: # add a legend in a new axis ax_leg = fig.add_axes([0.92, 0.12, 0.04, 0.8]) from matplotlib.colorbar import ColorbarBase from matplotlib import colors from matplotlib.colors import Normalize cbar = ColorbarBase(ax_leg, cmap=self.mypal.cmap, norm= Normalize(vmin, vmax), orientation='vertical') vals = list(np.linspace(vmin, vmax, self.mypal.nb, endpoint=True)) # limit to 2 decimal places vals = [round(val, 2) for val in vals] cbar.set_ticks(vals) cbar.set_ticklabels(vals) cbar.ax.tick_params(labelsize=8) cbar.ax.yaxis.set_label_position('left') cbar.ax.yaxis.set_ticks_position('right') cbar.ax.yaxis.set_tick_params(width=0.5, size=2, direction='in', color='black') ax.set_aspect('equal') if getdata_im: return fig, ax, im else: return fig, ax
[docs] def imshow(self, figax:tuple[Figure, Axis] = None, cmap:Colormap = None, step_ticks=100.) -> tuple[Figure, Axis]: """ Create Matplotlib image from WolfArrayModel :param figax: tuple ``(fig, ax)`` of matplotlib Figure and Axes; created if ``None`` :param cmap: matplotlib colormap name (default: ``None`` uses internal palette) :param step_ticks: step (in world units) between tick labels on the axes """ warnings.warn(_('The imshow method is deprecated and will be removed in a future version. Please use plot_matplotlib instead with the appropriate parameters to achieve the same result.'), DeprecationWarning) # Use figax if passed as argument if figax is None: fig,ax = plt.subplots(1,1) else: fig,ax = figax bounds = self.get_bounds() if cmap is None: # update local colors if not already done if VERSION_RGB==1: if self.rgb is None: self.updatepalette(1) # Pointing RGB colors = self.rgb colors[self.array.mask,3] = 0. # Plot colors = colors.swapaxes(0,1) ax.imshow(colors, origin='lower') else: # Full scale image values vals = self.array.data alpha = np.zeros(vals.shape) alpha[~ self.array.mask] = 1. # Plot vals = vals.swapaxes(0,1) alpha = alpha.swapaxes(0,1) ax.imshow(vals, origin='lower', cmap=cmap, alpha=alpha) ax.set_aspect('equal') x_start = (bounds[0][0] // step_ticks) * step_ticks x_end = (bounds[0][1] // step_ticks) * step_ticks y_start = (bounds[1][0] // step_ticks) * step_ticks y_end = (bounds[1][1] // step_ticks) * step_ticks x_pos = np.arange(x_start, x_end+.0001, step_ticks) y_pos = np.arange(y_start, y_end+.0001, step_ticks) i = [self.get_ij_from_xy(x, 0.)[0] for x in x_pos] j = [self.get_ij_from_xy(0., y)[1] for y in y_pos] ax.set_xticks(i) ax.set_xticklabels(x_pos, rotation = 30) ax.set_yticks(j) ax.set_yticklabels(y_pos) return fig,ax
# ======================================================================== # Rebinning & Resolution Change # ========================================================================
[docs] def get_rebin_shape_size(self, factor:float, reshape_array_if_necessary:bool = False) -> tuple[tuple[int, int], tuple[float, float]]: """ Return the new shape after rebinning. newdx = dx * factor newdy = dy * factor The shape is adjusted to be a multiple of the factor. :param factor: factor of resolution change -- > 1.0 : decrease resolution, < 1.0 : increase resolution :type factor: float :param reshape_array_if_necessary: if ``True``, pad array so dimensions are divisible by the downsampling factor :return: new shape :rtype: Tuple[int, int], Tuple[float, float] """ newdx = self.dx * float(factor) newdy = self.dy * float(factor) newnbx = self.nbx newnby = self.nby if factor >=1.0: # Decrease resolution to_reshape = False if np.mod(self.nbx,factor) != 0 or np.mod(self.nby,factor) != 0 : newnbx = self.nbx newnby = self.nby if np.mod(self.nbx,factor) !=0: newnbx = int(self.nbx + factor - np.mod(self.nbx,factor)) to_reshape = True if np.mod(self.nby,factor) !=0: newnby = int(self.nby + factor - np.mod(self.nby,factor)) to_reshape = True if to_reshape and reshape_array_if_necessary: logging.warning(f"The shape ({self.nbx}, {self.nby}) is not a multiple of the factor {factor}") new_array = ma.masked_array(np.ones((newnbx, newnby), dtype= self.dtype) * self.nullvalue, dtype= self.dtype) new_array[:self.nbx, :self.nby] = self.array self.array = new_array else: # Increase resolution if abs(int(1/factor) - 1/factor) > 1e-6: # find the greatest common divisor of between self.dx and old_dx gcd = pgcd_decimal(self.dx, self.dx * factor) intermediate_factor = gcd / self.dx [newnbx,newnby], [dx,dy] = self.get_rebin_shape_size(intermediate_factor) factor = factor / intermediate_factor if np.mod(newnbx,factor) != 0 or np.mod(newnby,factor) != 0 : if np.mod(newnbx,factor) !=0: newnbx = int(newnbx + factor - np.mod(newnbx,factor)) if np.mod(newnby,factor) !=0: newnby = int(newnby + factor - np.mod(newnby,factor)) newdx = dx * factor newdy = dy * factor newnbx = int(newnbx / factor) newnby = int(newnby / factor) return (newnbx, newnby), (newdx, newdy)
[docs] def get_rebin_header(self, factor:float) -> header_wolf: """ Return a new header after rebinning. :param factor: factor of resolution change -- > 1.0 : decrease resolution, < 1.0 : increase resolution :type factor: float :return: new header :rtype: header_wolf """ newshape, newdx_dy = self.get_rebin_shape_size(factor) newheader = self.get_header() newheader.nbx = newshape[0] newheader.nby = newshape[1] newheader.dx = newdx_dy[0] newheader.dy = newdx_dy[1] return newheader
[docs] def rebin(self, factor:float, operation:Literal['mean', 'sum', 'min', 'max', 'median'] ='mean', operation_matrix:"WolfArrayModel"=None) -> None: """ Change resolution - **in place**. If you want to keep current data, copy the WolfArrayModel into a new variable -> newWA = Wolfarray(mold=curWA). :param factor: factor of resolution change -- > 1.0 : decrease resolution, < 1.0 : increase resolution :type factor: float :param operation: operation to apply on the blocks ('mean', 'sum', 'min', 'max', 'median') :type operation: str, Rebin_Ops :param operation_matrix: operation matrix to apply on the blocks -- see the Enum "Rebin_Ops" for more infos. The matrix must have the same shape as the new array :type operation_matrix: WolfArrayModel """ op_str = operation (testnbx, testnby), __ = self.get_rebin_shape_size(factor) if operation_matrix is not None: tmp_header = self.get_rebin_header(factor) if not operation_matrix.is_like(tmp_header): logging.error(_("The operation matrix must have the same shape as the new array")) logging.info(_("You can use the get_rebin_header method to get the new header if you don't know it")) return logging.info(_("Operation matrix detected")) logging.info(_("The operation matrix will be used to apply the operation on the blocks")) else: operation = Rebin_Ops.get_ops(operation) if operation is None: logging.error(_("Operator not supported -- Must be a string in ['sum', 'mean', 'min', 'max', 'median'] or a Rebin_Ops Enum")) return if not callable(operation): logging.error(_("Operator not supported -- Must be a string in ['sum', 'mean', 'min', 'max', 'median'] or a Rebin_Ops Enum")) new_shape, (dx, dy) = self.get_rebin_shape_size(factor, reshape_array_if_necessary= True) if factor >=1.0: # Decrease resolution if operation_matrix is not None: # Reshape the input array to split it into blocks of size f x f reshaped_a = self.array.reshape(new_shape[0], int(factor), new_shape[1], int(factor)) # Swap axes to make blocks as separate dimensions reshaped_a = reshaped_a.swapaxes(1, 2) # Initialize the output matrix self.array = ma.masked_array(np.ones((new_shape[0], new_shape[1]), dtype= self.dtype) * self.nullvalue, dtype= self.dtype) # Check the dtype of the newly initialized array assert self.array.dtype == self.dtype, _('Bad dtype') # Vectorized operations for op_idx, operation in enumerate(Rebin_Ops.get_numpy_ops()): mask = (operation_matrix.array == op_idx) if np.any(mask): block_results = operation(reshaped_a, axis=(2, 3)) self.array[mask] = block_results[mask] else: compression_pairs = [(d, c // d) for d, c in zip(new_shape, self.array.shape)] flattened = [l for p in compression_pairs for l in p] self.array = operation(self.array.reshape(flattened), axis=(1, 3)).astype(self.dtype) self.set_nullvalue_in_mask() else: # Increase resolution if abs(int(1/factor) - 1/factor) > 1e-6: logging.warning(f"The factor {factor} doesn't lead to an integer dimension for the Kronecker product") logging.warning("The array will be rebinned firstly using the most common factor and then using the operation") # find the greatest common divisor of between self.dx and old_dx gcd = pgcd_decimal(self.dx, self.dx * factor) intermediate_factor = gcd / self.dx logging.warning(f"The greatest common divisor is {gcd}") tmp = type(self)(mold=self, whichtype=self.wolftype) tmp.rebin(intermediate_factor) if op_str is None: op_str = 'min' tmp.rebin(factor / intermediate_factor, operation= op_str) self.array = tmp.array tmp.array = None del tmp else: ones = np.ones( (int(1/factor), int(1/factor)), dtype=self.array.dtype) self.array = np.kron(self.array, ones).astype(self.array.dtype) self.mask_data(self.nullvalue) self.nbx = new_shape[0] self.nby = new_shape[1] self.dx = dx self.dy = dy assert isinstance(self.array, ma.MaskedArray), _('The array must be a masked array') assert self.array.shape == (testnbx, testnby), _(f'Bad shape: {self.array.shape} != {(testnbx, testnby)}') self.count() # rebin must not change the type of the array assert self.array.dtype == self.dtype, _(f'Bad dtype: {self.array.dtype} != {self.dtype}')
# ======================================================================== # Reprojection (CRS) # ======================================================================== @classmethod
[docs] def from_other_epsg_coo(cls, input_raster_path:str, input_srs='EPSG:3812', output_srs='EPSG:31370', resampling_method=gdal.GRA_Bilinear, xRes:float=0.5, yRes:float=0.5): """ Reprojects and resamples a raster file from an other EPSG coordinates and return it as a WolfArrayModel. :param input_raster_path: The path to the input raster file. :type input_raster_path: str :param input_srs: The input spatial reference system (SRS) in the format 'EPSG:XXXX'. Defaults to Lambert 2008 'EPSG:3812'. :type input_srs: str :param output_srs: The output spatial reference system (SRS) in the format 'EPSG:XXXX'. Defaults to Belgian Lambert 72 'EPSG:31370'. :type output_srs: str :param resampling_method: The resampling method to use. Defaults to gdal.GRA_Bilinear. Resampling method can be chosen among the gdal GRA_* constants (gdal.GRA_Average; gdal.GRA_Bilinear; gdal.GRA_Cubic; gdal.GRA_CubicSpline; gdal.GRA_Lanczos; gdal.GRA_Mode; gdal.GRA_NearestNeighbour) :type resampling_method: int :param xRes: The desired output resolution in the x direction. Defaults to 0.5. :type xRes (float): float :param yRes: The desired output resolution in the y direction. Defaults to 0.5. :type yRes (float): float :raises AssertionError: If the input or output raster file is not a GeoTIFF file. :raises RuntimeError: If the input raster file cannot be opened. :raises PermissionError: If there is a permission error while trying to delete the output raster file. :raises Exception: If an unexpected error occurs while trying to delete the output raster file. :raises RuntimeError: If the reprojection fails for the input raster file. :return: WolfArrayModel """ #sanitize input input_raster_path = str(input_raster_path) input_srs = str(input_srs) output_srs = str(output_srs) assert resampling_method in [gdal.GRA_Average, gdal.GRA_Bilinear, gdal.GRA_Cubic, gdal.GRA_CubicSpline, gdal.GRA_Lanczos, gdal.GRA_Mode, gdal.GRA_NearestNeighbour], "Invalid resampling method" # Define temporary files with tempfile.TemporaryDirectory() as temp_dir: output_raster_path = os.path.join(temp_dir, "Array_72.tif") reproject_and_resample_raster(input_raster_path, output_raster_path, input_srs, output_srs, resampling_method, xRes, yRes) Array3 = type(self)(output_raster_path, nullvalue=-9999) return Array3
[docs] def reproject(self, output_epsg:int, resampling_method=gdal.GRA_Bilinear, ) -> "WolfArrayModel": """ Reprojects and resamples the WolfArrayModel to a new EPSG coordinate system. :param output_epsg: The output EPSG code for the desired coordinate system. :type output_epsg: int :param resampling_method: The resampling method to use. Defaults to gdal.GRA_Bilinear. Resampling method can be chosen among the gdal GRA_* constants (gdal.GRA_Average; gdal.GRA_Bilinear; gdal.GRA_Cubic; gdal.GRA_CubicSpline; gdal.GRA_Lanczos; gdal.GRA_Mode; gdal.GRA_NearestNeighbour) :type resampling_method: int :return: WolfArrayModel """ assert isinstance(output_epsg, int), "output_epsg must be an integer" assert resampling_method in [gdal.GRA_Average, gdal.GRA_Bilinear, gdal.GRA_Cubic, gdal.GRA_CubicSpline, gdal.GRA_Lanczos, gdal.GRA_Mode, gdal.GRA_NearestNeighbour], "Invalid resampling method" # Define temporary files with tempfile.TemporaryDirectory() as temp_dir: self.write_all(os.path.join(temp_dir, "Array_input.tif")) output_raster_path = os.path.join(temp_dir, "Array_reprojected.tif") reproject_and_resample_raster(os.path.join(temp_dir, "Array_input.tif"), output_raster_path, f'EPSG:{self.epsg}', f'EPSG:{output_epsg}', resampling_method=resampling_method) Array3 = type(self)(output_raster_path, nullvalue=-9999) return Array3
# ======================================================================== # Merging & External Data Creation # ======================================================================== @classmethod
[docs] def merge(cls, others:list, abs:bool=True, copy:bool = True) -> "WolfArrayModel": """ Merge several WolfArrayModel into a single WolfArrayModel :param others: list of WolfArrayModel to merge :param abs: if True -> Global World Coordinates :param copy: if True -> copy data (**Not necessary** if data header CAN BE modified. It can be save memory) """ from ._mb import WolfArrayMB newMBArray = WolfArrayMB() for curarray in others: newMBArray.add_block(curarray, force_idx=True, copyarray=copy) newMBArray.set_header_from_added_blocks() return newMBArray.as_WolfArray(abs=abs)
@classmethod
[docs] def set_general_frame_from_xyz(cls, fname:str, dx:float, dy:float, border_size:int=5, delimiter:str=',', fillin:bool=False): """ Lecture d'un fichier texte xyz et initialisation des données de base :param fname: nom du fichier xyz :param dx: pas en x :param dy: pas en y :param border_size: nombre de mailles de bordure en plus de l'extension spatiale du fichier :param delimiter: column delimiter in the XYZ file (e.g. ``"\\t"``, ``" "``) :param fillin: if ``True``, fill the array with values from the XYZ file """ my_file = XYZFile(fname, delimiter=delimiter) my_file.read_from_file() (xlim, ylim) = my_file.get_extent() Array = cls() Array.dx = dx Array.dy = dy Array.origx = math.floor(xlim[0]) - dx/2. - float(border_size) * Array.dx Array.origy = math.floor(ylim[0]) - dy/2. - float(border_size) * Array.dy Array.nbx = int((math.floor(xlim[1]) - math.ceil(xlim[0])) / Array.dx) + 1 + 2*border_size Array.nby = int((math.floor(ylim[1]) - math.ceil(ylim[0])) / Array.dy) + 1 + 2*border_size Array.array = np.ma.zeros((Array.nbx, Array.nby), dtype=np.float32) if fillin: Array.fillin_from_xyz(my_file.xyz) return Array
@classmethod
[docs] def set_general_frame_from_xyz_dir(cls, path:str, bounds:list, delimiter:str=',', dxdy:tuple[float, float]= None, border_size:int=5, fillin:bool=True): """ Lecture d'un dossier contenant des fichiers texte xyz, initialisation des données de base et chargement de la matrice Renvoie un WolfArrayModel or False si le fichier n'est pas dans les limites :param path: chemin du dossier avec les fichier xyz :dtype path: str :param bounds: limites de l'extension spatiale :dtype bounds: list :param delimiter: délimiteur des fichiers xyz :dtype delimiter: str :param border_size: nombre de mailles de bordure en plus de l'extension spatiale du fichier :param dxdy: tuple ``(dx, dy)`` cell size; if ``None``, infer from data :param fillin: if ``True``, fill the array with values from the XYZ files :dtype border_size: int """ Data_xyz = XYZFile('nothing.xyz', folder=path, bounds=bounds, delimiter=delimiter) if Data_xyz.nblines == 0: logging.info(f"File not in the boundaries for {path.split(os.sep)[-1]}") return False xlim, ylim = Data_xyz.get_extent() if dxdy is None: # We assume dx and dy are equals within xyz file dx = np.diff(Data_xyz.xyz[0:2,0])[0] logging.info(f"dx = {dx} ; dy = {dx}") dy = dx else: dx,dy = dxdy assert dx is not None and dy is not None, _('dx and dy must be defined') Array = cls() Array.dx = dx Array.dy = dy Array.origx = math.floor(xlim[0]) -dx/2. - float(border_size) * Array.dx Array.origy = math.floor(ylim[0]) -dy/2. - float(border_size) * Array.dy Array.nbx = int((math.floor(xlim[1]) - math.ceil(xlim[0])) / Array.dx) +1 + 2*border_size Array.nby = int((math.floor(ylim[1]) - math.ceil(ylim[0])) / Array.dy) +1 + 2*border_size Array.array = np.ma.zeros((Array.nbx, Array.nby)) if fillin: Array.fillin_from_xyz(Data_xyz.xyz) return Array