wolfhece.eikonal ================ .. py:module:: wolfhece.eikonal .. autoapi-nested-parse:: Author: HECE - University of Liege, Pierre Archambeau Date: 2025, 2026 Copyright (c) 2026 University of Liege. All rights reserved. This script and its content are protected by copyright law. Unauthorized copying or distribution of this file, via any medium, is strictly prohibited. Module Contents --------------- .. py:function:: _evaluate_distance_and_data_first_order_iso(i, j, fixed: numpy.ndarray, where_compute: numpy.ndarray, dx_mesh: float, dy_mesh: float, times: numpy.ndarray, base_data: numpy.ndarray, test_data: numpy.ndarray, speed: float = 1.0) -> tuple[float, float] Evaluate the time and data using a first order isotropic method. :param i: (int): The row index. :param j: (int): The column index. :param fixed: (2D numpy array): A boolean array where True indicates fixed points. :param where_compute: (2D numpy array): A boolean array where True indicates cells to be included in computation. :param dx_mesh: (float): The mesh size in x direction. :param dy_mesh: (float): The mesh size in y direction. :param times: (2D numpy array): The time function. :param base_data: (2D numpy array): The base data that will propagate. :param test_data: (2D numpy array): The test data that will be used to validate propagation. :param speed: (float): The isotropic propagation speed. :return: tuple[float, float]: The time and data. .. py:function:: _evaluate_distance_and_data_second_order_iso(i, j, fixed: numpy.ndarray, where_compute: numpy.ndarray, dx_mesh: float, dy_mesh: float, times: numpy.ndarray, base_data: numpy.ndarray, test_data: numpy.ndarray, speed: float = 1.0) -> tuple[float, float] Evaluate the time and data using a **second-order** isotropic scheme. Implements the quadratic upwind approximation of Sethian (1999): For each axis (x and y), if the *two* upwind neighbours are both *fixed*, a second-order backward difference is used:: (9/4) T² - (6 T_{-1} - 3/2 T_{-2})T + … = (dx/speed)² Otherwise the standard first-order one-sided difference is used for that axis. The resulting quadratic ``a T² - 2 b T + c = 0`` is solved exactly. Parameters are identical to :func:`_evaluate_distance_and_data_first_order_iso`. .. py:function:: _evaluate_distance_anisotropic(i, j, fixed: numpy.ndarray, where_compute: numpy.ndarray, dx_mesh: float, dy_mesh: float, times: numpy.ndarray, speed_xx: numpy.ndarray, speed_xy: numpy.ndarray, speed_yy: numpy.ndarray) -> float Evaluate the arrival time for an **anisotropic** eikonal equation. Solves (∇T)^T D (∇T) = 1 where D is the local metric tensor:: D = [[speed_xx, speed_xy], [speed_xy, speed_yy]] This supports direction-dependent propagation speeds (e.g. flow along a preferential direction, wind-driven fire spread, etc.). The upwind finite-difference approximations of T_x and T_y are first-order; each axis picks the smallest fixed neighbour. The quadratic to solve is (dropping mesh-size scaling for clarity):: D_xx (T - Tx)² + 2 D_xy (T - Tx)(T - Ty) + D_yy (T - Ty)² = h² which expands to a standard a T² + b T + c = 0. :param speed_xx: 2D array, D_xx component of the metric tensor. :param speed_xy: 2D array, D_xy component (off-diagonal, symmetric). :param speed_yy: 2D array, D_yy component of the metric tensor. :return: The new arrival time (float), or np.inf if no update. .. py:function:: _evaluate_distance_anisotropic_8conn(i, j, fixed, where_compute, dx_mesh, dy_mesh, times, speed_xx, speed_xy, speed_yy) Evaluate the arrival time for an **anisotropic** eikonal equation using an **8-connected** stencil (Ordered Upwind Method). Solves (∇T)^T D (∇T) = 1 where D is the local metric tensor. Unlike :func:`_evaluate_distance_anisotropic` which only considers the 4 cardinal neighbours, this kernel considers all 8 neighbours (N, NE, E, SE, S, SW, W, NW) and forms **triangle updates** for each pair of adjacent fixed neighbours. For each triangle (P, A, B) where A and B are two adjacent fixed neighbours, the update solves:: T(P) = min_{λ ∈ [0,1]} [ λ T_A + (1-λ) T_B + ‖λ e_A + (1-λ) e_B‖_{D⁻¹} ] where e_A = A − P, e_B = B − P in physical coordinates. This properly captures **off-diagonal anisotropy** (D_xy ≠ 0) along diagonal directions, which a 4-connected stencil cannot resolve. Reference: Sethian & Vladimirsky (2003), "Ordered Upwind Methods for Static Hamilton–Jacobi Equations: Theory and Algorithms", SIAM J. Numer. Anal. :param speed_xx: 2D array, D_xx component of the metric tensor. :param speed_xy: 2D array, D_xy component (off-diagonal, symmetric). :param speed_yy: 2D array, D_yy component. :return: The new arrival time (float), or np.inf if no update. .. py:function:: _heap_push(heap_t, heap_i, heap_j, heap_size, t, row, col) Push (t, row, col) onto the binary min-heap. Returns updated size. Uses a sift-up strategy: the new element is placed at the end then bubbled up until the heap invariant is restored. .. py:function:: _heap_pop(heap_t, heap_i, heap_j, heap_size) Pop the minimum element from the binary min-heap. Returns (new_size, t, row, col). The last leaf replaces the root and is sifted down until the heap invariant holds. .. py:function:: __solve_eikonal_with_data(sources, where_compute, base_data, test_data, speed, dx_mesh, dy_mesh) Solve the Eikonal equation using the Fast Marching Method (FMM). Fully @njit-compiled version using a manual binary min-heap (three parallel numpy arrays: times, rows, cols) instead of Python's heapq module. This eliminates the CPython interpreter overhead on the hot FMM loop (heap operations + neighbor evaluation), typically yielding a x5-x20 speed-up on large grids. The first call incurs a one-time Numba compilation cost (~2-5 s); subsequent calls run at native speed. :param sources: 2D int64 array of shape (N, 2) — source (row, col). .. py:function:: _solve_eikonal_with_data(sources: list[tuple[int, int]], where_compute: numpy.ndarray = None, base_data: numpy.ndarray = None, test_data: numpy.ndarray = None, speed: numpy.ndarray = None, dx_mesh: float = 1.0, dy_mesh: float = 1.0) -> numpy.ndarray Solve the Eikonal equation using the Fast Marching Method (FMM). :param sources: (list of tuples): The coordinates of the source points. :param where_compute: (2D numpy array, optional): A boolean array where True indicates cells to be included in computation. :param base_data: (2D numpy array, optional): The base data that will propagate. :param test_data: (2D numpy array, optional): The test data that will be used to validate propagation. :param speed: (2D numpy array): The speed function. :param dx_mesh: (float, optional): The mesh size in x direction. :param dy_mesh: (float, optional): The mesh size in y direction. :return: 2D numpy array The solution to the Eikonal equation. .. py:function:: __solve_eikonal_with_data_second_order(sources, where_compute, base_data, test_data, speed, dx_mesh, dy_mesh) Solve the Eikonal equation with **second-order** spatial accuracy. Same structure as :func:`__solve_eikonal_with_data` but calls :func:`_evaluate_distance_and_data_second_order_iso` which uses a quadratic upwind stencil (Sethian, 1999) whenever two consecutive upwind neighbours are available, falling back to first-order otherwise. :param sources: 2D int64 array of shape (N, 2) — source (row, col). .. py:function:: _solve_eikonal_with_data_second_order(sources: list[tuple[int, int]], where_compute: numpy.ndarray = None, base_data: numpy.ndarray = None, test_data: numpy.ndarray = None, speed: numpy.ndarray = None, dx_mesh: float = 1.0, dy_mesh: float = 1.0) -> numpy.ndarray Solve the Eikonal equation with second-order spatial accuracy. API-compatible with :func:`_solve_eikonal_with_data`. Uses a quadratic upwind stencil when two consecutive upwind neighbours are available, falling back to first order otherwise. :param sources: (list of tuples): The coordinates of the source points. :param where_compute: (2D numpy array): A boolean array where True indicates cells to be included in computation. :param base_data: (2D numpy array): The base data that will propagate. :param test_data: (2D numpy array): The test data that will be used to validate propagation. :param speed: (2D numpy array): The speed function. :param dx_mesh: (float, optional): The mesh size in x direction. :param dy_mesh: (float, optional): The mesh size in y direction. :return: 2D numpy array The solution to the Eikonal equation. .. py:function:: __solve_eikonal_anisotropic(sources, where_compute, speed_xx, speed_xy, speed_yy, dx_mesh, dy_mesh) Solve the **anisotropic** Eikonal equation using FMM. Computes arrival times for (∇T)^T D (∇T) = 1 where D is a spatially-varying 2×2 symmetric positive-definite metric tensor given component-wise by ``speed_xx``, ``speed_xy``, ``speed_yy``. Uses an **8-connected** stencil (Ordered Upwind Method) with triangle updates to properly capture off-diagonal anisotropy. When a cell is fixed, all 8 neighbours are notified; each neighbour's update considers the 8 adjacent-pair triangles in :func:`_evaluate_distance_anisotropic_8conn`. For the isotropic case, set ``speed_xx = speed_yy = speed²`` and ``speed_xy = 0``. This variant does **not** propagate data — it only computes T. :param sources: 2D int64 array of shape (N, 2) — source (row, col). :param speed_xx: 2D array, D_xx component. :param speed_xy: 2D array, D_xy component (symmetric off-diagonal). :param speed_yy: 2D array, D_yy component. .. py:function:: _solve_eikonal_anisotropic(sources: list[tuple[int, int]], where_compute: numpy.ndarray, speed_xx: numpy.ndarray, speed_xy: numpy.ndarray, speed_yy: numpy.ndarray, dx_mesh: float = 1.0, dy_mesh: float = 1.0) -> numpy.ndarray Solve the **anisotropic** Eikonal equation using the Fast Marching Method. Computes arrival times T such that:: (∇T)^T D (∇T) = 1 where D(x,y) is a spatially-varying 2×2 symmetric positive-definite metric tensor. The three upper-triangle components are given as separate 2-D arrays. For direction-dependent speed, build D from the speed ellipse: if the speed along angle θ is ``s_max`` and perpendicular is ``s_min``, then:: D = R(θ) diag(s_max², s_min²) R(θ)^T where R is the rotation matrix. :param sources: list of (row, col) source locations. :param where_compute: boolean 2D array — True where propagation is allowed. :param speed_xx: 2D array, D_xx component of metric tensor. :param speed_xy: 2D array, D_xy component (off-diagonal). :param speed_yy: 2D array, D_yy component. :param dx_mesh: mesh spacing in x (column) direction. :param dy_mesh: mesh spacing in y (row) direction. :return: 2D array of arrival times T. Example — isotropic equivalent:: speed = 2.0 # m/s Dxx = Dyy = speed**2 * np.ones(shape) Dxy = np.zeros(shape) T = solve_eikonal_anisotropic(sources, mask, Dxx, Dxy, Dyy) Example — preferential direction:: theta = np.pi / 4 # 45 degrees s_max, s_min = 3.0, 1.0 # m/s c, s = np.cos(theta), np.sin(theta) Dxx = (s_max*c)**2 + (s_min*s)**2 Dxy = (s_max**2 - s_min**2) * c * s Dyy = (s_max*s)**2 + (s_min*c)**2 T = solve_eikonal_anisotropic(sources, mask, Dxx, Dxy, Dyy) .. py:function:: __solve_eikonal_anisotropic_4conn(sources, where_compute, speed_xx, speed_xy, speed_yy, dx_mesh, dy_mesh) Solve the anisotropic Eikonal equation using the legacy 4-connected stencil. Kept for comparison with the 8-connected OUM solver. See :func:`__solve_eikonal_anisotropic` for the full docstring. .. py:function:: _solve_eikonal_anisotropic_4conn(sources: list[tuple[int, int]], where_compute: numpy.ndarray, speed_xx: numpy.ndarray, speed_xy: numpy.ndarray, speed_yy: numpy.ndarray, dx_mesh: float = 1.0, dy_mesh: float = 1.0) -> numpy.ndarray Solve anisotropic Eikonal with the legacy **4-connected** stencil. Kept for benchmarking and comparison with the 8-connected OUM solver :func:`solve_eikonal_anisotropic`. Same parameters as :func:`solve_eikonal_anisotropic`. .. py:function:: _evaluate_distance_and_data_anisotropic_8conn(i, j, fixed, where_compute, dx_mesh, dy_mesh, times, base_data, test_data, speed_xx, speed_xy, speed_yy) Evaluate arrival time **and** propagated data for an anisotropic eikonal equation on an 8-connected stencil. Combines the triangle-update logic of :func:`_evaluate_distance_anisotropic_8conn` with the data-propagation mechanism of :func:`_evaluate_distance_and_data_first_order_iso`. Data propagation rule: * Only fixed neighbours whose ``base_data > test_data[i, j]`` are eligible (same filter as the isotropic data kernel). * The propagated data is the **inverse-distance–weighted average** of the eligible neighbours' ``base_data``, where the distance is the physical edge length to that neighbour. This gives more weight to closer (cardinal) neighbours than to diagonal ones. * If no eligible neighbour exists, ``(inf, test_data[i, j])`` is returned so that the update is rejected. :return: ``(new_time, new_data)`` .. py:function:: __solve_eikonal_anisotropic_with_data(sources, where_compute, base_data, test_data, speed_xx, speed_xy, speed_yy, dx_mesh, dy_mesh) Solve the anisotropic Eikonal equation with data propagation. Combines the 8-connected OUM FMM loop of :func:`__solve_eikonal_anisotropic` with data propagation as in :func:`__solve_eikonal_with_data`. ``base_data`` is modified **in-place**: at each newly fixed cell, the propagated data value replaces the previous content. :param sources: 2D int64 array of shape (N, 2) — source (row, col). :param base_data: 2D array — data to propagate (modified in-place). :param test_data: 2D array — validation threshold for propagation. .. py:function:: _solve_eikonal_anisotropic_with_data(sources: list[tuple[int, int]], where_compute: numpy.ndarray, base_data: numpy.ndarray, test_data: numpy.ndarray, speed_xx: numpy.ndarray, speed_xy: numpy.ndarray, speed_yy: numpy.ndarray, dx_mesh: float = 1.0, dy_mesh: float = 1.0) -> numpy.ndarray Solve the **anisotropic** Eikonal equation with **data propagation**. Computes arrival times T such that:: (∇T)^T D (∇T) = 1 and simultaneously propagates ``base_data`` outward from the sources along the FMM wavefront. At each newly reached cell, the data value is set to the inverse-distance–weighted average of its eligible fixed neighbours (those with ``base_data[neighbour] > test_data[cell]``). ``base_data`` is modified **in-place**. :param sources: list of (row, col) source locations. :param where_compute: boolean 2D array — 1.0 where propagation is allowed, 0.0 for frozen cells (sources, obstacles). :param base_data: 2D array of data to propagate (modified in-place). :param test_data: 2D array — propagation is accepted only when the candidate data value exceeds ``test_data[i, j]``. :param speed_xx: 2D array, D_xx component of metric tensor. :param speed_xy: 2D array, D_xy component (off-diagonal). :param speed_yy: 2D array, D_yy component. :param dx_mesh: mesh spacing in x (column) direction. :param dy_mesh: mesh spacing in y (row) direction. :return: 2D array of arrival times T. .. py:function:: _extract_patch_slices_with_margin(shape, labels, margin=2) Extract the slices of the patches with a margin around the labels. :param shape: (tuple): The shape of the array. :param labels: (2D numpy array): The labels of the patches. :param margin: (int, optional): The margin around the labels. .. py:function:: _process_submatrix(args) Function to process a submatrix in a multiprocess. .. py:function:: _solve_eikonal_with_value_on_submatrices(where_compute: numpy.ndarray, base_data: numpy.ndarray, test_data: numpy.ndarray = None, speed: numpy.ndarray = None, multiprocess: bool = False, dx: float = 1.0, dy: float = 1.0, ignore_last_patches: int = 1, NoDataValue: float = 0.0) -> numpy.ndarray Propagate data inside the mask using the Fast Marching Method (FMM). "base_data" will be updated with the new values. :param where_compute: (2D numpy array): A boolean array where True indicates cells to be included in computation. :param base_data: (2D numpy array): The base data that will propagate. :param test_data: (2D numpy array, optional): The test data that will be used to validate propagation (we used upstream data only if base_data > test_data). :param speed: (2D numpy array, optional): The isotropic propagation speed. :param multiprocess: (bool, optional): Whether to use multiprocessing. :param dx: (float, optional): The mesh size in x direction. :param dy: (float, optional): The mesh size in y direction. :return: 2D numpy array The solution to the Eikonal equation. .. py:function:: count_holes(mask: numpy.ndarray = None) Count the number of holes in the mask. .. py:function:: inpaint_array(data: numpy.ndarray | numpy.ma.MaskedArray, where_compute: numpy.ndarray, test: numpy.ndarray, ignore_last_patches: int = 1, inplace: bool = True, dx: float = 1.0, dy: float = 1.0, NoDataValue: float = 0.0, multiprocess: bool = True) -> tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray] Inpaint the array using the Fast Marching Method (FMM). Main idea: - We have a 2D array "data" that we want to inpaint. - We have a 2D array "test" that will be used to validate the inpainting. - We have a 2D array "where_compute" that indicates where to compute the inpainting. - We will use the FMM to propagate the data inside the mask. - We will update the data only if the new value is greater than the test data. - (We can ignore n greatest patches to avoid computation in some large areas.) :param data: (2D numpy array): The water level to inpaint. :param mask: (2D numpy array): The simulation's Digital Elevation Model (DEM). :param test: (2D numpy array, optional): The digital terrain model (DTM). :param ignore_last_patches: (int, optional): The number of last patches to ignore. :param inplace: (bool, optional): Whether to update the water_level in place. :param dx: (float, optional): The mesh size in x direction. :param dy: (float, optional): The mesh size in y direction. :param NoDataValue: (float, optional): The NoDataValue, used if mask is not explicitly provided (mask atribute or water_level as a Numpy MaskedArray). Default is 0. :param multiprocess: (bool, optional): Whether to use multiprocessing. .. py:function:: inpaint_waterlevel(water_level: numpy.ndarray | numpy.ma.MaskedArray, dem: numpy.ndarray, dtm: numpy.ndarray, ignore_last_patches: int = 1, inplace: bool = True, dx: float = 1.0, dy: float = 1.0, NoDataValue: float = 0.0, multiprocess: bool = True, epsilon: float = 0.001) -> tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray] Inpaint the water level using the Fast Marching Method (FMM). Similar to the HOLES.EXE Fortran program. Assumptions: - The simulations are in a steady state. - The flood extent is limited by: - natural topography (where DEM == DTM) - buildings or blocks of buildings - protective structures The goal is to propagate the free surface elevations into the buildings. We calculate the difference between the DEM (including buildings and walls) and the DTM to identify where the buildings are. Specifically: - if it is natural topography, the differences will be zero or almost zero - if it is a building or anthropogenic structure, the differences are significant We set the elevations of the cells where the difference is zero to a value unreachable by the flood. Thus, only buildings in contact with the flood will be affected and filled. HOLES.EXE vs Python code: - In "holes.exe", we must provide "in", "mask", and "dem" files: - "in" is the water level - "dem" is the digital terrain model (DTM) associated with the simulation's topo-bathymetry (not the topo-bathymetry itself) - "mask" is the DTM where the buildings are identified and a large value (above the maximum water level) if the cell is not inside a building. - The patches are identified by the water level "0.0" in the "in" file. - FMM is used and new water levels are calculated and retained if greater than the "mask" value. - The DTM is only used in the final part when evaluating the new water depths (Z-DTM). - In Python, we must provide the water level, the DEM, and the DTM: - The patches will be identified by the buildings array (filtered DEM - DTM). - FMM is used and new water levels are calculated and retained if greater than the DTM value. - FMM is only propagated in the cells where "buildings = True". - Final results must be the same even if the algorithm is a little bit different. - Fortran code demands the "mask" file to be provided (pre-computed/modified by the user), but in Python, we calculate it automatically from the DEM and DTM. - We can use "inpaint_array" to be more flexible... by manually providing a "where_compute" and a "test" arrays :param water_level: (2D numpy array): The water level to inpaint. :param dem: (2D numpy array): The simulation's Digital Elevation Model (DEM). :param dtm: (2D numpy array, optional): The digital terrain model (DTM). :param ignore_last_patches: (int, optional): The number of last patches to ignore. :param inplace: (bool, optional): Whether to update the water_level in place. :param dx: (float, optional): The mesh size in x direction. :param dy: (float, optional): The mesh size in y direction. :param NoDataValue: (float, optional): The NoDataValue, used if mask is not explicitly provided (mask attribute or water_level as a Numpy MaskedArray). Default is 0. :param multiprocess: (bool, optional): Whether to use multiprocessing. :param epsilon: (float, optional): The minimum value to consider that a water height is present. .. py:function:: _evaluate_distance_aniso_drift_8conn(i, j, fixed, where_compute, dx_mesh, dy_mesh, times, speed_xx, speed_xy, speed_yy, drift_vx, drift_vy) Evaluate arrival time for anisotropic eikonal with drift on 8-conn stencil. Solves sqrt( (∇T)^T M (∇T) ) + v · ∇T = 1 For a single edge from fixed neighbour A to point P with edge vector e = A - P (in physical coords): ∇T ≈ (T_P - T_A) / ||e||² * e (1D approximation) Substituting into the equation and solving for T_P: T_P = T_A + [ -v·e + sqrt( e^T M e - (v·e)² + e^T M e * (v·e / ||e||_M)² ) ] ... which simplifies (see derivation) to solving a 1D quadratic for each edge direction. For triangle updates (A, B adjacent fixed neighbours), we use the parametric approach T = λ T_A + (1-λ) T_B + cost(λ) where cost includes the drift correction. :param drift_vx: 2D array, x-component of drift velocity at each cell. :param drift_vy: 2D array, y-component of drift velocity at each cell. :return: new arrival time (float), or np.inf if no valid update. .. py:function:: __solve_eikonal_aniso_drift(sources, where_compute, speed_xx, speed_xy, speed_yy, drift_vx, drift_vy, dx_mesh, dy_mesh) Solve anisotropic Eikonal with drift using FMM (8-connected). sqrt( (∇T)^T M (∇T) ) + v · ∇T = 1 This variant does NOT propagate data — only computes T. :param drift_vx: 2D array, x-component of drift velocity. :param drift_vy: 2D array, y-component of drift velocity. .. py:function:: _solve_eikonal_aniso_drift(sources: list[tuple[int, int]], where_compute: numpy.ndarray, speed_xx: numpy.ndarray, speed_xy: numpy.ndarray, speed_yy: numpy.ndarray, drift_vx: numpy.ndarray, drift_vy: numpy.ndarray, dx_mesh: float = 1.0, dy_mesh: float = 1.0) -> numpy.ndarray Solve the **anisotropic Eikonal equation with drift** using FMM. Computes arrival times T such that:: sqrt( (∇T)^T D⁻¹ (∇T) ) + v · ∇T = 1 where D is a 2×2 symmetric positive-definite metric tensor and v = (vx, vy) is a spatially-varying drift (advection) velocity. The drift breaks the symmetry: propagation is faster **downstream** (in the direction of v) and slower upstream. **Sub-critical condition:** The FMM is valid as long as, at every cell, the drift speed does not exceed the intrinsic wavefront speed in any direction. In practice, ``||v||_D = sqrt(v^T D v) < 1`` should hold everywhere. This function does NOT propagate data — only arrival times. For data propagation with drift, use :func:`_solve_eikonal_aniso_drift_with_data`. :param sources: list of (row, col) source locations. :param where_compute: 2D float array — 1.0 where propagation is allowed, 0.0 for frozen cells. :param speed_xx: 2D array, D_xx component of metric tensor. :param speed_xy: 2D array, D_xy component (off-diagonal). :param speed_yy: 2D array, D_yy component. :param drift_vx: 2D array, x-component of drift velocity. :param drift_vy: 2D array, y-component of drift velocity. :param dx_mesh: mesh spacing in x (column) direction. :param dy_mesh: mesh spacing in y (row) direction. :return: 2D array of arrival times T. Example — uniform flow along x:: speed = 2.0 Dxx = Dyy = speed**2 * np.ones(shape) Dxy = np.zeros(shape) vx = 0.3 * np.ones(shape) # drift along +x vy = np.zeros(shape) T = _solve_eikonal_aniso_drift(sources, mask, Dxx, Dxy, Dyy, vx, vy) .. py:function:: _evaluate_distance_and_data_aniso_drift_8conn(i, j, fixed, where_compute, dx_mesh, dy_mesh, times, base_data, test_data, speed_xx, speed_xy, speed_yy, drift_vx, drift_vy) Evaluate arrival time AND propagated data for anisotropic eikonal with drift on an 8-connected stencil. Combines the drift-aware time update of :func:`_evaluate_distance_aniso_drift_8conn` with the inverse-distance–weighted data propagation of :func:`_evaluate_distance_and_data_anisotropic_8conn`. :return: ``(new_time, new_data)`` .. py:function:: __solve_eikonal_aniso_drift_with_data(sources, where_compute, base_data, test_data, speed_xx, speed_xy, speed_yy, drift_vx, drift_vy, dx_mesh, dy_mesh) Solve anisotropic Eikonal with drift and data propagation (FMM, 8-conn). ``base_data`` is modified **in-place**. .. py:function:: _solve_eikonal_aniso_drift_with_data(sources: list[tuple[int, int]], where_compute: numpy.ndarray, base_data: numpy.ndarray, test_data: numpy.ndarray, speed_xx: numpy.ndarray, speed_xy: numpy.ndarray, speed_yy: numpy.ndarray, drift_vx: numpy.ndarray, drift_vy: numpy.ndarray, dx_mesh: float = 1.0, dy_mesh: float = 1.0) -> numpy.ndarray Solve the **anisotropic Eikonal equation with drift and data propagation**. Computes arrival times T such that:: sqrt( (∇T)^T D⁻¹ (∇T) ) + v · ∇T = 1 and simultaneously propagates ``base_data`` outward from the sources along the FMM wavefront. ``base_data`` is modified **in-place**. :param sources: list of (row, col) source locations. :param where_compute: 2D float array — 1.0 where propagation is allowed, 0.0 for frozen cells. :param base_data: 2D array of data to propagate (modified in-place). :param test_data: 2D array — propagation accepted only when candidate data > test_data[i, j]. :param speed_xx: 2D array, D_xx component of metric tensor. :param speed_xy: 2D array, D_xy component (off-diagonal). :param speed_yy: 2D array, D_yy component. :param drift_vx: 2D array, x-component of drift velocity. :param drift_vy: 2D array, y-component of drift velocity. :param dx_mesh: mesh spacing in x (column) direction. :param dy_mesh: mesh spacing in y (row) direction. :return: 2D array of arrival times T. .. py:function:: diffusion_tensor_from_velocity(Ux: numpy.ndarray, Uy: numpy.ndarray, D_longitudinal, D_transverse, min_speed: float = 1e-10) -> tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray] Build an anisotropic diffusion tensor aligned with a velocity field. The tensor is constructed as: .. math:: D = D_L \;(\mathbf{n} \otimes \mathbf{n}) + D_T \;(\mathbf{t} \otimes \mathbf{t}) where :math:`\mathbf{n} = (U_x, U_y) / |U|` is the unit flow direction and :math:`\mathbf{t} = (-n_y, n_x)` the perpendicular. In component form: - :math:`D_{xx} = D_L\, n_x^2 + D_T\, n_y^2` - :math:`D_{xy} = (D_L - D_T)\, n_x\, n_y` - :math:`D_{yy} = D_L\, n_y^2 + D_T\, n_x^2` At cells where :math:`|U| <` *min_speed*, the tensor falls back to isotropic: :math:`D = D_T\, I`. If *Ux* or *Uy* are :class:`numpy.ma.MaskedArray`, the returned arrays are also masked with the **union** of input masks. Masked cells are filled with *D_T* (isotropic) for Dxx/Dyy and 0 for Dxy. *D_longitudinal* and *D_transverse* may be **scalars** (uniform) or **2-D arrays** of the same shape as *Ux* / *Uy* (spatially variable). :param Ux: Velocity component in the **x** (column) direction (2-D array, rows × cols). :type Ux: numpy.ndarray or numpy.ma.MaskedArray :param Uy: Velocity component in the **y** (row) direction (2-D array, rows × cols). :type Uy: numpy.ndarray or numpy.ma.MaskedArray :param D_longitudinal: Diffusion coefficient along the flow direction (speed² units). Scalar or 2-D array. :type D_longitudinal: float or numpy.ndarray :param D_transverse: Diffusion coefficient perpendicular to the flow (speed² units). Scalar or 2-D array. :type D_transverse: float or numpy.ndarray :param min_speed: Minimum velocity magnitude below which the tensor is isotropic. Default ``1e-10``. :type min_speed: float, optional :return: ``(Dxx, Dxy, Dyy)`` — components of the symmetric 2×2 diffusion tensor, ready for :func:`_solve_eikonal_anisotropic` or :func:`_solve_eikonal_aniso_drift`. :rtype: tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray] Example:: Dxx, Dxy, Dyy = diffusion_tensor_from_velocity(Ux, Uy, D_L=9.0, D_T=1.0) T = _solve_eikonal_aniso_drift(sources, wc, Dxx, Dxy, Dyy, Ux, Uy, dx, dy) .. py:function:: eikonal_params_from_velocity(Ux: numpy.ndarray, Uy: numpy.ndarray, D_longitudinal, D_transverse, drift_scale: float = 1.0, min_speed: float = 1e-10) -> tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray] Prepare all eikonal parameters from a hydrodynamic velocity field. Convenience wrapper that computes both the anisotropic diffusion tensor *D* (via :func:`diffusion_tensor_from_velocity`) and the drift velocity from a single velocity field (*Ux*, *Uy*). If *Ux* or *Uy* are :class:`numpy.ma.MaskedArray`, all five returned arrays are masked arrays with the union of input masks. *D_longitudinal* and *D_transverse* may be **scalars** or **2-D arrays** (spatially variable diffusivity). :param Ux: Velocity component in the **x** (column) direction (2-D array, rows × cols). :type Ux: numpy.ndarray or numpy.ma.MaskedArray :param Uy: Velocity component in the **y** (row) direction (2-D array, rows × cols). :type Uy: numpy.ndarray or numpy.ma.MaskedArray :param D_longitudinal: Speed² along the flow direction. Scalar or 2-D array. :type D_longitudinal: float or numpy.ndarray :param D_transverse: Speed² perpendicular to the flow. Scalar or 2-D array. :type D_transverse: float or numpy.ndarray :param drift_scale: Scaling factor applied to the drift velocity (``drift = drift_scale × (Ux, Uy)``). Use 1.0 to take the raw velocity as drift. Default ``1.0``. :type drift_scale: float, optional :param min_speed: Minimum |U| for directional tensor. Default ``1e-10``. :type min_speed: float, optional :return: ``(Dxx, Dxy, Dyy, drift_vx, drift_vy)`` — diffusion tensor components and drift velocity arrays. :rtype: tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray] Example:: Dxx, Dxy, Dyy, vx, vy = eikonal_params_from_velocity( Ux, Uy, D_longitudinal=9.0, D_transverse=1.0, drift_scale=0.8) T = _solve_eikonal_aniso_drift(sources, wc, Dxx, Dxy, Dyy, vx, vy, dx, dy)