"""
Author: HECE - University of Liege, Pierre Archambeau
Date: 2024
Copyright (c) 2024 University of Liege. All rights reserved.
This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""
import numpy as np
import json
import math
from shapely.geometry import Point, LineString, MultiPoint
from os.path import exists
import re
import logging
from enum import Enum
from scipy.spatial import KDTree
from pathlib import Path
from typing import Literal, Union
from shapely.geometry import Polygon
from ..PyTranslate import _
from ..textpillow import Text_Infos, Font_Priority
from ..color_constants import getIfromRGB, getRGBfromI
[docs]
# Module-level sentinel marking "attribute absent for this row" in the NumPy
# object-array backend; identity-compared (is / is not), never equality-compared.
_NUMPY_MISSING = object()
# ----------------------------------------------------------------
# Storage backend enum
# ----------------------------------------------------------------
[docs]
class StorageMode(str, Enum):
    """Storage backend mode for cloud vertices.

    - ``DICT``: legacy per-row dictionaries stored in ``myvertices``
    - ``NUMPY``: compact NumPy array storage for large clouds
    """
    # FIX: members were missing although the rest of the module relies on
    # StorageMode.DICT / StorageMode.NUMPY and on StorageMode('dict') /
    # StorageMode('numpy') round-tripping (see switch_storage_mode).
    DICT = 'dict'
    NUMPY = 'numpy'
# ----------------------------------------------------------------
# Vertex model
# ----------------------------------------------------------------
[docs]
class wolfvertex:
"""WOLF vertex — 3D point with associated values.
Represents a point in space (x, y, z) with an optional dictionary
of named values (e.g. elevation, discharge, concentration…).
:ivar x: X coordinate (Easting)
:ivar y: Y coordinate (Northing)
:ivar z: Z coordinate (elevation), -99999. by default (= undefined)
:ivar in_use: whether the vertex is active
:ivar values: dictionary ``{key: value}`` of associated quantities, ``None`` if empty
"""
__slots__ = ('x', 'y', 'z', 'in_use', 'values')
def __init__(self, x:float, y:float, z:float=-99999.) -> None:
"""
:param x: X coordinate (Easting)
:param y: Y coordinate (Northing)
:param z: Z coordinate (elevation) — -99999. means undefined
"""
self.x = float(x)
self.y = float(y)
self.z = float(z)
self.values = None
def __add__(self, v:"wolfvertex") -> "wolfvertex":
"""Component-wise addition of two vertices.
:param v: vertex to add
:return: new ``wolfvertex`` (x₁+x₂, y₁+y₂, z₁+z₂)
"""
assert isinstance(v, wolfvertex), "Error in wolfvertex addition -- v must be a wolfvertex"
return wolfvertex(self.x + v.x, self.y + v.y, self.z + v.z)
def __sub__(self, v:"wolfvertex") -> "wolfvertex":
"""Component-wise subtraction of two vertices.
:param v: vertex to subtract
:return: new ``wolfvertex`` (x₁-x₂, y₁-y₂, z₁-z₂)
"""
assert isinstance(v, wolfvertex), "Error in wolfvertex subtraction -- v must be a wolfvertex"
return wolfvertex(self.x - v.x, self.y - v.y, self.z - v.z)
[docs]
def rotate(self, angle:float, center:tuple):
""" Rotate the vertex
:param angle: angle in radians (positive for counterclockwise)
:param center: center of the rotation (x, y)
"""
x = self.x - center[0]
y = self.y - center[1]
x1 = x * math.cos(angle) - y * math.sin(angle)
y1 = x * math.sin(angle) + y * math.cos(angle)
self.x = x1 + center[0]
self.y = y1 + center[1]
[docs]
def as_shapelypoint(self) -> Point:
"""Convert the vertex to a ``shapely.geometry.Point``.
:return: Shapely Point object (x, y, z)
"""
return Point(self.getcoords())
[docs]
def copy(self) -> "wolfvertex":
"""Independent copy of the vertex (``values`` are not copied).
:return: new ``wolfvertex`` with the same coordinates
"""
return wolfvertex(self.x, self.y, self.z)
[docs]
def getcoords(self) -> np.ndarray:
"""Return coordinates as a NumPy array ``[x, y, z]``.
:return: ``np.ndarray`` of shape (3,)
"""
return np.array([self.x, self.y, self.z])
[docs]
def dist3D(self, v:"wolfvertex") -> float:
"""
Return the 3D distance to another vertex
:param v: vertex to compare
"""
v = self.getcoords() - v.getcoords()
return np.sqrt(np.inner(v, v))
[docs]
def dist2D(self, v:"wolfvertex") -> float:
"""
Return the 2D distance to another vertex
:param v: vertex to compare
"""
v = self.getcoords()[0:2] - v.getcoords()[0:2]
return np.sqrt(np.inner(v, v))
[docs]
def addvalue(self, id, value):
"""Add an associated value to the vertex.
Creates the ``values`` dictionary if it does not exist yet.
:param id: key identifying the value (e.g. ``'discharge'``, ``'concentration'``)
:param value: value to associate (numeric, string…)
"""
if self.values is None:
self.values = {}
self.values[id] = value
[docs]
def add_value(self, id, value):
"""Alias for :meth:`addvalue` — add an associated value to the vertex.
:param id: key identifying the value
:param value: value to associate
"""
self.addvalue(id, value)
[docs]
def add_values(self, values:dict):
"""Add multiple associated values to the vertex at once.
:param values: dictionary ``{key: value}`` to merge into the vertex
"""
if self.values is None:
self.values = {}
for key, value in values.items():
self.values[key] = value
[docs]
def getvalue(self, id):
"""Return the value associated with the key *id*.
:param id: key of the requested value
:return: the value if it exists, ``None`` otherwise
"""
if not self.values is None:
if id in self.values.keys():
return self.values[id]
else:
return None
[docs]
def get_value(self, id):
"""Alias for :meth:`getvalue` — return an associated value from the vertex.
:param id: key of the requested value
:return: the value if it exists, ``None`` otherwise
"""
return self.getvalue(id)
[docs]
def get_values(self, ids:list) -> dict:
"""Return a subset of values associated with the vertex.
:param ids: list of keys to extract
:return: dictionary ``{key: value}`` containing only the found keys
"""
ret = {}
for id in ids:
if id in self.values.keys():
ret[id] = self.values[id]
return ret
[docs]
def limit2bounds(self, bounds=None):
"""Clamp the vertex coordinates to the given bounding box.
Modifies ``self.x`` and ``self.y`` *in place*.
:param bounds: bounding box ``[[xmin, xmax], [ymin, ymax]]``.
If ``None``, no action is taken.
"""
if bounds is None:
return
self.x = max(self.x, bounds[0][0])
self.x = min(self.x, bounds[0][1])
self.y = max(self.y, bounds[1][0])
self.y = min(self.y, bounds[1][1])
[docs]
def is_like(self, v:"wolfvertex", tol:float=1.e-6) -> bool:
"""Test near-equality with another vertex.
Comparison is done component-wise (x, y, z) using absolute
differences.
:param v: reference vertex to compare against
:param tol: absolute tolerance on each component (default 1e-6)
:return: ``True`` if all three differences are below *tol*
"""
if abs(self.x - v.x) < tol and abs(self.y - v.y) < tol and abs(self.z - v.z) < tol:
return True
else:
return False
# ----------------------------------------------------------------
# Cloud properties model
# ----------------------------------------------------------------
[docs]
class cloudproperties:
    """Visual and legend properties for a cloud of vertices.

    Stores the display configuration (color, size, style, transparency…)
    as well as the legend parameters associated with the cloud.

    :ivar used: whether these properties are active
    :ivar color: drawing color (RGB integer encoded by ``getIfromRGB``)
    :ivar width: point size in pixels
    :ivar style: rendering style index (see ``Cloud_Styles`` in the GUI)
    :ivar alpha: opacity (0 = opaque, 255 = fully transparent)
    :ivar filled: symbol fill (True = filled)
    :ivar legendvisible: legend display (True = visible)
    :ivar transparent: OpenGL transparency toggle
    :ivar animationspeed: animation speed multiplier (cycles per second)
    :ivar animationmode: animation mode (0=none, 1=blink, 2=fade, 3=grow, 4=seasons, 5=pulse)
    :ivar animationamplitude: animation amplitude factor
    :ivar legendtext: text displayed in the legend
    :ivar legendrelpos: relative legend position (1–9, numpad layout)
    :ivar legendx: absolute X coordinate of the legend (when ``legendrelpos == 0``)
    :ivar legendy: absolute Y coordinate of the legend (when ``legendrelpos == 0``)
    :ivar legendbold: bold text
    :ivar legenditalic: italic text
    :ivar legendunderlined: underlined text
    :ivar legendfontname: font name (e.g. ``'Arial'``)
    :ivar legendfontsize: font size in points
    :ivar legendcolor: legend text color (RGB integer)
    :ivar legendpriority: rendering priority used by ``Font_Priority``
    :ivar legendorientation: text orientation angle in degrees
    :ivar legendwidth: legend texture width in pixels
    :ivar legendheight: legend texture height in pixels
    :ivar renderingmode: OpenGL backend for cloud points (0=list, 1=shader)
    :ivar symbolpreset: symbol selected from the bundled ``wolfhece/symbols`` library
    :ivar symbolsource: shared symbol image path used when style=SYMBOL
    :ivar symboltintwithcolor: if ``True``, multiplies symbol by draw color
    :ivar symbolrastersize: target SVG rasterization size in pixels
    :ivar symbolrotation: per-cloud symbol rotation in degrees (CCW, Option A)
    :ivar symbolscale: per-cloud symbol scale factor (Option A)
    :ivar highlightselectedpoint: if ``True``, highlights currently selected cloud point in interactive tools
    :ivar highlightselectedpointsizefactor: multiplicative size factor for the selected point highlight marker
    :ivar highlightselectedpointcolor: color used for selected point highlight marker (RGB integer)
    """

    animationamplitude: float
    symboltintwithcolor: bool
    highlightselectedpoint: bool
    highlightselectedpointsizefactor: float
    highlightselectedpointcolor: int

    def __init__(self, lines=None, parent: "cloud_vertices" = None) -> None:
        """Initialize the cloud properties.

        :param lines: text lines read from a WOLF file. Parsing is not
            implemented yet; defaults are always applied first.
        :param parent: owning ``cloud_vertices`` instance (back-reference,
            stored as ``self.parent``).
        """
        # FIX: mutable default argument ([]) replaced by a None sentinel.
        lines = [] if lines is None else lines
        # FIX: the back-reference was accepted but never stored.
        self.parent = parent
        # FIX: when 'lines' was non-empty the original left EVERY attribute
        # undefined ('pass' branch); defaults are now always applied first.
        self._set_defaults()
        if len(lines) > 0:
            # TODO: parse WOLF property lines (not implemented in the original).
            pass

    def _set_defaults(self):
        """Apply the default value of every display/legend property."""
        self.color = 0
        self.width = 10
        self.style = 0
        self.filled = False
        self.legendvisible = False
        self.transparent = False
        self.alpha = 0
        self.animationspeed = 1.0
        self.animationmode = 0
        self.animationamplitude = 1.0
        self.legendtext = ''
        self.legendrelpos = 5
        self.legendx = 0.
        self.legendy = 0.
        self.legendbold = False
        self.legenditalic = False
        self.legendfontname = 'Arial'
        self.legendfontsize = 10
        self.legendcolor = 0
        self.legendunderlined = False
        self.legendpriority = Font_Priority.FONTSIZE.value
        self.legendorientation = 0
        self.legendwidth = 100
        self.legendheight = 100
        self.renderingmode = 1
        self.symbolpreset = ''
        self.symbolsource = ''
        self.symboltintwithcolor = False
        self.symbolrastersize = 128
        self.symbolrotation = 0.0
        self.symbolscale = 1.0
        self.highlightselectedpoint = True
        self.highlightselectedpointsizefactor = 0.5
        self.highlightselectedpointcolor = getIfromRGB((255, 220, 40))
        self.used = True

    # ----------------------------------------------------------------
    # JSON serialization
    # ----------------------------------------------------------------

    def to_dict(self) -> dict:
        """Serialize properties to a plain dictionary.

        Colors are stored as ``[R, G, B]`` lists for readability.
        """
        return {
            'color': list(getRGBfromI(self.color)),
            'width': self.width,
            'style': self.style,
            'filled': self.filled,
            'alpha': self.alpha,
            'transparent': self.transparent,
            'renderingmode': self.renderingmode,
            'animation': {
                'mode': self.animationmode,
                'speed': self.animationspeed,
                'amplitude': self.animationamplitude,
            },
            'symbol': {
                'preset': self.symbolpreset,
                'source': self.symbolsource,
                'tint': self.symboltintwithcolor,
                'rastersize': self.symbolrastersize,
                'rotation': self.symbolrotation,
                'scale': self.symbolscale,
            },
            'highlight': {
                'enabled': self.highlightselectedpoint,
                'sizefactor': self.highlightselectedpointsizefactor,
                'color': list(getRGBfromI(self.highlightselectedpointcolor)),
            },
            'legend': {
                'visible': self.legendvisible,
                'text': self.legendtext,
                'position': self.legendrelpos,
                'x': self.legendx,
                'y': self.legendy,
                'bold': self.legendbold,
                'italic': self.legenditalic,
                'underlined': self.legendunderlined,
                'fontname': self.legendfontname,
                'fontsize': self.legendfontsize,
                'color': list(getRGBfromI(self.legendcolor)),
                'priority': self.legendpriority,
                'orientation': self.legendorientation,
                'width': self.legendwidth,
                'height': self.legendheight,
            },
        }

    @classmethod
    def from_dict(cls, d: dict, parent: "cloud_vertices" = None) -> "cloudproperties":
        """Create a :class:`cloudproperties` from a dictionary.

        Only keys present in *d* override the defaults; each value is cast
        to the attribute's expected type.

        :param d: dictionary as produced by :meth:`to_dict`.
        :param parent: owning cloud instance.
        :return: new :class:`cloudproperties` instance.
        """
        def _as_color(c):
            # Colors may be stored either as [R, G, B] lists or packed ints.
            return getIfromRGB(tuple(c)) if isinstance(c, list) else int(c)

        p = cls(parent=parent)

        def _apply(src: dict, mapping):
            # Assign only the keys present in 'src', casting each value.
            for key, attr, cast in mapping:
                if key in src:
                    setattr(p, attr, cast(src[key]))

        _apply(d, [
            ('color', 'color', _as_color),
            ('width', 'width', int),
            ('style', 'style', int),
            ('filled', 'filled', bool),
            ('alpha', 'alpha', int),
            ('transparent', 'transparent', bool),
            ('renderingmode', 'renderingmode', int),
        ])
        _apply(d.get('animation', {}), [
            ('mode', 'animationmode', int),
            ('speed', 'animationspeed', float),
            ('amplitude', 'animationamplitude', float),
        ])
        _apply(d.get('symbol', {}), [
            ('preset', 'symbolpreset', str),
            ('source', 'symbolsource', str),
            ('tint', 'symboltintwithcolor', bool),
            ('rastersize', 'symbolrastersize', int),
            ('rotation', 'symbolrotation', float),
            ('scale', 'symbolscale', float),
        ])
        _apply(d.get('highlight', {}), [
            ('enabled', 'highlightselectedpoint', bool),
            ('sizefactor', 'highlightselectedpointsizefactor', float),
            ('color', 'highlightselectedpointcolor', _as_color),
        ])
        _apply(d.get('legend', {}), [
            ('visible', 'legendvisible', bool),
            ('text', 'legendtext', str),
            ('position', 'legendrelpos', int),
            ('x', 'legendx', float),
            ('y', 'legendy', float),
            ('bold', 'legendbold', bool),
            ('italic', 'legenditalic', bool),
            ('underlined', 'legendunderlined', bool),
            ('fontname', 'legendfontname', str),
            ('fontsize', 'legendfontsize', int),
            ('color', 'legendcolor', _as_color),
            ('priority', 'legendpriority', int),
            ('orientation', 'legendorientation', int),
            ('width', 'legendwidth', int),
            ('height', 'legendheight', int),
        ])
        return p
# ----------------------------------------------------------------
# Point cloud model
# ----------------------------------------------------------------
[docs]
class cloud_vertices:
"""3D point cloud with associated values.
Supported formats: DXF (``.dxf``), Shapefile (``.shp``), ASCII (all others).
For ASCII files, the separator is auto-detected among tab, semicolon,
comma and space.
DXF format is recognised by the file extension; otherwise an ASCII file
is assumed.
If a header exists on the first line, it must be indicated with
``header=True``.
The total number of columns (*nb*) determines the interpretation:
- *nb* > 3: a header is required.
- if ``header[2].lower() == 'z'``, the 3rd column is the Z elevation;
otherwise all columns beyond the 1st are values associated with (X, Y).
- number of values = *nb* − (2 or 3) depending on whether Z is present.
Data are stored in ``myvertices`` (indexed dictionary):
.. code-block:: python
{0: {'vertex': wolfvertex, 'head1': val1, 'head2': val2, ...},
1: {'vertex': wolfvertex, ...}, ...}
See :meth:`readfile`, :meth:`import_from_dxf`, :meth:`import_from_shapefile`.
:ivar filename: source file path (empty string if created in memory)
:ivar myvertices: dictionary ``{id: {'vertex': wolfvertex, key: value, ...}}``
:ivar xbounds: tuple ``(xmin, xmax)`` of the X extent
:ivar ybounds: tuple ``(ymin, ymax)`` of the Y extent
:ivar zbounds: tuple ``(zmin, zmax)`` of the Z extent
:ivar myprop: visual properties (:class:`cloudproperties` instance)
:ivar mytree: Scipy KDTree, ``None`` until :meth:`create_kdtree` is called
:ivar loaded: ``True`` if data was loaded successfully
:ivar idx: text identifier of the cloud
"""
myvertices: dict[int, dict['vertex':wolfvertex, str:float]]
[docs]
_myvertices: dict[int, dict['vertex':wolfvertex, str:float]]
[docs]
myprop: cloudproperties
[docs]
_mytree_dim: int | None
[docs]
AUTO_NUMPY_SWITCH_THRESHOLD = 100_000
# ----------------------------------------------------------------
# Construction / lifecycle hooks
# ----------------------------------------------------------------
def __init__(self,
             fname: Union[str, Path] = '',
             fromxls: str = '',
             header: bool = False,
             toload=True,
             idx: str = '',
             bbox: Polygon = None,
             dxf_imported_elts=None,
             **kwargs) -> None:
    """Create a point cloud, optionally loaded from a file.

    If *fname* is provided and *toload* is ``True``, the file is read
    automatically (DXF, Shapefile, GeoPackage or ASCII depending on
    extension).

    :param fname: source file path. Empty string = empty cloud.
    :param fromxls: raw string from an XLS file to be parsed
        (not used internally, intended for an external parser).
    :param header: ``True`` if the first line of the ASCII file contains
        column names.
    :param toload: ``True`` to load the file at initialisation.
    :param idx: text identifier for the cloud (also used by the GUI).
    :param bbox: Shapely polygon delimiting the area of interest.
        Used by :meth:`import_from_shapefile` to spatially filter
        features during reading.
    :param dxf_imported_elts: list of DXF entity types to import
        (default ``['MTEXT', 'INSERT']``).
    :param kwargs: extra keyword arguments (silently absorbed);
        allows GUI-only parameters such as ``mapviewer``,
        ``need_for_wx``, ``plotted`` to be passed without error.
    """
    # FIX: mutable default argument replaced by a None sentinel.
    if dxf_imported_elts is None:
        dxf_imported_elts = ['MTEXT', 'INSERT']
    self.parent_collection: "cloud_of_clouds | None" = None
    # Storage backends: legacy dict rows + optional compact NumPy arrays.
    self._myvertices = {}
    # FIX: '_numpy_xyz' was never initialized, so any access to the
    # 'storage_mode' property raised AttributeError on a fresh instance.
    self._numpy_xyz = None
    self._numpy_keys = None
    self._numpy_values = {}
    self.filename = str(fname)
    # FIX: 'idx' and 'loaded' are documented and read elsewhere ('myname'
    # property, readers set loaded=True) but were never initialized here.
    self.idx = idx
    self.loaded = False
    # Spatial bounds (also used by find_minmax)
    self.xbounds = (0., 0.)
    self.ybounds = (0., 0.)
    self.zbounds = (0., 0.)
    self.myprop = self._make_cloudproperties(parent=self)
    self.mytree = None
    self._mytree_dim = None
    if self.filename != '' and toload:
        suffix = Path(fname).suffix.lower()
        if suffix == '.dxf':
            self.import_from_dxf(self.filename, imported_elts=dxf_imported_elts)
        elif suffix == '.shp':
            self.import_from_shapefile(self.filename, bbox=bbox)
        elif suffix == '.gpkg':
            self.import_from_geopackage(self.filename, bbox=bbox)
        else:
            # Any other extension is assumed to be an ASCII point file.
            self.readfile(self.filename, header)
[docs]
def on_changed_vertices(self):
    """Hook called after vertices are added/removed/updated.
    Base model implementation is a no-op. GUI subclasses can override
    this method to invalidate OpenGL caches and trigger a redraw.
    """
    # Intentional no-op in the non-GUI base class.
    pass
# ----------------------------------------------------------------
# Factory methods (overridden in GUI subclass)
# ----------------------------------------------------------------
[docs]
def _make_cloud_vertices(self, **kwargs) -> "cloud_vertices":
    """Factory hook: build a sibling cloud.

    GUI subclasses override this to return their GUI-aware variant.
    """
    sibling = cloud_vertices(**kwargs)
    return sibling
[docs]
def _make_cloudproperties(self, **kwargs) -> cloudproperties:
    """Factory hook: build a properties object.

    GUI subclasses override this to return their GUI-aware variant.
    """
    props = cloudproperties(**kwargs)
    return props
[docs]
def _make_cloudproperties_from_dict(self, d: dict, **kwargs) -> cloudproperties:
    """Factory hook: build a properties object from a dictionary.

    GUI subclasses override this to return their GUI-aware variant.
    """
    props = cloudproperties.from_dict(d, **kwargs)
    return props
# ----------------------------------------------------------------
# Storage backend access / conversion
# ----------------------------------------------------------------
@property
[docs]
def myname(self) -> str:
    """Cloud name accessor (alias for ``idx``).

    :return: the cloud's text identifier
    """
    return self.idx
@myname.setter
def myname(self, value: str):
    """Cloud name setter (alias for ``idx``).

    :param value: new text identifier; kept in sync with ``idx`` for
        legacy callers.
    """
    self.idx = value
def __str__(self):
    # The cloud prints as its identifier (used in GUI lists and logs).
    return str(self.idx)
@property
[docs]
def myvertices(self) -> dict:
    """Legacy row storage accessor.
    Reading ``myvertices`` guarantees a dict-based view. If the cloud is
    currently in NumPy backend mode, rows are materialized first.
    """
    # Materialization converts the compact arrays back to per-row dicts
    # and clears the NumPy backend as a side effect.
    if self.storage_mode is StorageMode.NUMPY:
        self._materialize_numpy_storage()
    return self._myvertices
@myvertices.setter
def myvertices(self, value: dict):
    """Set legacy row storage explicitly.

    :param value: new row dictionary; ``None`` is normalized to an empty
        dict so iteration over rows never fails.
    """
    self._myvertices = {} if value is None else value
def __getitem__(self, key) -> dict:
    """Direct access to a cloud element by its identifier.

    :param key: integer key in ``myvertices``
    :return: dictionary ``{'vertex': wolfvertex, ...}``
    """
    # Reading the 'myvertices' property already materializes NumPy storage
    # when needed, so a single property access is sufficient here.
    return self.myvertices[key]
[docs]
def _materialize_numpy_storage(self):
    """Convert optional NumPy storage back to legacy dict rows.

    No-op when the cloud is already in dict mode. Attribute columns use
    the ``_NUMPY_MISSING`` sentinel for rows that never carried the key.
    """
    xyz = self._numpy_xyz
    if xyz is None:
        # Already in dict mode -- nothing to convert.
        return
    row_keys = self._numpy_keys
    if row_keys is None:
        row_keys = list(range(len(xyz)))
    rows = {}
    for pos, row_key in enumerate(row_keys):
        cur = xyz[pos]
        entry = {'vertex': wolfvertex(cur[0], cur[1], cur[2])}
        for attr, column in self._numpy_values.items():
            cell = column[pos]
            # The sentinel marks "attribute absent for this row".
            if cell is not _NUMPY_MISSING:
                entry[attr] = cell
        rows[row_key] = entry
    self._myvertices = rows
    # Drop the array backend: the dict view is now authoritative.
    self._numpy_xyz = None
    self._numpy_keys = None
    self._numpy_values = {}
[docs]
def _reset_storage_for_reload(self):
"""Clear both storage backends before a full data reload."""
self._myvertices = {}
self._numpy_xyz = None
self._numpy_keys = None
self._numpy_values = {}
self.mytree = None
@property
[docs]
def storage_mode(self) -> StorageMode:
    """Current storage backend for cloud rows."""
    # Dict mode is the default; NumPy mode is flagged by a live xyz array.
    if self._numpy_xyz is None:
        return StorageMode.DICT
    return StorageMode.NUMPY
[docs]
def switch_storage_mode(self, mode:Union[Literal['dict', 'numpy'], StorageMode] = StorageMode.DICT):
    """Switch storage backend between legacy dict rows and NumPy arrays.

    :param mode: target backend. ``'dict'`` materializes rows in
        ``myvertices``; ``'numpy'`` compacts current rows into
        array storage while preserving row keys.
    :raises ValueError: if *mode* is neither ``'dict'`` nor ``'numpy'``.
    """
    try:
        # StorageMode is a str-mixin Enum, so both plain strings and enum
        # members are accepted here.
        mode = StorageMode(mode)
    except ValueError:
        raise ValueError("mode must be 'dict' or 'numpy'")
    if mode is StorageMode.DICT:
        self._materialize_numpy_storage()
        return
    if self.storage_mode is StorageMode.NUMPY:
        # Already in NumPy mode -- nothing to do.
        return
    n = len(self.myvertices)
    keys = list(self.myvertices.keys())
    if n == 0:
        # Empty cloud: install empty array storage and clear dict rows.
        self._numpy_xyz = np.empty((0, 3), dtype=np.float64)
        self._numpy_keys = []
        self._numpy_values = {}
        self.myvertices = {}
        return
    vals = [self.myvertices[k] for k in keys]
    # Pack coordinates column-wise into float64 arrays.
    xs = np.fromiter((cur['vertex'].x for cur in vals), dtype=np.float64, count=n)
    ys = np.fromiter((cur['vertex'].y for cur in vals), dtype=np.float64, count=n)
    zs = np.fromiter((cur['vertex'].z for cur in vals), dtype=np.float64, count=n)
    # Collect attribute names in first-seen order across all rows.
    all_attr_keys = []
    seen = set()
    for cur in vals:
        for attr in cur.keys():
            if attr == 'vertex':
                continue
            if attr not in seen:
                seen.add(attr)
                all_attr_keys.append(attr)
    numpy_values = {}
    for attr in all_attr_keys:
        # Object arrays keep arbitrary value types; _NUMPY_MISSING marks
        # rows that do not carry this attribute.
        arr = np.empty(n, dtype=object)
        arr[:] = _NUMPY_MISSING
        for idx, cur in enumerate(vals):
            if attr in cur:
                arr[idx] = cur[attr]
        numpy_values[attr] = arr
    self._numpy_xyz = np.column_stack((xs, ys, zs))
    self._numpy_keys = list(keys)
    self._numpy_values = numpy_values
    self.myvertices = {}
    # The old KDTree indexed the previous coordinate layout -- rebuild lazily.
    self.mytree = None
    if len(self._numpy_xyz) > 0:
        self.xbounds = (float(np.min(self._numpy_xyz[:, 0])), float(np.max(self._numpy_xyz[:, 0])))
        self.ybounds = (float(np.min(self._numpy_xyz[:, 1])), float(np.max(self._numpy_xyz[:, 1])))
        self.zbounds = (float(np.min(self._numpy_xyz[:, 2])), float(np.max(self._numpy_xyz[:, 2])))
# ----------------------------------------------------------------
# Spatial index / nearest-neighbour helpers
# ----------------------------------------------------------------
[docs]
def create_kdtree(self):
    """Build a Scipy KDTree from the current vertex coordinates.
    The KDTree is stored in ``self.mytree`` and used by
    :meth:`find_nearest` for nearest-neighbor queries.
    """
    # Mixed/2D clouds are indexed on (x, y) only; pure 3D clouds on (x, y, z).
    dim = 3 if self.z_dimension_mode == '3d' else 2
    coords = self.get_xyz()[:, :dim]
    self.mytree = KDTree(coords)
    # Remember the indexed dimensionality so queries can detect a mismatch
    # and rebuild (see _get_query_and_tree).
    self._mytree_dim = dim
@staticmethod
[docs]
def _is_undefined_z(z: np.ndarray | float, atol: float = 1e-9):
"""Return mask/flag for coordinates considered undefined in Z."""
return np.isclose(z, -99999.0, atol=atol)
@property
[docs]
def z_dimension_mode(self) -> Literal['2d', '3d', 'mixed']:
    """Describe cloud Z content mode.
    - ``'2d'``: all Z are undefined (default sentinel ``-99999``)
    - ``'3d'``: all Z are defined
    - ``'mixed'``: both defined and undefined Z values coexist
    """
    # Gather Z values from whichever backend currently holds the rows.
    if self.storage_mode is StorageMode.NUMPY:
        if self._numpy_xyz is None or len(self._numpy_xyz) == 0:
            return '2d'
        zvals = np.asarray(self._numpy_xyz[:, 2], dtype=np.float64)
    else:
        if len(self.myvertices) == 0:
            return '2d'
        zvals = np.fromiter((cur['vertex'].z for cur in self.myvertices.values()), dtype=np.float64, count=len(self.myvertices))
    undef = self._is_undefined_z(zvals)
    # Empty clouds were handled above, so these reductions are safe.
    if bool(np.all(undef)):
        return '2d'
    if bool(np.any(undef)):
        return 'mixed'
    return '3d'
[docs]
def _normalize_query_xyz(self, xyz: np.ndarray | list) -> np.ndarray:
"""Normalize query coordinates to a 2D float64 array."""
if isinstance(xyz, list):
if len(xyz) > 0 and isinstance(xyz[0], float | int):
logging.warning(_('xyz is a list of floats -- converting to a list of lists'))
xyz = [xyz]
xyz = np.asarray(xyz, dtype=np.float64)
if xyz.ndim == 1:
xyz = xyz.reshape(1, -1)
return xyz
[docs]
def _select_kdtree_dim(self, query_cols: int) -> int | None:
"""Choose KDTree dimensionality (2 or 3) based on cloud/query context."""
if query_cols not in (2, 3):
return None
# Robust policy:
# - mixed Z clouds => always XY distance (2D)
# - pure 2D clouds => XY distance (2D)
# - pure 3D clouds => use 3D only when query provides Z
mode = self.z_dimension_mode
if mode in ('2d', 'mixed'):
return 2
return 3 if query_cols >= 3 else 2
[docs]
def _get_query_and_tree(self, xyz: np.ndarray | list):
    """Return normalized query array, KDTree and row keys for nearest search.

    :param xyz: query coordinates (list or array, shape ``(n, 2|3)``)
    :return: ``(query, tree, keys)`` tuple, or ``(None, None, None)`` on
        error or when the cloud is empty (already logged)
    """
    q = self._normalize_query_xyz(xyz)
    if q.ndim != 2:
        logging.error(_('Error in find_nearest -- xyz must be a list or 2D array'))
        return None, None, None
    dim = self._select_kdtree_dim(q.shape[1])
    if dim is None:
        logging.error(_('Error in find_nearest -- xyz must have 2 or 3 columns'))
        return None, None, None
    # Pull coordinates and row keys from the active storage backend.
    if self.storage_mode is StorageMode.NUMPY:
        keys = self._numpy_keys if self._numpy_keys is not None else list(range(len(self._numpy_xyz)))
        coords = self._numpy_xyz[:, :dim] if self._numpy_xyz is not None else np.empty((0, dim), dtype=np.float64)
    else:
        keys = list(self.myvertices.keys())
        coords = self.get_xyz()[:, :dim]
    if len(coords) == 0:
        logging.warning(_('No vertices in cloud -- nearest search aborted'))
        return None, None, None
    # Rebuild the KDTree when absent or built for another dimensionality.
    if self.mytree is None or self._mytree_dim != dim:
        self.mytree = KDTree(coords)
        self._mytree_dim = dim
    return q[:, :dim], self.mytree, keys
[docs]
def find_nearest(self, xyz:np.ndarray | list, nb:int =1):
    """
    Find nearest neighbors from Scipy KDTree structure based on a copy of the vertices.
    See : https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.KDTree.query.html
    :param xyz: coordinates to find nearest neighbors -- shape (n, m) - where m is the number of coordinates (2 or 3)
    :param nb: number of nearest neighbors to find
    :return: list of distances, list of "Wolfvertex", list of elements stored in self.myvertices - or list of lists if xyz is a list of coordinates
    """
    try:
        xyz, tree, keys = self._get_query_and_tree(xyz)
        if xyz is None or tree is None or keys is None:
            # Normalization/tree construction failed (already logged).
            return None, None, None
        if self.storage_mode is StorageMode.NUMPY:
            dist, ii = tree.query(xyz, k=nb)
            # Rebuild a legacy-style row dict on the fly from array storage.
            def _build_item(idx:int):
                cur = self._numpy_xyz[idx]
                item = {'vertex': wolfvertex(cur[0], cur[1], cur[2])}
                for key, arr in self._numpy_values.items():
                    item[key] = arr[idx]
                return item
            # Result nesting mirrors scipy's query output: scalars for a
            # single query point with k=1, nested lists otherwise.
            if xyz.shape[0] == 1:
                if nb == 1:
                    item = _build_item(int(ii[0]))
                    return dist[0], item['vertex'], item
                else:
                    items = [_build_item(int(curi)) for curi in ii[0]]
                    return dist[0], [item['vertex'] for item in items], items
            else:
                if nb == 1:
                    items = [_build_item(int(curi)) for curi in ii]
                    return dist, [item['vertex'] for item in items], items
                else:
                    items = [[_build_item(int(curi)) for curi in curii] for curii in ii]
                    return dist, [[item['vertex'] for item in curitems] for curitems in items], items
        # Dict backend: indices returned by the tree map into the key list.
        # NOTE(review): in dict mode this recomputes the same 'keys' that
        # _get_query_and_tree already returned -- confirm before removing.
        keys = list(self.myvertices.keys())
        dist, ii = tree.query(xyz, k=nb)
        if xyz.shape[0] == 1:
            if nb == 1:
                return dist[0], self.myvertices[keys[ii[0]]]['vertex'], self.myvertices[keys[ii[0]]]
            else:
                return dist[0], [self.myvertices[keys[curi]]['vertex'] for curi in ii[0]], [self.myvertices[keys[curi]] for curi in ii[0]]
        else:
            if nb == 1:
                return dist, [self.myvertices[keys[curi]]['vertex'] for curi in ii], [self.myvertices[keys[curi]] for curi in ii]
            else:
                return dist, [[self.myvertices[keys[curi]]['vertex'] for curi in curii] for curii in ii], [[self.myvertices[keys[curi]] for curi in curii] for curii in ii]
    except Exception as e:
        logging.error(_('Error in find_nearest -- {}').format(e))
        logging.error(_('Check your input data -- it must be a list or 2D array with 3 columns'))
        return None, None, None
[docs]
def find_nearest_id(self, xyz:np.ndarray | list, max_distance:float | None = None):
    """Find nearest row id(s) for one or several query points.

    :param xyz: coordinates used for nearest-neighbor search.
        Accepted shapes: ``[x, y, z]`` or ``[[x, y, z], ...]``.
    :param max_distance: optional upper bound on accepted nearest
        distance. If provided and the nearest point is
        farther than this value, ``None`` is returned for
        that query.
    :return: nearest id (single query), list of nearest ids (multiple
        queries), or ``None`` on error.
    """
    try:
        xyz, tree, keys = self._get_query_and_tree(xyz)
        if xyz is None or tree is None or keys is None:
            return None
        dist, ii = tree.query(xyz, k=1)
        ii = np.asarray(ii)
        dist = np.asarray(dist)
        if xyz.shape[0] == 1:
            # Single query point -> scalar result.
            if max_distance is not None and float(dist[0]) > float(max_distance):
                return None
            return keys[int(ii[0])]
        nearest_ids = [keys[int(curi)] for curi in ii]
        if max_distance is None:
            return nearest_ids
        # Filter each result independently against the distance cap.
        return [
            curid if float(curdist) <= float(max_distance) else None
            for curid, curdist in zip(nearest_ids, dist)
        ]
    except Exception as e:
        # FIX: the log messages wrongly referenced 'find_nearest' and
        # claimed 3 columns were required (2 are also accepted).
        logging.error(_('Error in find_nearest_id -- {}').format(e))
        logging.error(_('Check your input data -- it must be a list or 2D array with 2 or 3 columns'))
        return None
# ----------------------------------------------------------------
# Import / export routines
# ----------------------------------------------------------------
[docs]
def init_from_nparray(self, array:np.ndarray, numpy_backend: bool | None = None):
"""Populate the cloud from a NumPy array.
Existing vertices are overwritten (added with sequential keys
starting from 0).
:param array: array of shape ``(n, 3)`` with columns X, Y, Z.
:param numpy_backend: backend selection mode:
- ``True``: force NumPy backend;
- ``False``: force legacy dict rows;
- ``None`` (default): auto-switch to NumPy
when ``len(array) >= AUTO_NUMPY_SWITCH_THRESHOLD``
(default threshold: 100_000 points).
"""
self.mytree = None
if numpy_backend is None:
numpy_backend = len(array) >= self.AUTO_NUMPY_SWITCH_THRESHOLD
if numpy_backend:
arr = np.asarray(array, dtype=np.float64)
if arr.ndim != 2 or arr.shape[1] != 3:
raise ValueError('array must have shape (n, 3)')
self.myvertices = {}
self._numpy_xyz = arr.copy()
self._numpy_keys = list(range(len(arr)))
self._numpy_values = {}
if len(arr) > 0:
self.xbounds = (float(np.min(arr[:, 0])), float(np.max(arr[:, 0])))
self.ybounds = (float(np.min(arr[:, 1])), float(np.max(arr[:, 1])))
self.zbounds = (float(np.min(arr[:, 2])), float(np.max(arr[:, 2])))
else:
self.xbounds = (0., 0.)
self.ybounds = (0., 0.)
self.zbounds = (0., 0.)
self.loaded = True
self._header = False
return
self._numpy_xyz = None
self._numpy_keys = None
self._numpy_values = {}
k = 0
for curv in array:
mv = wolfvertex(curv[0], curv[1], curv[2])
self.myvertices[k] = {'vertex': mv}
k += 1
self.xbounds = (np.min(array[:, 0]), np.max(array[:, 0]))
self.ybounds = (np.min(array[:, 1]), np.max(array[:, 1]))
self.loaded = True
[docs]
def readfile(self, fname:str='', header: bool = False):
    """Read an ASCII point file, with or without a header line.

    :param fname: (str) file name; nothing happens if empty.
    :param header: (bool) ``True`` if the first line contains column names.

    The separator is automatically detected among : tabulation, semicolon,
    space, comma.
    The file must contain at least 2 columns (X, Y) and may contain a third
    one (Z) and more (values). If values are present, they are stored in the
    row dictionary with their header name as key.
    """
    if fname == '':
        return

    self._reset_storage_for_reload()

    headers = None
    nbcols = 0
    zpresent = False
    nbvals = 0
    firstval = 0

    # Sentinel extrema, shrunk/grown while parsing each row.
    xmin = 1.e300
    xmax = -1.e300
    ymin = 1.e300
    ymax = -1.e300
    zmin = 1.e300
    zmax = -1.e300

    # FIX: context manager guarantees the file handle is released.
    with open(fname, 'r') as f:
        content = f.read().splitlines()

    def _detect_separator(line:str) -> str:
        """Pick the separator: tab, then semicolon, then comma, else space."""
        if '\t' in line:
            return '\t'
        if ';' in line:
            return ';'
        if ',' in line:
            return ','
        return ' '

    if header:
        curhead = content.pop(0)
        sep = _detect_separator(curhead)
        headers = curhead.split(sep)
        nbcols = len(headers)
    else:
        curline = content[0]
        sep = _detect_separator(curline)
        if sep in (',', ' '):
            # collapse multiple spaces (as in the original comma/space paths)
            curline = re.sub(' +', ' ', curline)
        curval = curline.split(sep)
        nbcols = len(curval)
        try:
            x = float(curval[0])
            y = float(curval[1])
            z = 0.
            if nbcols > 2:
                z = float(curval[2])
        except ValueError:
            logging.error(_('Error converting first row to float as "header" is set to False.'))
            logging.error(_('Check your input file : ')+fname)
            logging.info(_('Using the first row as header.'))
            header = True
            # FIX: the original referenced an undefined 'curhead' here
            # (NameError); the tokens are already available in 'curval'.
            headers = curval
            # FIX: drop the header line so it is not re-parsed as data below.
            content.pop(0)

    # Decide the column layout: Z column present? how many value columns?
    if nbcols < 2:
        logging.warning(_('Not enough values on one line -- Retry !!'))
        return
    elif nbcols > 3:
        if headers is None:
            logging.warning(_('No headers -- Retry !!'))
            return
        else:
            if headers[2].lower() == 'z':
                zpresent = True
                nbvals = nbcols - 3
                firstval = 3
            else:
                # 3rd column interpreted as value, not Z
                zpresent = False
                nbvals = nbcols - 2
                firstval = 2
    elif nbcols == 3:
        if headers is None:
            zpresent = True
        else:
            if headers[2].lower() == 'z':
                zpresent = True
            else:
                # 3rd column interpreted as value, not Z
                zpresent = False
                nbvals = 1
                firstval = 2

    k = 0
    for curline in content:
        curline = re.sub(' +', ' ', curline)
        curval = curline.split(sep)

        x = float(curval[0])
        y = float(curval[1])
        z = 0.
        if zpresent:
            z = float(curval[2])

        curvert = wolfvertex(x, y, z)
        curdict = {'vertex': curvert}
        self.myvertices[k] = curdict

        xmin = min(x, xmin)
        xmax = max(x, xmax)
        ymin = min(y, ymin)
        ymax = max(y, ymax)
        zmin = min(z, zmin)
        zmax = max(z, zmax)

        if nbvals > 0:
            # Extra columns are stored as raw strings under their header name.
            for i in range(firstval, firstval + nbvals):
                curdict[headers[i]] = curval[i]
        k += 1

    self.xbounds = (xmin, xmax)
    self.ybounds = (ymin, ymax)
    # FIX: zbounds were tracked but never stored.
    self.zbounds = (zmin, zmax)
    # FIX: flag value columns so has_values/to_dict expose them.
    self._header = nbvals > 0
    self.loaded = True
[docs]
def import_from_dxf(self, fn:str='', imported_elts=['MTEXT', 'INSERT']):
    """Import points from a DXF file using the ``ezdxf`` library.

    Supported entity types: MTEXT, INSERT, POLYLINE, LWPOLYLINE, LINE.
    Only entities on visible (active) layers are imported.
    For MTEXT/INSERT, points with Z == 0 are skipped.

    :param fn: DXF file path. If empty or non-existent, no action is taken.
    :param imported_elts: list of DXF entity types to import
        (e.g. ``['MTEXT', 'INSERT', 'POLYLINE', 'LINE']``).
        NOTE(review): mutable default argument — safe here only because it
        is never mutated.
    :return: number of imported points, or ``None`` if the file is not found.
    """
    if fn == '' or not exists(fn):
        logging.error(_('File not found : ')+fn)
        return

    # Drop any previous content (both backends) before importing.
    self._reset_storage_for_reload()

    import ezdxf

    # Read DXF file and identify modelspace
    logging.info(_('Reading DXF file : ')+fn)
    doc = ezdxf.readfile(fn)
    msp = doc.modelspace()

    logging.info(_('Number of entities : ')+str(len(msp)))
    logging.info(_('Number of layers : ')+str(len(doc.layers)))
    logging.info(_('Treating entities... '))

    # Loop over DXF entities; 'k' is the sequential vertex key.
    k=0
    for e in msp:
        # Skip entities whose layer is switched off.
        if doc.layers.get(e.dxf.layer).is_on():
            if e.dxftype() in imported_elts:
                if e.dxftype() == "MTEXT" or e.dxftype()=='INSERT':
                    # Insertion point of the text/block reference.
                    x = e.dxf.insert[0]
                    y = e.dxf.insert[1]
                    z = e.dxf.insert[2]
                    # Points lying at Z == 0 are considered meaningless here.
                    if z!=0.:
                        curvert = wolfvertex(x, y, z)
                        curdict = {'vertex': curvert}
                        self.myvertices[k] = curdict
                        k += 1
                elif e.dxftype() == "POLYLINE":
                    # extract coordinates of every polyline vertex
                    verts = [cur.dxf.location.xyz for cur in e.vertices]
                    for cur in verts:
                        curvert = wolfvertex(cur[0],cur[1],cur[2])
                        curdict = {'vertex': curvert}
                        self.myvertices[k] = curdict
                        k += 1
                elif e.dxftype() == "LWPOLYLINE":
                    # extract 2D coordinates; Z comes from the entity elevation
                    verts = np.array(e.lwpoints.values)
                    verts = verts.reshape([verts.size // 5,5])[:,:2] # in ezdxf 1.3.5, the lwpoints.values attribute is a np.ndarray [n,5]
                    verts = np.column_stack([verts,[e.dxf.elevation]*len(verts)])
                    for cur in verts:
                        curvert = wolfvertex(cur[0],cur[1],cur[2])
                        curdict = {'vertex': curvert}
                        self.myvertices[k] = curdict
                        k += 1
                elif e.dxftype() == "LINE":
                    # extract both line endpoints as two separate vertices
                    curvert = wolfvertex(e.dxf.start[0],e.dxf.start[1],e.dxf.start[2])
                    curdict = {'vertex': curvert}
                    self.myvertices[k] = curdict
                    k += 1
                    curvert = wolfvertex(e.dxf.end[0],e.dxf.end[1],e.dxf.end[2])
                    curdict = {'vertex': curvert}
                    self.myvertices[k] = curdict
                    k += 1
                else:
                    logging.warning(_('DXF element not supported/ignored : ') + e.dxftype())
        else:
            logging.info(_('Layer {} is off'.format(e.dxf.layer)))

    # Recompute bounds from scratch and mark the cloud as usable.
    self.find_minmax(True)
    self.loaded = True

    logging.info(_('Number of imported points : ')+str(k))
    logging.info(_('Importation finished'))
    return k
[docs]
def _resolve_shapefile_column(self, gdf, targetcolumn:str) -> str:
    """Fallback hook when *targetcolumn* cannot be found.

    Invoked by :meth:`import_from_shapefile` when neither *targetcolumn*
    nor ``'geometry'`` exist in the data. This base implementation simply
    reports the problem and aborts; the GUI subclass overrides it with an
    interactive column chooser.

    :param gdf: ``GeoDataFrame`` already loaded from the Shapefile.
    :param targetcolumn: the originally requested column name.
    :return: resolved column name, or ``None`` to abort the import.
    """
    message = _('Column not found : ') + targetcolumn
    logging.error(message)
    return None
[docs]
def _resolve_value_columns(self, gdf, value_columns, excluded_columns:list[str] | None = None) -> list[str]:
"""Resolve the list of attributes to import from a GeoDataFrame.
:param gdf: source ``GeoDataFrame``.
:param value_columns: ``None`` (disabled), ``'all'`` or explicit iterable.
:param excluded_columns: columns that must not be imported.
:return: list of selected column names.
"""
if value_columns is None:
return []
excluded = set(excluded_columns or [])
if value_columns == 'all':
return [col for col in gdf.columns if col not in excluded]
if isinstance(value_columns, (list, tuple, set)):
return [col for col in value_columns if col in gdf.columns and col not in excluded]
logging.warning(_('Unsupported value_columns option: {}').format(value_columns))
return []
[docs]
def _import_from_geodataframe(self, gdf, source_label:str, targetcolumn:str = 'X1_Y1_Z1', value_columns=None):
    """Import points and optional attributes from an existing GeoDataFrame.

    Three extraction strategies are tried in order:

    1. *targetcolumn* is present: each cell is a ``'X,Y,Z'`` comma-separated
       string (format used by SPW-ARNE-DCENN).
    2. A ``geometry`` column is present: coordinates are read from the
       Shapely ``Point`` geometry of each row.
    3. Neither: :meth:`_resolve_shapefile_column` is called to let
       subclasses (e.g. the GUI) choose an alternative column.

    After import, :meth:`find_minmax` is called and ``self.loaded`` is set
    to ``True``.

    :param gdf: source ``GeoDataFrame`` (already read by the caller).
    :param source_label: human-readable file path used in log/error messages.
    :param targetcolumn: name of the column containing ``'X,Y,Z'``
        coordinate strings. Default: ``'X1_Y1_Z1'``.
    :param value_columns: optional attribute import selector.
        ``None`` (default) imports geometry only;
        ``'all'`` imports all non-geometry/non-coordinate columns;
        a list/tuple/set imports the named columns only.
        If the number of imported rows reaches
        ``AUTO_NUMPY_SWITCH_THRESHOLD``, storage is
        automatically switched to the NumPy backend.
    :return: number of imported points, or ``None`` on error.
    """
    if gdf is None:
        logging.error(_('Error during import of file : ')+source_label)
        return

    self._reset_storage_for_reload()

    # _header means "rows contain value columns in addition to vertex".
    # Reset it at each full import and set it explicitly per branch below.
    self._header = False

    if gdf.empty or len(gdf) == 0:
        logging.error(_('Imported file is empty : ')+source_label)
        return

    n_rows = len(gdf)
    # Large inputs are written straight into the NumPy backend,
    # skipping the per-row dict construction entirely.
    use_numpy_direct = n_rows >= self.AUTO_NUMPY_SWITCH_THRESHOLD

    def _build_numpy_storage(xyz:np.ndarray, value_cols:list[str]):
        # Fill the NumPy backend (coordinates + object-dtype value columns),
        # update bounds and mark the cloud as loaded.
        self._myvertices = {}
        self._numpy_xyz = np.asarray(xyz, dtype=np.float64)
        self._numpy_keys = list(range(len(self._numpy_xyz)))
        numpy_values = {}
        for col in value_cols:
            numpy_values[col] = np.asarray(gdf[col].to_numpy(copy=True), dtype=object)
        self._numpy_values = numpy_values
        self._header = len(value_cols) > 0
        if len(self._numpy_xyz) > 0:
            self.xbounds = (float(np.min(self._numpy_xyz[:, 0])), float(np.max(self._numpy_xyz[:, 0])))
            self.ybounds = (float(np.min(self._numpy_xyz[:, 1])), float(np.max(self._numpy_xyz[:, 1])))
            self.zbounds = (float(np.min(self._numpy_xyz[:, 2])), float(np.max(self._numpy_xyz[:, 2])))
            self.xmin, self.xmax = self.xbounds
            self.ymin, self.ymax = self.ybounds
            self.zmin, self.zmax = self.zbounds
        else:
            self.xbounds = (0., 0.)
            self.ybounds = (0., 0.)
            self.zbounds = (0., 0.)
        # Any cached KD-tree is stale after a full reload.
        self.mytree = None
        self.loaded = True

    def _xyz_from_csv_column(column_name:str) -> np.ndarray:
        # Vectorized parse of 'X,Y,Z' strings into a float64 (n, 3) array.
        xyz = gdf[column_name].str.split(',', expand=True)
        if xyz.shape[1] < 3:
            raise ValueError('Expected three comma-separated values (X,Y,Z) per row')
        return xyz.iloc[:, :3].astype(np.float64).to_numpy(copy=True)

    if targetcolumn in gdf.columns:
        # Strategy 1: 'X,Y,Z' strings stored in targetcolumn.
        value_cols = self._resolve_value_columns(gdf, value_columns, excluded_columns=['geometry', targetcolumn])
        self._header = len(value_cols) > 0
        if use_numpy_direct:
            try:
                xyz = _xyz_from_csv_column(targetcolumn)
            except Exception as e:
                logging.error(_('Error during import of file : ')+source_label)
                logging.error(e)
                return
            _build_numpy_storage(xyz, value_cols)
            k = len(xyz)
            logging.info(_('Number of imported points : ')+str(k))
            return k
        k=0
        for _row_index, row in gdf.iterrows():
            x, y, z = row[targetcolumn].split(',')
            x = float(x)
            y = float(y)
            z = float(z)
            curvert = wolfvertex(x, y, z)
            curdict = {'vertex': curvert}
            for col in value_cols:
                curdict[col] = row[col]
            self.myvertices[k] = curdict
            k += 1
    elif 'geometry' in gdf.columns:
        # Strategy 2: read coordinates from the Shapely Point geometries.
        value_cols = self._resolve_value_columns(gdf, value_columns, excluded_columns=['geometry'])
        self._header = len(value_cols) > 0
        if use_numpy_direct:
            try:
                xs = np.asarray(gdf.geometry.x.to_numpy(copy=True), dtype=np.float64)
                ys = np.asarray(gdf.geometry.y.to_numpy(copy=True), dtype=np.float64)
                # Geometry mode carries no Z -> use the 'undefined' sentinel.
                zs = np.full(len(gdf), -99999., dtype=np.float64)
                xyz = np.column_stack((xs, ys, zs))
            except Exception as e:
                logging.error(_('Geometry not found (MultiPoint or Point) -- Please check your file : ')+source_label)
                logging.error(e)
                return
            _build_numpy_storage(xyz, value_cols)
            # Keep existing return convention for geometry mode.
            k = len(xyz) - 1
            logging.info(_('Number of imported points : ')+str(k))
            return k
        try:
            for k, (_row_index, row) in enumerate(gdf.iterrows()):
                curvert = wolfvertex(row.geometry.x, row.geometry.y)
                curdict = {'vertex': curvert}
                for col in value_cols:
                    curdict[col] = row[col]
                self.myvertices[k] = curdict
        except Exception as e:
            logging.error(_('Geometry not found (MultiPoint or Point) -- Please check your file : ')+source_label)
            logging.error(e)
            return
    else:
        # Strategy 3: let a subclass pick a replacement 'X,Y,Z' column.
        resolved = self._resolve_shapefile_column(gdf, targetcolumn)
        if resolved is None:
            return
        value_cols = self._resolve_value_columns(gdf, value_columns, excluded_columns=['geometry', resolved])
        self._header = len(value_cols) > 0
        if use_numpy_direct:
            try:
                xyz = _xyz_from_csv_column(resolved)
            except Exception as e:
                logging.error(_('Error during import of file : ')+source_label)
                logging.error(e)
                return
            _build_numpy_storage(xyz, value_cols)
            k = len(xyz)
            logging.info(_('Number of imported points : ')+str(k))
            return k
        try:
            k=0
            for _row_index, row in gdf.iterrows():
                x, y, z = row[resolved].split(',')
                x = float(x)
                y = float(y)
                z = float(z)
                curvert = wolfvertex(x, y, z)
                curdict = {'vertex': curvert}
                for col in value_cols:
                    curdict[col] = row[col]
                self.myvertices[k] = curdict
                k += 1
        except Exception as e:
            logging.error(_('Error during import of file : ')+source_label)
            logging.error(e)
            return

    self.find_minmax(True)
    self.loaded = True

    # Promote large dict-backed clouds to the NumPy backend.
    if self.nbvertices >= self.AUTO_NUMPY_SWITCH_THRESHOLD:
        self.switch_storage_mode('numpy')

    logging.info(_('Number of imported points : ')+str(k))
    return k
[docs]
def import_from_shapefile(self, fn:str='', targetcolumn:str = 'X1_Y1_Z1', bbox:Polygon = None, value_columns=None):
    """Import points from a Shapefile using ``geopandas``.

    Two extraction modes:

    1. If *targetcolumn* exists in the columns, each row is read as a
       ``'X,Y,Z'`` string (format used by SPW-ARNE-DCENN).
    2. Otherwise, the ``geometry`` column is used (Point or MultiPoint).

    If neither is found, :meth:`_resolve_shapefile_column` is called to
    allow subclasses (e.g. the GUI) to propose an alternative column.

    :param fn: Shapefile path (``.shp``). If empty or non-existent, no action is taken.
    :param targetcolumn: column name containing coordinates as ``'X,Y,Z'``.
    :param bbox: Shapely polygon delimiting the area of interest, applied
        as a spatial filter while reading (``gpd.read_file(fn, bbox=...)``).
    :param value_columns: optional attribute import selector.
        ``None`` (default) imports geometry only; ``'all'`` imports all
        non-geometry/non-XYZ-source columns; explicit list imports the
        selected columns only.
    :return: number of imported points, or ``None`` on error.
    """
    if fn == '' or not exists(fn):
        logging.error(_('File not found : ')+fn)
        return

    import geopandas as gpd

    # Spatial filtering happens at read time, before any conversion.
    frame: gpd.GeoDataFrame = gpd.read_file(fn, bbox=bbox)
    return self._import_from_geodataframe(frame,
                                          source_label=fn,
                                          targetcolumn=targetcolumn,
                                          value_columns=value_columns)
[docs]
def import_from_geopackage(self, fn:str='', layer:str = None, targetcolumn:str = 'X1_Y1_Z1', bbox:Polygon = None, value_columns=None):
    """Import points from a GeoPackage using ``geopandas``.

    :param fn: GeoPackage path (``.gpkg``). If empty or non-existent, no action is taken.
    :param layer: optional layer name. If ``None``, geopandas default layer is used.
    :param targetcolumn: column name containing coordinates as ``'X,Y,Z'``.
    :param bbox: optional spatial filter passed to ``gpd.read_file``.
    :param value_columns: optional attribute import selector.
    :return: number of imported points, or ``None`` on error.
    """
    if fn == '' or not exists(fn):
        logging.error(_('File not found : ')+fn)
        return

    import geopandas as gpd

    # Only forward 'layer' when explicitly requested, so geopandas keeps
    # its default-layer behavior otherwise.
    read_kwargs = {'bbox': bbox}
    if layer is not None:
        read_kwargs['layer'] = layer
    frame = gpd.read_file(fn, **read_kwargs)

    return self._import_from_geodataframe(frame,
                                          source_label=fn,
                                          targetcolumn=targetcolumn,
                                          value_columns=value_columns)
[docs]
def _resolve_export_value_columns(self, value_columns) -> list[str]:
    """Resolve which attributes should be exported.

    :param value_columns: ``None`` (no attributes), ``'all'`` or explicit iterable.
    :return: ordered list of attribute names to export.
    """
    if value_columns is None:
        return []

    # Collect the attribute names available in the current backend,
    # preserving first-seen order.
    if self.storage_mode is StorageMode.NUMPY:
        available = list(self._numpy_values.keys())
    else:
        ordered = {}
        for row in self._myvertices.values():
            for key in row:
                if key != 'vertex':
                    ordered.setdefault(key, None)
        available = list(ordered)

    if value_columns == 'all':
        return available
    if isinstance(value_columns, (list, tuple, set)):
        return [name for name in value_columns if name in available]

    logging.warning(_('Unsupported value_columns option: {}').format(value_columns))
    return []
[docs]
def _build_geodataframe_for_export(self, value_columns='all', include_xyz_column:bool = True,
                                   xyz_column:str = 'X1_Y1_Z1', crs=None):
    """Build a GeoDataFrame representation of the cloud for export.

    :param value_columns: attributes to export, resolved through
        :meth:`_resolve_export_value_columns` (``None``, ``'all'`` or iterable).
    :param include_xyz_column: also write a ``'X,Y,Z'`` text column so the
        file can be re-imported through the *targetcolumn* path.
    :param xyz_column: name of that optional text column.
    :param crs: optional CRS forwarded to the GeoDataFrame constructor.
    :return: ``geopandas.GeoDataFrame`` with one 3D Point per vertex.
    """
    import geopandas as gpd

    cols = self._resolve_export_value_columns(value_columns)
    data = {}

    if self.storage_mode is StorageMode.NUMPY:
        xyz = np.asarray(self._numpy_xyz, dtype=np.float64)
        for col in cols:
            arr = np.asarray(self._numpy_values[col], dtype=object)
            # Cells holding the internal MISSING sentinel become None.
            data[col] = [None if cur is _NUMPY_MISSING else cur for cur in arr]
        if include_xyz_column:
            data[xyz_column] = [f'{x},{y},{z}' for x, y, z in xyz]
        geometry = [Point(float(x), float(y), float(z)) for x, y, z in xyz]
        return gpd.GeoDataFrame(data, geometry=geometry, crs=crs)

    # Dict backend: snapshot the keys so rows/values stay aligned.
    keys = list(self._myvertices.keys())
    rows = [self._myvertices[k] for k in keys]
    for col in cols:
        data[col] = [row.get(col, None) for row in rows]
    if include_xyz_column:
        data[xyz_column] = [
            f"{row['vertex'].x},{row['vertex'].y},{row['vertex'].z}"
            for row in rows
        ]
    geometry = [
        Point(float(row['vertex'].x), float(row['vertex'].y), float(row['vertex'].z))
        for row in rows
    ]
    return gpd.GeoDataFrame(data, geometry=geometry, crs=crs)
[docs]
def export_to_shapefile(self, fn:str, value_columns='all', include_xyz_column:bool = True,
                        xyz_column:str = 'X1_Y1_Z1', crs=None):
    """Export cloud vertices to a Shapefile using ``geopandas``.

    :param fn: destination ``.shp`` path.
    :param value_columns: attributes to export (``None``, ``'all'`` or explicit iterable).
    :param include_xyz_column: write ``X,Y,Z`` CSV string column for roundtrip import.
    :param xyz_column: name of the optional XYZ text column.
    :param crs: optional CRS forwarded to GeoDataFrame.
    :return: number of exported points, or ``None`` on error.
    """
    if fn == '':
        logging.error(_('File path is empty'))
        return

    # Best-effort export: any failure is logged and reported as None.
    try:
        frame = self._build_geodataframe_for_export(value_columns=value_columns,
                                                    include_xyz_column=include_xyz_column,
                                                    xyz_column=xyz_column,
                                                    crs=crs)
        Path(fn).parent.mkdir(parents=True, exist_ok=True)
        frame.to_file(fn, driver='ESRI Shapefile')
        return len(frame)
    except Exception as e:
        logging.error(_('Error during shapefile export : ')+fn)
        logging.error(e)
        return
[docs]
def export_to_geopackage(self, fn:str, layer:str = 'points', value_columns='all',
                         include_xyz_column:bool = True, xyz_column:str = 'X1_Y1_Z1', crs=None):
    """Export cloud vertices to a GeoPackage using ``geopandas``.

    :param fn: destination ``.gpkg`` path.
    :param layer: destination layer name.
    :param value_columns: attributes to export (``None``, ``'all'`` or explicit iterable).
    :param include_xyz_column: write ``X,Y,Z`` CSV string column for roundtrip import.
    :param xyz_column: name of the optional XYZ text column.
    :param crs: optional CRS forwarded to GeoDataFrame.
    :return: number of exported points, or ``None`` on error.
    """
    if fn == '':
        logging.error(_('File path is empty'))
        return

    # Best-effort export: any failure is logged and reported as None.
    try:
        frame = self._build_geodataframe_for_export(value_columns=value_columns,
                                                    include_xyz_column=include_xyz_column,
                                                    xyz_column=xyz_column,
                                                    crs=crs)
        Path(fn).parent.mkdir(parents=True, exist_ok=True)
        frame.to_file(fn, layer=layer, driver='GPKG')
        return len(frame)
    except Exception as e:
        logging.error(_('Error during geopackage export : ')+fn)
        logging.error(e)
        return
# ----------------------------------------------------------------
# JSON serialization
# ----------------------------------------------------------------
[docs]
def to_dict(self) -> dict:
    """Serialize the cloud to a plain dictionary.

    The result carries the cloud identifier, its visual properties, the
    column headers (X, Y, Z plus any value columns) and the vertices as
    a compact 2-D list.

    :return: dictionary suitable for :func:`json.dumps`.
    """

    def _jsonable(val):
        # NumPy scalars/arrays are not JSON-serializable: downcast them.
        if isinstance(val, np.integer):
            return int(val)
        if isinstance(val, np.floating):
            return float(val)
        if isinstance(val, np.ndarray):
            return val.tolist()
        return val

    # Header: X, Y, Z followed by any value-column names.
    value_keys: list[str] = []
    if self._header:
        if self.storage_mode is StorageMode.NUMPY:
            value_keys = list(self._numpy_values.keys())
        elif self.nbvertices > 0:
            first_row = self._myvertices[next(iter(self._myvertices))]
            value_keys = [key for key in first_row.keys() if key != 'vertex']

    # One compact list per vertex: [x, y, z, value0, value1, ...]
    rows: list[list] = []
    for _row_id, row in self.iter_rows():
        vert = row['vertex']
        rows.append([vert.x, vert.y, vert.z]
                    + [_jsonable(row.get(key)) for key in value_keys])

    return {
        'idx': self.idx,
        'properties': self.myprop.to_dict(),
        'header': ['X', 'Y', 'Z'] + value_keys,
        'vertices': rows,
    }
@classmethod
def from_dict(cls, d: dict, **kwargs) -> "cloud_vertices":
    """Create a :class:`cloud_vertices` from a dictionary.

    :param d: dictionary as produced by :meth:`to_dict`.
    :param kwargs: extra keyword arguments forwarded to the constructor
        (e.g. ``mapviewer``, ``plotted`` for the GUI subclass).
    :return: new :class:`cloud_vertices` instance.
    """
    new_cloud = cls(idx=d.get('idx', ''), **kwargs)

    # Restore visual properties when present.
    props = d.get('properties')
    if props is not None:
        new_cloud.myprop = new_cloud._make_cloudproperties_from_dict(props, parent=new_cloud)

    rows = d.get('vertices', [])
    if not rows:
        return new_cloud

    header = d.get('header', ['X', 'Y', 'Z'])
    value_keys = header[3:]  # column names beyond X, Y, Z

    # Rebuild the internal dict storage, one entry per serialized row.
    storage: dict[int, dict] = {}
    for i, row in enumerate(rows):
        entry = {'vertex': wolfvertex(float(row[0]), float(row[1]), float(row[2]))}
        # Attach value columns; short rows are tolerated.
        for offset, key in enumerate(value_keys, start=3):
            if offset < len(row):
                entry[key] = row[offset]
        storage[i] = entry

    new_cloud.add_vertex(cloud=storage)
    if value_keys:
        new_cloud._header = True
    new_cloud.find_minmax(force=True)
    return new_cloud
[docs]
def save_json(self, fn: str | Path, indent: int = 2) -> None:
"""Save the cloud to a JSON file.
:param fn: destination file path.
:param indent: JSON indentation level (``None`` for compact output).
"""
d = {
'version': 1,
'format': 'cloud_vertices',
**self.to_dict(),
}
fn = Path(fn)
fn.parent.mkdir(parents=True, exist_ok=True)
# ensure ".json" extension
if fn.suffix.lower() != '.json':
fn = fn.with_suffix(fn.suffix + '.json')
with open(fn, 'w', encoding='utf-8') as f:
json.dump(d, f, indent=indent, ensure_ascii=False)
self.filename = str(fn)
@classmethod
def load_json(cls, fn: str | Path, **kwargs) -> "cloud_vertices":
    """Load a cloud from a JSON file.

    :param fn: source file path.
    :param kwargs: forwarded to :meth:`from_dict` (and then to the
        constructor, e.g. ``mapviewer``, ``plotted``).
    :return: new :class:`cloud_vertices` instance.
    :raises ValueError: if the file format is not recognized.
    """
    raw = Path(fn).read_text(encoding='utf-8')
    d = json.loads(raw)

    # Reject files not produced by the WOLF cloud serializers.
    fmt = d.get('format', '')
    if fmt not in ('cloud_vertices', 'cloud_of_clouds'):
        raise ValueError(
            _('Unsupported JSON format: {}').format(fmt))

    return cls.from_dict(d, **kwargs)
[docs]
def duplicate(self, idx: str | None = None, **kwargs) -> "cloud_vertices":
    """Return a deep, independent copy of this cloud.

    Vertices, properties and metadata are round-tripped through
    :meth:`to_dict` / :meth:`from_dict`, so the copy shares no mutable
    state with the original.

    :param idx: identifier for the copy. If ``None``, the original
        ``idx`` is reused.
    :param kwargs: extra keyword arguments forwarded to the
        constructor (e.g. ``mapviewer`` for the GUI).
    :return: independent :class:`cloud_vertices` copy.
    """
    serialized = self.to_dict()
    if idx is not None:
        serialized['idx'] = idx
    twin = type(self).from_dict(serialized, **kwargs)
    # from_dict leaves 'loaded' at its default -> mirror the original.
    twin.loaded = self.loaded
    return twin
[docs]
def copy(self, idx: str | None = None, **kwargs) -> "cloud_vertices":
    """Alias for :meth:`duplicate` — returns a deep, independent copy."""
    return self.duplicate(idx=idx, **kwargs)
# ----------------------------------------------------------------
# Iteration
# ----------------------------------------------------------------
[docs]
def iter_on_vertices(self):
    """Iterate over the cloud, yielding one :class:`wolfvertex` per point.

    NumPy-backend rows are wrapped in freshly built ``wolfvertex``
    instances; dict-backend rows yield the stored vertex objects
    themselves (references, not copies).
    """
    if self.storage_mode is StorageMode.NUMPY:
        for x, y, z in self._numpy_xyz:
            yield wolfvertex(x, y, z)
    else:
        for row in self.myvertices.values():
            yield row['vertex']
[docs]
def iter_rows(self):
    """Yield cloud rows as ``(row_id, row_dict)`` for both backends.

    ``row_dict`` always contains at least ``{'vertex': wolfvertex(...)}``.
    In NumPy backend mode, value columns are attached only when the row
    holds a real (non-missing) entry for them.
    """
    if self.storage_mode is not StorageMode.NUMPY:
        yield from self.myvertices.items()
        return

    ids = self._numpy_keys if self._numpy_keys is not None else list(range(len(self._numpy_xyz)))
    for pos, (row_id, coords) in enumerate(zip(ids, self._numpy_xyz)):
        row = {'vertex': wolfvertex(coords[0], coords[1], coords[2])}
        for name, column in self._numpy_values.items():
            cell = column[pos]
            # Sentinel cells mean "no value for this row/column".
            if cell is not _NUMPY_MISSING:
                row[name] = cell
        yield row_id, row
# ----------------------------------------------------------------
# Properties & coordinate accessors
# ----------------------------------------------------------------
@property
def nbvertices(self) -> int:
    """Number of points currently stored in the cloud."""
    backing = self._numpy_xyz if self.storage_mode is StorageMode.NUMPY else self.myvertices
    return len(backing)
@property
def xyz(self) -> np.ndarray:
    """
    Alias for get_xyz method

    :return: ``(n, 3)`` array of X, Y, Z coordinates (see :meth:`get_xyz`).
    """
    return self.get_xyz(key='vertex')
[docs]
def get_xyz(self, key='vertex') -> np.ndarray:
    """Return the vertices as a ``(n, 3)`` NumPy array.

    :param key: source of the third column (Z). With ``'vertex'`` (default)
        the stored Z coordinates are used; any other key selects the
        corresponding value column from the row dictionaries.
    """
    if self.storage_mode is StorageMode.NUMPY:
        if key == 'vertex' or not self._header:
            return self._numpy_xyz.copy()
        if key not in self._numpy_values:
            raise KeyError(key)
        zcol = np.asarray(self._numpy_values[key], dtype=np.float64)
        return np.column_stack((self._numpy_xyz[:, 0],
                                self._numpy_xyz[:, 1],
                                zcol))

    count = len(self.myvertices)
    if count == 0:
        return np.empty((0, 3), dtype=np.float64)

    rows = self.myvertices.values()
    xs = np.fromiter((r['vertex'].x for r in rows), dtype=np.float64, count=count)
    ys = np.fromiter((r['vertex'].y for r in rows), dtype=np.float64, count=count)
    if self._header and key != 'vertex':
        # Value columns may be stored as strings -> coerce to float.
        zs = np.fromiter((float(r[key]) for r in rows), dtype=np.float64, count=count)
    else:
        zs = np.fromiter((r['vertex'].z for r in rows), dtype=np.float64, count=count)
    return np.column_stack((xs, ys, zs))
@property
def has_values(self) -> bool:
    """Whether the cloud carries value columns beyond X, Y, Z.

    :return: ``True`` if at least one value column is attached.
    """
    if not self._header:
        return False
    if self.storage_mode is StorageMode.NUMPY:
        return len(self._numpy_values) > 0
    # BUGFIX: keys are not guaranteed to start at 0 (custom ids via
    # add_vertex, DXF import) and the cloud may be empty -- the original
    # 'self.myvertices[0]' raised KeyError in those cases.
    if len(self.myvertices) == 0:
        return False
    first_row = next(iter(self.myvertices.values()))
    return len(first_row) > 1
@property
def has_value_columns(self) -> bool:
    """Whether rows include value columns in addition to ``vertex``.

    This explicit alias mirrors the historical ``_header`` flag while
    keeping backward compatibility.
    """
    return self._header

@has_value_columns.setter
def has_value_columns(self, value: bool):
    """Set the explicit value-column flag (backward-compatible alias)."""
    # Coerce to a plain bool so truthy objects are not stored as-is.
    self._header = bool(value)
@property
def get_vertices(self) -> list[wolfvertex]:
    """Return all vertices as a list.

    NOTE(review): despite the ``get_`` name, this is a property — access
    it without parentheses.

    :return: list of :class:`wolfvertex` instances
        (references, not copies).
    """
    return list(self.iter_on_vertices())
[docs]
def get_multipoint(self) -> MultiPoint:
    """Convert the cloud to a ``shapely.geometry.MultiPoint``.

    :return: MultiPoint object containing all vertices (one shapely point
        per vertex, built via ``wolfvertex.as_shapelypoint``).
    """
    return MultiPoint([cur.as_shapelypoint() for cur in self.iter_on_vertices()])
# ----------------------------------------------------------------
# Bounds
# ----------------------------------------------------------------
[docs]
def _updatebounds(self, newvert: wolfvertex = None, newcloud: dict = None):
    """Incrementally extend the spatial bounds of the cloud.

    :param newvert: (optional) vertex just added to the cloud
    :param newcloud: (optional) cloud (dict of rows) just merged in

    Passing only the newly added data avoids a full rescan of all
    vertices, which is faster than :meth:`find_minmax` with ``force=True``.
    """
    xmin, xmax = self.xbounds
    ymin, ymax = self.ybounds
    zmin, zmax = self.zbounds

    if newvert is not None:
        xmin = min(newvert.x, xmin)
        xmax = max(newvert.x, xmax)
        ymin = min(newvert.y, ymin)
        ymax = max(newvert.y, ymax)
        zmin = min(newvert.z, zmin)
        zmax = max(newvert.z, zmax)
        self.xbounds = (xmin, xmax)
        self.ybounds = (ymin, ymax)
        self.zbounds = (zmin, zmax)

    if newcloud is not None:
        for row in newcloud.values():
            vert = row['vertex']
            xmin = min(vert.x, xmin)
            xmax = max(vert.x, xmax)
            ymin = min(vert.y, ymin)
            ymax = max(vert.y, ymax)
            zmin = min(vert.z, zmin)
            zmax = max(vert.z, zmax)
        self.xbounds = (xmin, xmax)
        self.ybounds = (ymin, ymax)
        self.zbounds = (zmin, zmax)
[docs]
def find_minmax(self, force:bool=False):
    """Compute the spatial bounds of the cloud.

    Updates ``xmin``/``xmax``, ``ymin``/``ymax``, ``zmin``/``zmax`` as
    well as the ``xbounds``/``ybounds``/``zbounds`` tuples.

    :param force: if ``True``, recompute from all coordinates.
        If ``False``, nothing is done (bounds are already up-to-date
        thanks to incremental updates from :meth:`_updatebounds`).
    """
    if not force:
        return

    # Sentinel extrema; kept as-is when the cloud is empty.
    self.xmin = 1.e300
    self.xmax = -1.e300
    self.ymin = 1.e300
    self.ymax = -1.e300
    self.zmin = 1.e300
    self.zmax = -1.e300

    if self.storage_mode is StorageMode.NUMPY:
        if len(self._numpy_xyz) == 0:
            return
        coords = self._numpy_xyz
    else:
        count = len(self.myvertices)
        if count == 0:
            return
        rows = self.myvertices.values()
        coords = np.column_stack((
            np.fromiter((r['vertex'].x for r in rows), dtype=np.float64, count=count),
            np.fromiter((r['vertex'].y for r in rows), dtype=np.float64, count=count),
            np.fromiter((r['vertex'].z for r in rows), dtype=np.float64, count=count),
        ))

    self.xmin = float(np.min(coords[:, 0]))
    self.xmax = float(np.max(coords[:, 0]))
    self.ymin = float(np.min(coords[:, 1]))
    self.ymax = float(np.max(coords[:, 1]))
    self.zmin = float(np.min(coords[:, 2]))
    self.zmax = float(np.max(coords[:, 2]))
    self.xbounds = (self.xmin, self.xmax)
    self.ybounds = (self.ymin, self.ymax)
    self.zbounds = (self.zmin, self.zmax)
# ----------------------------------------------------------------
# Vertex mutation
# ----------------------------------------------------------------
[docs]
def add_vertex(self, vertextoadd: wolfvertex = None, id=None, cloud: dict = None):
    """Add one or more vertices to the cloud.

    Two usage modes:

    - *vertextoadd*: add a single vertex. If *id* is not provided,
      the identifier defaults to ``len(myvertices)``.
    - *cloud*: merge a dictionary ``{id: {'vertex': wolfvertex, ...}}``
      into ``myvertices``. Existing keys are overwritten.

    Spatial bounds are updated incrementally (dict backend) or fully
    recomputed (NumPy backend).

    :param vertextoadd: single vertex to add.
    :param id: integer vertex identifier. ``None`` = auto-assigned.
        NOTE(review): shadows the ``id`` builtin inside this method.
    :param cloud: dictionary of vertices to merge. ``wolfvertex``
        instances are referenced, not copied.
    """
    if self.storage_mode is StorageMode.NUMPY:
        # Row identifiers; default to positional ids when none were stored.
        keys = self._numpy_keys if self._numpy_keys is not None else list(range(len(self._numpy_xyz)))

        def _ensure_attr_columns(attr_keys:list[str], target_len:int):
            # Create any missing value columns, pre-filled with the
            # MISSING sentinel for all existing rows.
            for attr in attr_keys:
                if attr not in self._numpy_values:
                    arr = np.empty(target_len, dtype=object)
                    arr[:] = _NUMPY_MISSING
                    self._numpy_values[attr] = arr

        def _set_row_from_item(row_idx:int, item:dict):
            # Overwrite an existing row: coordinates first, then every
            # value column (absent attributes reset to MISSING).
            vert = item['vertex']
            self._numpy_xyz[row_idx] = [vert.x, vert.y, vert.z]
            attrs = [cur for cur in item.keys() if cur != 'vertex']
            _ensure_attr_columns(attrs, len(self._numpy_xyz))
            for attr, arr in self._numpy_values.items():
                if attr in item:
                    arr[row_idx] = item[attr]
                else:
                    arr[row_idx] = _NUMPY_MISSING

        def _append_item(row_key, item:dict):
            # Append a brand-new row at the end of the backend arrays.
            vert = item['vertex']
            new_xyz = np.array([[vert.x, vert.y, vert.z]], dtype=np.float64)
            self._numpy_xyz = np.vstack((self._numpy_xyz, new_xyz))
            keys.append(row_key)
            # Grow every existing value column by one MISSING cell...
            for attr, arr in self._numpy_values.items():
                add = np.empty(1, dtype=object)
                add[0] = _NUMPY_MISSING
                self._numpy_values[attr] = np.concatenate((arr, add))
            # ...then fill the new row's own attributes.
            attrs = [cur for cur in item.keys() if cur != 'vertex']
            _ensure_attr_columns(attrs, len(self._numpy_xyz))
            for attr in attrs:
                self._numpy_values[attr][-1] = item[attr]

        if vertextoadd is not None:
            curid = id if id is not None else len(self._numpy_xyz)
            item = {'vertex': vertextoadd}
            if curid in keys:
                _set_row_from_item(keys.index(curid), item)
            else:
                _append_item(curid, item)

        if cloud is not None:
            for curid, item in cloud.items():
                if curid in keys:
                    _set_row_from_item(keys.index(curid), item)
                else:
                    _append_item(curid, item)

        self._numpy_keys = keys
        # Cached KD-tree is stale after any mutation.
        self.mytree = None
        self.find_minmax(force=True)
        self.on_changed_vertices()
        return

    # Dict backend
    if vertextoadd is not None:
        curid = id
        if curid is None:
            curid = len(self.myvertices)
        self.myvertices[curid] = {'vertex': vertextoadd}
        # Incremental bounds update (no full rescan needed).
        self._updatebounds(vertextoadd)

    if cloud is not None:
        for id, item in cloud.items():
            self.myvertices[id] = item
        self._updatebounds(newcloud=cloud)

    if vertextoadd is not None or cloud is not None:
        self.mytree = None
        self.on_changed_vertices()
[docs]
def remove_vertex(self, id: int):
    """Delete the vertex identified by *id* and refresh cached bounds.

    A warning is logged (and nothing else happens) when *id* is unknown.

    :param id: integer identifier of the vertex to remove.
    """
    if self.storage_mode is StorageMode.NUMPY:
        # Row identifiers default to positional indices when unset.
        row_keys = list(range(len(self._numpy_xyz))) if self._numpy_keys is None else self._numpy_keys
        if id not in row_keys:
            logging.warning(_('Vertex with id {} not found in the cloud').format(id))
            return
        row = row_keys.index(id)
        # Drop the row from coordinates and from every attribute column.
        self._numpy_xyz = np.delete(self._numpy_xyz, row, axis=0)
        for attr in list(self._numpy_values):
            self._numpy_values[attr] = np.delete(self._numpy_values[attr], row)
        row_keys.pop(row)
        self._numpy_keys = row_keys
        self.mytree = None
        self.find_minmax(force=True)
        self.on_changed_vertices()
        return
    # Dictionary backend.
    if id not in self.myvertices:
        logging.warning(_('Vertex with id {} not found in the cloud').format(id))
        return
    del self.myvertices[id]
    self._updatebounds()
    self.mytree = None
    self.on_changed_vertices()
[docs]
def move_vertex(self,
                id: int,
                x: float,
                y: float,
                z: float | None = None,
                invalidate_tree: bool = True,
                notify: bool = True,
                recompute_bounds: bool = True) -> bool:
    """Move an existing vertex while preserving its row identifier.

    :param id: row identifier to move.
    :param x: new X coordinate.
    :param y: new Y coordinate.
    :param z: optional new Z coordinate. If ``None``, keeps current Z.
    :param invalidate_tree: if ``True``, clears KDTree cache.
    :param notify: if ``True``, calls :meth:`on_changed_vertices`.
    :param recompute_bounds: if ``True``, recomputes cloud bounds.
    :return: ``True`` if the vertex was moved, ``False`` otherwise.
    """
    if self.storage_mode is StorageMode.NUMPY:
        row_keys = list(range(len(self._numpy_xyz))) if self._numpy_keys is None else self._numpy_keys
        try:
            row = row_keys.index(id)
        except ValueError:
            logging.warning(_('Vertex with id {} not found in the cloud').format(id))
            return False
        self._numpy_xyz[row, 0] = float(x)
        self._numpy_xyz[row, 1] = float(y)
        if z is not None:
            self._numpy_xyz[row, 2] = float(z)
    else:
        if id not in self.myvertices:
            logging.warning(_('Vertex with id {} not found in the cloud').format(id))
            return False
        vert = self.myvertices[id]['vertex']
        vert.x = float(x)
        vert.y = float(y)
        if z is not None:
            vert.z = float(z)
    # Shared housekeeping for both storage backends.
    if invalidate_tree:
        self.mytree = None
    if recompute_bounds:
        self.find_minmax(force=True)
    if notify:
        self.on_changed_vertices()
    return True
[docs]
def remove_nearest_vertex(self, x: float, y: float, z: float = 0., max_distance:float | None = None):
    """Delete the vertex closest to ``(x, y, z)`` (bounds are recomputed).

    :param x: query X coordinate.
    :param y: query Y coordinate.
    :param z: query Z coordinate (0. by default).
    :param max_distance: optional search radius; beyond it nothing matches.
    """
    target = self.find_nearest_id([[x, y, z]], max_distance=max_distance)
    if target is None:
        logging.warning(_('No vertex found in the cloud to remove.'))
        return
    self.remove_vertex(target)
[docs]
def remove_last_vertex(self):
    """Remove the last added vertex (highest identifier) from the cloud and recompute bounds.

    In NUMPY mode the "last" vertex is the final entry of the key list;
    in dictionary mode it is the row with the highest key.
    """
    if self.storage_mode is StorageMode.NUMPY:
        # FIX: fall back to positional indices when _numpy_keys is None,
        # consistent with every other NUMPY-mode method. Previously a
        # non-empty cloud with _numpy_keys unset was wrongly reported empty.
        keys = self._numpy_keys if self._numpy_keys is not None else list(range(len(self._numpy_xyz)))
        if keys:
            self.remove_vertex(keys[-1])
        else:
            logging.warning(_('No vertices in the cloud to remove.'))
        return
    if self.myvertices:
        last_id = max(self.myvertices.keys())
        self.remove_vertex(last_id)
    else:
        logging.warning(_('No vertices in the cloud to remove.'))
[docs]
def add_vertices(self, vertices:list[wolfvertex]):
    """Append a batch of vertices to the cloud.

    Identifiers are assigned sequentially, starting from the current
    number of rows.

    :param vertices: list of :class:`wolfvertex` instances to add.
    """
    if self.storage_mode is StorageMode.NUMPY:
        nb_new = len(vertices)
        if nb_new == 0:
            return
        if self._numpy_keys is None:
            self._numpy_keys = list(range(len(self._numpy_xyz)))
        start = len(self._numpy_xyz)
        coords = np.array([[v.x, v.y, v.z] for v in vertices], dtype=np.float64)
        self._numpy_xyz = np.vstack((self._numpy_xyz, coords))
        self._numpy_keys.extend(range(start, start + nb_new))
        # Pad every attribute column with the missing-value sentinel.
        for attr in list(self._numpy_values):
            pad = np.empty(nb_new, dtype=object)
            pad[:] = _NUMPY_MISSING
            self._numpy_values[attr] = np.concatenate((self._numpy_values[attr], pad))
        self.mytree = None
        self.find_minmax(force=True)
        self.on_changed_vertices()
        return
    # Dictionary backend: sequential ids from the current size.
    offset = len(self.myvertices)
    for pos, vert in enumerate(vertices):
        self.myvertices[offset + pos] = {'vertex': vert}
        self._updatebounds(vert)
    if vertices:
        self.mytree = None
        self.on_changed_vertices()
[docs]
def add_values_by_id_list(self, id:str, values:list[float]):
    """Attach one named value per vertex, in row order.

    :param id: key under which the values are stored.
    :param values: list of values to be added -- must have exactly one
        entry per vertex, otherwise a warning is logged and nothing changes.
    """
    if self.storage_mode is StorageMode.NUMPY:
        nb = len(self._numpy_xyz)
    else:
        nb = len(self.myvertices)
    if len(values) != nb:
        logging.warning(_('Number of values does not match the number of vertices -- Retry !!'))
        logging.info(_(('Number of vertices : ')+str(nb)))
        return
    if self.storage_mode is StorageMode.NUMPY:
        # One column per key in the attribute table.
        self._numpy_values[id] = np.asarray(values)
    else:
        for row, val in zip(self.myvertices.values(), values):
            row[id] = val
    self._header = True
# ----------------------------------------------------------------
# Cloud splitting helpers
# ----------------------------------------------------------------
[docs]
def split_by_keys(self, keys: str | list[str], include_missing: bool = False) -> dict:
    """Partition the cloud into sub-clouds grouped by one or several keys.

    Grouping keys are read from each row dictionary (same keys as
    :meth:`iter_rows`). With a single key the returned mapping uses the
    scalar value as dictionary key; with several keys it uses tuples.

    :param keys: one key name or a list of key names used for grouping.
    :param include_missing: if ``True``, rows missing at least one grouping
        key are still included with ``None`` for the missing entries.
        If ``False``, such rows are ignored.
    :return: ``{group_value: cloud_vertices}`` where each cloud contains
        only rows belonging to this group.
    """
    key_list = [keys] if isinstance(keys, str) else list(keys)
    if not key_list:
        return {}

    def _as_hashable(value):
        # Group ids must be hashable: convert common containers.
        if isinstance(value, np.ndarray):
            return tuple(value.tolist())
        if isinstance(value, list):
            return tuple(value)
        if isinstance(value, dict):
            return tuple(sorted(value.items()))
        return value

    buckets = {}
    for __, row in self.iter_rows():
        labels = []
        incomplete = False
        for name in key_list:
            if name in row:
                labels.append(_as_hashable(row[name]))
            else:
                incomplete = True
                labels.append(None)
        if incomplete and not include_missing:
            continue
        bucket_id = labels[0] if len(labels) == 1 else tuple(labels)
        bucket = buckets.setdefault(bucket_id, {})
        # Copy the vertex; other values are referenced as-is.
        copied = {'vertex': row['vertex'].copy()}
        for name, value in row.items():
            if name != 'vertex':
                copied[name] = value
        bucket[len(bucket)] = copied

    result = {}
    for bucket_id, rows in buckets.items():
        sub = self._make_cloud_vertices(idx=f'{self.idx}')
        if len(rows) > 0:
            sub.add_vertex(cloud=rows)
        sub._header = self._header
        result[bucket_id] = sub
    return result
[docs]
def split_cloud(self, splitter, inside_prefix: str = 'inside_', outside_prefix: str = 'outside_'):
    """Split this cloud into inside/outside subsets using an external splitter.

    The *splitter* object is expected to provide either:

    - ``select_points_inside(cloud_vertices) -> list[bool]``
    - or ``isinside(x, y) -> bool``

    This duck-typed contract avoids importing vector classes here,
    preventing circular imports between PyVertex and pyvertexvectors.

    :param splitter: Geometry-like object used to classify points.
    :param inside_prefix: Prefix for the inside cloud identifier.
    :param outside_prefix: Prefix for the outside cloud identifier.
    :return: ``(cloud_inside, cloud_outside)``.
    :raises TypeError: if *splitter* provides neither expected method.
    :raises ValueError: if the splitter returns a wrong number of flags.
    """
    selector = getattr(splitter, 'select_points_inside', None)
    if callable(selector):
        inside_flags = selector(self)
    else:
        checker = getattr(splitter, 'isinside', None)
        if not callable(checker):
            raise TypeError('splitter must provide select_points_inside(cloud) or isinside(x, y)')
        inside_flags = [bool(checker(curvert.x, curvert.y)) for curvert in self.iter_on_vertices()]
    if len(inside_flags) != self.nbvertices:
        raise ValueError('splitter returned an invalid number of flags')
    cloud_inside = self._make_cloud_vertices(idx=f'{inside_prefix}{self.idx}')
    cloud_outside = self._make_cloud_vertices(idx=f'{outside_prefix}{self.idx}')
    # FIX: throwaway name is '__' (not '_') -- '_' is the module-level
    # gettext translation function and must not be shadowed
    # (consistent with split_by_keys).
    for is_inside, (__, row) in zip(inside_flags, self.iter_rows()):
        new_row = {'vertex': row['vertex'].copy()}
        for curkey, curval in row.items():
            if curkey == 'vertex':
                continue
            new_row[curkey] = curval
        if is_inside:
            cloud_inside.add_vertex(cloud={cloud_inside.nbvertices: new_row})
        else:
            cloud_outside.add_vertex(cloud={cloud_outside.nbvertices: new_row})
    cloud_inside._header = self._header
    cloud_outside._header = self._header
    return cloud_inside, cloud_outside
[docs]
def split_by_vector(self, vector_like, inside_prefix: str = 'inside_', outside_prefix: str = 'outside_'):
    """Convenience alias of :meth:`split_cloud` for vector-like splitters.

    Intended for the common case where the splitter is a vector object
    exposing ``select_points_inside`` and/or ``isinside``.

    :param vector_like: Vector-like object used to classify points.
    :param inside_prefix: Prefix for the inside cloud identifier.
    :param outside_prefix: Prefix for the outside cloud identifier.
    :return: ``(cloud_inside, cloud_outside)``.
    """
    return self.split_cloud(vector_like,
                            inside_prefix=inside_prefix,
                            outside_prefix=outside_prefix)
# ----------------------------------------------------------------
# Display / legend
# ----------------------------------------------------------------
[docs]
def set_legend_column(self, key: str, visible: bool = True):
    """Select which value column the per-point legend displays.

    :param key: Column name to display. Use ``''`` for the row identifier,
        ``'ID'`` for the sequential index, ``'X'`` / ``'Y'`` / ``'Z'`` for
        coordinates, or any column name previously added with
        :meth:`add_values_by_id_list`.
    :param visible: Whether to make the legend visible. Defaults to ``True``.
    """
    props = self.myprop
    props.legendtext = key
    props.legendvisible = visible
# ----------------------------------------------------------------
# Interpolation & geometry projection
# ----------------------------------------------------------------
[docs]
def interp_on_array(self, myarray, key:str='vertex', method:Literal['linear', 'nearest', 'cubic'] = 'linear'):
    """Interpolate the cloud values onto a 2D array.

    :param myarray: WolfArray instance.
    :param key: key used for the third column (Z) -- ``'vertex'`` or any
        key present in the row dictionaries.
    :param method: interpolation method -- 'linear', 'nearest' or 'cubic'
        (see ``interpolate_on_cloud`` of WolfArray for details).
    """
    coords = self.get_xyz(key)
    xy = coords[:, :2]
    values = coords[:, 2]
    myarray.interpolate_on_cloud(xy, values, method)
[docs]
def projectontrace(self, trace, return_cloud:bool = True, proximity:float = 99999.):
    """Project the cloud onto a trace (polyline of type ``vector``).

    Each point is orthogonally projected onto the trace; the curvilinear
    coordinate *s* (distance along the trace) and the original point's
    elevation *z* are extracted.

    :param trace: ``vector`` instance (must have ``asshapely_ls()`` and
        ``myname`` attributes).
    :param return_cloud: if ``True``, return a new ``cloud_vertices``
        whose vertices are ``(s, z)``.
        If ``False``, return two lists ``(s_list, z_list)``.
    :param proximity: search radius around the trace (in map units).
        Only points within this buffer are kept.
        The default value (99999) keeps all points.
    :return: ``cloud_vertices`` or tuple ``(list[float], list[float])``
        depending on *return_cloud*.
    """
    trace_ls: LineString = trace.asshapely_ls()
    if proximity == 99999.:
        # Sentinel value: keep every point of the cloud.
        all_s = [trace_ls.project(Point(vert.x, vert.y)) for vert in self.iter_on_vertices()]
        all_z = [vert.z for vert in self.iter_on_vertices()]
    else:
        # Keep only the points falling inside a buffer around the trace.
        zone = trace_ls.buffer(proximity)
        kept = self.get_multipoint().intersection(zone)
        all_s = [trace_ls.project(Point(pt.x, pt.y)) for pt in kept.geoms]
        all_z = [pt.z for pt in kept.geoms]
    if not return_cloud:
        return all_s, all_z
    # Build an (s, z) cloud -- s goes into X, z into Y.
    rows = {k: {'vertex': wolfvertex(s, z)} for k, (s, z) in enumerate(zip(all_s, all_z))}
    newcloud = self._make_cloud_vertices(idx=_('Projection on ')+trace.myname)
    newcloud.add_vertex(cloud=rows)
    return newcloud
# ====================================================================
# Cloud-of-clouds model
# ====================================================================
[docs]
class cloud_of_clouds:
    """Ordered collection of :class:`cloud_vertices` instances.

    Point-cloud analogue of the ``Zones -> zone -> vector`` hierarchy:
    ``cloud_of_clouds -> cloud_vertices -> wolfvertex``.

    Provides:

    - cloud management (add, remove, reorder, access by index or name);
    - bulk display-property propagation (color, width, style, alpha, legend...);
    - value manipulation across all clouds (add, get, colorize);
    - iteration helpers;
    - spatial queries (bounds, nearest).

    :ivar myclouds: ordered list of :class:`cloud_vertices` instances.
    :ivar idx: text identifier for the collection.
    """

    # Ordered list of member clouds.
    myclouds: list[cloud_vertices]

    # ----------------------------------------------------------------
    # Construction
    # ----------------------------------------------------------------
    def __init__(self, idx: str = '', clouds: list[cloud_vertices] | None = None) -> None:
        """Initialize the collection.

        :param idx: text identifier for the collection.
        :param clouds: optional clouds to register immediately.
        """
        self.idx = idx
        self.myclouds = []
        for cur in clouds or []:
            self.add_cloud(cur)
# ----------------------------------------------------------------
# Cloud management
# ----------------------------------------------------------------
[docs]
def add_cloud(self, cloud: cloud_vertices) -> None:
    """Append *cloud* to the collection and register this collection as its parent.

    :param cloud: cloud to add.
    """
    cloud.parent_collection = self
    self.myclouds.append(cloud)
# ----------------------------------------------------------------
# Factory methods (overridden in GUI subclass)
# ----------------------------------------------------------------
[docs]
def _make_cloud_vertices(self, **kwargs) -> cloud_vertices:
    """Factory hook: build a bare cloud. The GUI subclass returns its own variant."""
    return cloud_vertices(**kwargs)

def _make_cloud_vertices_from_dict(self, d: dict, **kwargs) -> cloud_vertices:
    """Factory hook: rebuild a cloud from a dict. The GUI subclass returns its own variant."""
    return cloud_vertices.from_dict(d, **kwargs)
[docs]
def create_cloud(self, idx: str = '', **kwargs) -> cloud_vertices:
    """Build a new empty cloud, register it, and return it.

    :param idx: identifier for the new cloud.
    :param kwargs: forwarded to :meth:`_make_cloud_vertices`.
    :return: the newly created cloud.
    """
    newcloud = self._make_cloud_vertices(idx=idx, **kwargs)
    self.add_cloud(newcloud)
    return newcloud
[docs]
def remove_cloud(self, key: int | str) -> cloud_vertices | None:
    """Detach and return a cloud identified by index or name.

    :param key: integer index or string ``idx`` of the cloud.
    :return: the removed cloud, or ``None`` if not found.
    """
    found = self._resolve(key)
    if found is None:
        return None
    self.myclouds.remove(found)
    found.parent_collection = None
    return found
[docs]
def _resolve(self, key: int | str) -> cloud_vertices | None:
    """Look up a cloud by position (int) or identifier (str).

    :param key: integer index or string ``idx``.
    :return: cloud instance, or ``None`` if not found.
    """
    if isinstance(key, int):
        return self.myclouds[key] if 0 <= key < len(self.myclouds) else None
    # String lookup: first cloud whose idx matches, else None.
    return next((cur for cur in self.myclouds if cur.idx == key), None)
def __getitem__(self, key: int | str) -> cloud_vertices:
    """Return the cloud at *key* (index or name).

    :param key: integer index or string ``idx``.
    :return: the cloud.
    :raises KeyError: if not found.
    """
    found = self._resolve(key)
    if found is None:
        raise KeyError(f'Cloud not found: {key}')
    return found

def __len__(self) -> int:
    """Number of member clouds."""
    return len(self.myclouds)

def __iter__(self):
    """Iterate over the member clouds in order."""
    yield from self.myclouds

def __contains__(self, key: int | str) -> bool:
    """``True`` when *key* resolves to a member cloud."""
    return self._resolve(key) is not None
# ----------------------------------------------------------------
# Properties & accessors
# ----------------------------------------------------------------
@property
def nbclouds(self) -> int:
    """Number of clouds in the collection."""
    return len(self.myclouds)

@property
def cloud_names(self) -> list[str]:
    """Identifiers of all member clouds, in order."""
    return [cur.idx for cur in self.myclouds]

@property
def nbvertices(self) -> int:
    """Total vertex count over the whole collection."""
    total = 0
    for cur in self.myclouds:
        total += cur.nbvertices
    return total
# ----------------------------------------------------------------
# Bounds
# ----------------------------------------------------------------
[docs]
def find_minmax(self, force: bool = True):
    """Recompute the spatial bounds of every member cloud.

    :param force: forwarded to each cloud's :meth:`find_minmax`.
    """
    for cur in self.myclouds:
        cur.find_minmax(force=force)
@property
def xbounds(self) -> tuple[float, float]:
    """Global X extent across all clouds (``(0., 0.)`` when empty)."""
    if not self.myclouds:
        return (0., 0.)
    lows = [cur.xbounds[0] for cur in self.myclouds]
    highs = [cur.xbounds[1] for cur in self.myclouds]
    return (min(lows), max(highs))

@property
def ybounds(self) -> tuple[float, float]:
    """Global Y extent across all clouds (``(0., 0.)`` when empty)."""
    if not self.myclouds:
        return (0., 0.)
    lows = [cur.ybounds[0] for cur in self.myclouds]
    highs = [cur.ybounds[1] for cur in self.myclouds]
    return (min(lows), max(highs))

@property
def zbounds(self) -> tuple[float, float]:
    """Global Z extent across all clouds (``(0., 0.)`` when empty)."""
    if not self.myclouds:
        return (0., 0.)
    lows = [cur.zbounds[0] for cur in self.myclouds]
    highs = [cur.zbounds[1] for cur in self.myclouds]
    return (min(lows), max(highs))
# ----------------------------------------------------------------
# Iteration
# ----------------------------------------------------------------
[docs]
def iter_all_vertices(self):
    """Yield every :class:`wolfvertex` of every member cloud, in order."""
    for cur in self.myclouds:
        for vert in cur.iter_on_vertices():
            yield vert

def iter_all_rows(self):
    """Yield ``(cloud_idx, row_id, row_dict)`` for every row of every cloud."""
    for cur in self.myclouds:
        for row_id, row in cur.iter_rows():
            yield cur.idx, row_id, row
# ----------------------------------------------------------------
# Value manipulation
# ----------------------------------------------------------------
[docs]
def add_values(self, key: str, values: np.ndarray | dict):
    """Add a value column to the clouds.

    :param key: value column identifier.
    :param values: either a dict ``{cloud_idx: list}`` mapping cloud
        names to per-vertex value lists, or a flat array-like (ndarray,
        list or tuple) whose length must equal :pyattr:`nbvertices`
        (values are distributed to clouds in collection order).
    """
    if isinstance(values, dict):
        # Per-cloud mode: each entry targets one named cloud.
        for name, vals in values.items():
            c = self._resolve(name)
            if c is not None:
                c.add_values_by_id_list(key, vals)
            else:
                logging.warning(_('Cloud not found: {}').format(name))
        return
    if isinstance(values, (np.ndarray, list, tuple)):
        # Flat mode (generalized: lists/tuples accepted in addition to ndarray).
        flat = np.asarray(values)
        # FIX: validate total length up front; previously a mismatched array
        # was silently sliced, producing partial writes and per-cloud warnings.
        if len(flat) != self.nbvertices:
            logging.warning(_('Number of values does not match the number of vertices -- Retry !!'))
            return
        offset = 0
        for c in self.myclouds:
            n = c.nbvertices
            c.add_values_by_id_list(key, flat[offset:offset + n])
            offset += n
        return
    logging.warning(_('Unsupported values type for add_values'))
[docs]
def get_values(self, key: str) -> dict[str, np.ndarray]:
    """Collect a value column from every cloud that defines it.

    :param key: value column identifier.
    :return: dict ``{cloud_idx: ndarray}`` for clouds that have the key.
    """
    out = {}
    for cur in self.myclouds:
        if not cur.has_values or key not in cur.header:
            continue
        # get_xyz(key) packs the requested column in the third coordinate.
        out[cur.idx] = cur.get_xyz(key)[:, 2]
    return out
[docs]
def get_all_xyz(self) -> np.ndarray:
    """Concatenate every cloud's XYZ coordinates into one array.

    :return: ``(N, 3)`` array with all vertices, empty ``(0, 3)`` if none.
    """
    parts = [cur.get_xyz() for cur in self.myclouds if cur.nbvertices > 0]
    if not parts:
        return np.empty((0, 3), dtype=np.float64)
    return np.vstack(parts)
# ----------------------------------------------------------------
# Display property propagation
# ----------------------------------------------------------------
[docs]
def set_color(self, color: int) -> None:
    """Apply one drawing color to every cloud.

    :param color: RGB integer (see ``getIfromRGB``).
    """
    for cur in self.myclouds:
        cur.myprop.color = color

def set_width(self, width: int) -> None:
    """Apply one point size to every cloud.

    :param width: size in pixels.
    """
    for cur in self.myclouds:
        cur.myprop.width = width

def set_style(self, style: int) -> None:
    """Apply one rendering style to every cloud.

    :param style: style index (see ``Cloud_Styles``).
    """
    for cur in self.myclouds:
        cur.myprop.style = style

def set_alpha(self, alpha: int) -> None:
    """Apply one transparency value to every cloud.

    :param alpha: opacity value (0 = opaque, 255 = fully transparent).
    """
    for cur in self.myclouds:
        cur.myprop.alpha = alpha

def set_filled(self, filled: bool) -> None:
    """Apply one symbol-fill flag to every cloud.

    :param filled: ``True`` = filled symbols.
    """
    for cur in self.myclouds:
        cur.myprop.filled = filled
# ----------------------------------------------------------------
# Legend propagation
# ----------------------------------------------------------------
[docs]
def set_legend_visible(self, visible: bool = True) -> None:
    """Show or hide the legend of every cloud.

    :param visible: ``True`` to display.
    """
    for cur in self.myclouds:
        cur.myprop.legendvisible = visible

def set_legend_text(self, text: str) -> None:
    """Apply one legend text to every cloud.

    :param text: legend text.
    """
    for cur in self.myclouds:
        cur.myprop.legendtext = text

def set_legend_color(self, color: int) -> None:
    """Apply one legend text color to every cloud.

    :param color: RGB integer.
    """
    for cur in self.myclouds:
        cur.myprop.legendcolor = color

def set_legend_fontsize(self, size: int) -> None:
    """Apply one legend font size to every cloud.

    :param size: font size in points.
    """
    for cur in self.myclouds:
        cur.myprop.legendfontsize = size

def set_legend_from_idx(self, visible: bool = True) -> None:
    """Label each cloud's legend with its own ``idx``.

    :param visible: whether to make legends visible.
    """
    for cur in self.myclouds:
        cur.myprop.legendtext = cur.idx
        cur.myprop.legendvisible = visible
# ----------------------------------------------------------------
# Spatial queries
# ----------------------------------------------------------------
[docs]
def find_nearest(self, xyz: np.ndarray | list, nb: int = 1):
    """Find the nearest vertex across all clouds.

    Queries each non-empty cloud's KDTree and returns the overall nearest.

    :param xyz: query coordinates ``[x, y, z]`` or ``[[x, y, z], ...]``.
    :param nb: number of nearest neighbors. NOTE(review): currently NOT
        forwarded -- each cloud is queried with ``nb=1`` and only the
        single closest hit is returned (pre-existing behavior, kept).
    :return: ``(distance, wolfvertex, row_dict, cloud_idx)`` for the
        closest result, or ``(None, None, None, None)`` if empty.
    """
    best_dist = None
    best_vert = None
    best_item = None
    best_cloud_idx = None
    for c in self.myclouds:
        if c.nbvertices == 0:
            continue
        dist, vert, item = c.find_nearest(xyz, nb=1)
        if dist is None:
            continue
        # FIX: the previous ternary evaluated float(dist) in BOTH branches;
        # float() handles Python scalars and 0-d/size-1 ndarrays alike.
        d = float(dist)
        if best_dist is None or d < best_dist:
            best_dist = d
            best_vert = vert
            best_item = item
            best_cloud_idx = c.idx
    return best_dist, best_vert, best_item, best_cloud_idx
# ----------------------------------------------------------------
# Merge / flatten
# ----------------------------------------------------------------
[docs]
def merge(self, idx: str = '') -> cloud_vertices:
    """Flatten all clouds into a single cloud.

    Vertex values are preserved; the originating cloud is recorded in a
    ``'__source__'`` value column.

    :param idx: identifier for the merged cloud (collection ``idx`` if empty).
    :return: new :class:`cloud_vertices` containing all vertices.
    """
    merged = self._make_cloud_vertices(idx=idx or self.idx)
    for cur in self.myclouds:
        rows = {}
        for row_id, row in cur.iter_rows():
            # Copy the vertex; other values are referenced as-is.
            copied = {'vertex': row['vertex'].copy()}
            for name, value in row.items():
                if name != 'vertex':
                    copied[name] = value
            copied['__source__'] = cur.idx
            rows[merged.nbvertices + len(rows)] = copied
        if rows:
            merged.add_vertex(cloud=rows)
    merged._header = True
    return merged
# ----------------------------------------------------------------
# JSON serialization
# ----------------------------------------------------------------
[docs]
def to_dict(self) -> dict:
    """Serialize the collection to a plain dictionary.

    Each child cloud is serialized via its own :meth:`cloud_vertices.to_dict`.
    """
    return {'idx': self.idx,
            'clouds': [cur.to_dict() for cur in self.myclouds]}
@classmethod
def from_dict(cls, d: dict, **kwargs) -> "cloud_of_clouds":
    """Rebuild a collection from a dictionary produced by :meth:`to_dict`.

    :param d: serialized collection.
    :param kwargs: forwarded to each child's :meth:`cloud_vertices.from_dict`.
    :return: new :class:`cloud_of_clouds` instance.
    """
    newcoc = cls(idx=d.get('idx', ''))
    for child in d.get('clouds', []):
        newcoc.add_cloud(newcoc._make_cloud_vertices_from_dict(child, **kwargs))
    return newcoc
[docs]
def save_json(self, fn: str | Path, indent: int = 2) -> None:
    """Write the collection to a JSON file (extension forced to ``.json``).

    :param fn: destination file path.
    :param indent: JSON indentation level (``None`` for compact output).
    """
    payload = {'version': 1, 'format': 'cloud_of_clouds'}
    payload.update(self.to_dict())
    fn = Path(fn)
    fn.parent.mkdir(parents=True, exist_ok=True)
    if fn.suffix.lower() != '.json':
        # Append '.json' after the current suffix rather than replacing it.
        fn = fn.with_suffix(fn.suffix + '.json')
    with open(fn, 'w', encoding='utf-8') as f:
        json.dump(payload, f, indent=indent, ensure_ascii=False)
    self.filename = str(fn)
@classmethod
def load_json(cls, fn: str | Path, **kwargs) -> "cloud_of_clouds":
    """Read a collection from a JSON file.

    Accepts both the ``cloud_of_clouds`` format and the legacy single
    ``cloud_vertices`` format (wrapped into a one-cloud collection).

    :param fn: source file path.
    :param kwargs: forwarded to each child's :meth:`cloud_vertices.from_dict`.
    :return: new :class:`cloud_of_clouds` instance.
    :raises ValueError: if the file format is not ``cloud_of_clouds``.
    """
    fn = Path(fn)
    with open(fn, 'r', encoding='utf-8') as f:
        payload = json.load(f)
    fmt = payload.get('format', '')
    if fmt == 'cloud_vertices':
        # Legacy format: a single cloud without the collection wrapper.
        coc = cls()
        cloud = coc._make_cloud_vertices_from_dict(payload, **kwargs)
        coc.idx = cloud.idx
        coc.add_cloud(cloud)
        coc.filename = str(fn)
        return coc
    if fmt != 'cloud_of_clouds':
        raise ValueError(
            _('Expected format "cloud_of_clouds", got: {}').format(fmt))
    coc = cls.from_dict(payload, **kwargs)
    coc.filename = str(fn)
    return coc
[docs]
def duplicate(self, idx: str | None = None, **kwargs) -> "cloud_of_clouds":
    """Deep-copy this collection via a serialization round-trip.

    Every child cloud is duplicated independently; the new collection
    shares no mutable state with the original.

    :param idx: identifier for the copy (original ``idx`` when ``None``).
    :param kwargs: extra keyword arguments forwarded to each child's
        constructor (e.g. ``mapviewer`` for the GUI).
    :return: independent :class:`cloud_of_clouds` copy.
    """
    payload = self.to_dict()
    if idx is not None:
        payload['idx'] = idx
    return type(self).from_dict(payload, **kwargs)
[docs]
def copy(self, idx: str | None = None, **kwargs) -> "cloud_of_clouds":
    """Alias to :meth:`duplicate` -- see that method for parameter details."""
    return self.duplicate(idx=idx, **kwargs)