Source code for wolfhece.eikonal

"""
Author: HECE - University of Liege, Pierre Archambeau
Date: 2025, 2026

Copyright (c) 2026 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

import numpy as np
import heapq
import multiprocessing as mp
import scipy.ndimage
import logging
from numba import njit
from tqdm import tqdm

@njit(cache=True)
[docs] def _evaluate_distance_and_data_first_order_iso(i, j, fixed:np.ndarray, where_compute:np.ndarray, dx_mesh:float, dy_mesh:float, times:np.ndarray, base_data:np.ndarray, test_data:np.ndarray, speed:float = 1.0) -> tuple[float, float]: """ Evaluate the time and data using a first order isotropic method. :param i: (int): The row index. :param j: (int): The column index. :param fixed: (2D numpy array): A boolean array where True indicates fixed points. :param where_compute: (2D numpy array): A boolean array where True indicates cells to be included in computation. :param dx_mesh: (float): The mesh size in x direction. :param dy_mesh: (float): The mesh size in y direction. :param times: (2D numpy array): The time function. :param base_data: (2D numpy array): The base data that will propagate. :param test_data: (2D numpy array): The test data that will be used to validate propagation. :param speed: (float): The isotropic propagation speed. :return: tuple[float, float]: The time and data. """ # Search for fixed neighbors and accumulate sums in scalars. # # Performance note: we use scalar accumulators instead of building # a list of neighbor tuples followed by np.asarray() + np.sum(). # Under @njit, scalars are held in CPU registers — no heap # allocation per call. The previous approach created temporary # Python/Numba lists and arrays on every invocation, which is # costly given that this kernel is called once per neighbor per # FMM iteration (i.e. millions of times on large grids). # With at most 4 neighbors the loop is trivially unrollable by # the LLVM backend, making this essentially branch-free. n_fixed = np.int64(0) sum_abs_t = np.float64(0.0) sum_t2 = np.float64(0.0) sum_data = np.float64(0.0) for di, dj in ((-1, 0), (1, 0), (0, -1), (0, 1)): ni = i + di nj = j + dj if fixed[ni, nj] and not where_compute[ni, nj] and test_data[i, j] < base_data[ni, nj]: n_fixed += 1 t_val = times[ni, nj] sum_abs_t += abs(t_val) sum_t2 += t_val * t_val sum_data += base_data[ni, nj] if n_fixed == 0: return np.inf, test_data[i, j] a_data = a_time = np.float64(n_fixed) b_time = sum_abs_t c_time = sum_t2 b_data = sum_data # Résolution d'une équation du second degré pour trouver la distance b_time = -2.0 * b_time # l'hypothèse implicite dx = dy est faite ici c_time = c_time - dx_mesh*dy_mesh * speed**2.0 if b_time != 0. and c_time!= 0.: Realisant = abs(b_time*b_time-4*a_time*c_time) new_time = (-b_time+np.sqrt(Realisant)) / (2.0*a_time) elif c_time == 0.0 and b_time != 0.: new_time = -b_time/a_time elif b_time == 0.: new_time = np.sqrt(-c_time/a_time) data = b_data/a_data if data < test_data[i,j]: return np.inf, test_data[i,j] return new_time, data
@njit(cache=True)
[docs] def _evaluate_distance_and_data_second_order_iso(i, j, fixed:np.ndarray, where_compute:np.ndarray, dx_mesh:float, dy_mesh:float, times:np.ndarray, base_data:np.ndarray, test_data:np.ndarray, speed:float = 1.0) -> tuple[float, float]: """ Evaluate the time and data using a **second-order** isotropic scheme. Implements the quadratic upwind approximation of Sethian (1999): For each axis (x and y), if the *two* upwind neighbours are both *fixed*, a second-order backward difference is used:: (9/4) T² - (6 T_{-1} - 3/2 T_{-2})T + … = (dx/speed)² Otherwise the standard first-order one-sided difference is used for that axis. The resulting quadratic ``a T² - 2 b T + c = 0`` is solved exactly. Parameters are identical to :func:`_evaluate_distance_and_data_first_order_iso`. """ rows = fixed.shape[0] cols = fixed.shape[1] # ---- X axis (row direction) ---- # Check for second-order stencil: need (i-1,j) and (i-2,j) both fixed, # or (i+1,j) and (i+2,j) both fixed. Pick the side with the smaller T. ax = np.float64(0.0) bx = np.float64(0.0) cx = np.float64(0.0) data_x = np.float64(0.0) n_x = np.int64(0) has_x = False for sign in (-1, 1): n1i = i + sign n2i = i + 2 * sign if 0 <= n1i < rows and 0 <= n2i < rows: if (fixed[n1i, j] and not where_compute[n1i, j] and fixed[n2i, j] and not where_compute[n2i, j] and test_data[i, j] < base_data[n1i, j]): t1 = times[n1i, j] t2 = times[n2i, j] if t2 <= t1: # monotonicity: upwind second-order valid # second-order backward difference: (3T - 4T1 + T2) / (2h) # Squared → (9T² - (24T1-6T2)T + 16T1²-8T1T2+T2²) / (4h²) # Form: ax T² - 2 bx T + cx (without 1/h² factor) ax_cand = np.float64(9.0 / 4.0) bx_cand = np.float64(3.0 * t1 - 0.75 * t2) cx_cand = np.float64(4.0 * t1 * t1 - 2.0 * t1 * t2 + 0.25 * t2 * t2) if not has_x or t1 < times[i + (-sign), j]: ax = ax_cand bx = bx_cand cx = cx_cand data_x = base_data[n1i, j] n_x = 1 has_x = True # Fall back to first-order if second-order was not available if not has_x: for sign in (-1, 1): n1i = i + sign if 0 <= n1i < rows: if (fixed[n1i, j] and not where_compute[n1i, j] and test_data[i, j] < base_data[n1i, j]): t1 = times[n1i, j] if not has_x or t1 < bx: # pick smallest upwind T ax = np.float64(1.0) bx = t1 cx = t1 * t1 data_x = base_data[n1i, j] n_x = 1 has_x = True # ---- Y axis (column direction) ---- ay = np.float64(0.0) by = np.float64(0.0) cy = np.float64(0.0) data_y = np.float64(0.0) n_y = np.int64(0) has_y = False for sign in (-1, 1): n1j = j + sign n2j = j + 2 * sign if 0 <= n1j < cols and 0 <= n2j < cols: if (fixed[i, n1j] and not where_compute[i, n1j] and fixed[i, n2j] and not where_compute[i, n2j] and test_data[i, j] < base_data[i, n1j]): t1 = times[i, n1j] t2 = times[i, n2j] if t2 <= t1: ay_cand = np.float64(9.0 / 4.0) by_cand = np.float64(3.0 * t1 - 0.75 * t2) cy_cand = np.float64(4.0 * t1 * t1 - 2.0 * t1 * t2 + 0.25 * t2 * t2) if not has_y or t1 < times[i, j + (-sign)]: ay = ay_cand by = by_cand cy = cy_cand data_y = base_data[i, n1j] n_y = 1 has_y = True if not has_y: for sign in (-1, 1): n1j = j + sign if 0 <= n1j < cols: if (fixed[i, n1j] and not where_compute[i, n1j] and test_data[i, j] < base_data[i, n1j]): t1 = times[i, n1j] if not has_y or t1 < by: ay = np.float64(1.0) by = t1 cy = t1 * t1 data_y = base_data[i, n1j] n_y = 1 has_y = True if n_x == 0 and n_y == 0: return np.inf, test_data[i, j] # ---- Assemble and solve the quadratic ---- # (ax / dx² + ay / dy²) T² # - 2 (bx / dx² + by / dy²) T # + (cx / dx² + cy / dy²) - 1/speed² = 0 inv_dx2 = 1.0 / (dx_mesh * dx_mesh) inv_dy2 = 1.0 / (dy_mesh * dy_mesh) s2 = speed * speed a_coef = ax * inv_dx2 + ay * inv_dy2 b_coef = bx * inv_dx2 + by * inv_dy2 c_coef = cx * inv_dx2 + cy * inv_dy2 - 1.0 / s2 discr = b_coef * b_coef - a_coef * c_coef if discr < 0.0: discr = 0.0 if a_coef == 0.0: return np.inf, test_data[i, j] new_time = (b_coef + np.sqrt(discr)) / a_coef # ---- Data: weighted average of contributions ---- n_total = n_x + n_y data = (data_x * n_x + data_y * n_y) / np.float64(n_total) if data < test_data[i, j]: return np.inf, test_data[i, j] return new_time, data
@njit(cache=True)
[docs] def _evaluate_distance_anisotropic(i, j, fixed:np.ndarray, where_compute:np.ndarray, dx_mesh:float, dy_mesh:float, times:np.ndarray, speed_xx:np.ndarray, speed_xy:np.ndarray, speed_yy:np.ndarray) -> float: """ Evaluate the arrival time for an **anisotropic** eikonal equation. Solves (∇T)^T D (∇T) = 1 where D is the local metric tensor:: D = [[speed_xx, speed_xy], [speed_xy, speed_yy]] This supports direction-dependent propagation speeds (e.g. flow along a preferential direction, wind-driven fire spread, etc.). The upwind finite-difference approximations of T_x and T_y are first-order; each axis picks the smallest fixed neighbour. The quadratic to solve is (dropping mesh-size scaling for clarity):: D_xx (T - Tx)² + 2 D_xy (T - Tx)(T - Ty) + D_yy (T - Ty)² = h² which expands to a standard a T² + b T + c = 0. :param speed_xx: 2D array, D_xx component of the metric tensor. :param speed_xy: 2D array, D_xy component (off-diagonal, symmetric). :param speed_yy: 2D array, D_yy component of the metric tensor. :return: The new arrival time (float), or np.inf if no update. """ rows = fixed.shape[0] cols = fixed.shape[1] # ---- upwind T along x (row) ---- has_x = False Tx = np.float64(0.0) for sign in (-1, 1): ni = i + sign if 0 <= ni < rows: if fixed[ni, j] and not where_compute[ni, j]: t_val = times[ni, j] if not has_x or t_val < Tx: Tx = t_val has_x = True # ---- upwind T along y (col) ---- has_y = False Ty = np.float64(0.0) for sign in (-1, 1): nj = j + sign if 0 <= nj < cols: if fixed[i, nj] and not where_compute[i, nj]: t_val = times[i, nj] if not has_y or t_val < Ty: Ty = t_val has_y = True if not has_x and not has_y: return np.inf Dxx = speed_xx[i, j] Dxy = speed_xy[i, j] Dyy = speed_yy[i, j] hx = dx_mesh hy = dy_mesh if has_x and has_y: # Full 2D solve: # Dxx/hx² (T-Tx)² + 2 Dxy/(hx hy) (T-Tx)(T-Ty) + Dyy/hy² (T-Ty)² = 1 axx = Dxx / (hx * hx) ayy = Dyy / (hy * hy) axy = Dxy / (hx * hy) # note: factor 2 will appear below a = axx + 2.0 * axy + ayy b = -2.0 * (axx * Tx + axy * (Tx + Ty) + ayy * Ty) c = axx * Tx * Tx + 2.0 * axy * Tx * Ty + ayy * Ty * Ty - 1.0 if a == 0.0: # Degenerate metric — fall back to 1D t1d_x = Tx + hx / np.sqrt(Dxx) if Dxx > 0.0 else np.inf t1d_y = Ty + hy / np.sqrt(Dyy) if Dyy > 0.0 else np.inf return min(t1d_x, t1d_y) discr = b * b - 4.0 * a * c if discr < 0.0: # Fall back to 1D solutions and pick the best t1d_x = Tx + hx / np.sqrt(Dxx) if Dxx > 0.0 else np.inf t1d_y = Ty + hy / np.sqrt(Dyy) if Dyy > 0.0 else np.inf return min(t1d_x, t1d_y) new_t = (-b + np.sqrt(discr)) / (2.0 * a) # Causality: new_t must be >= both Tx and Ty if new_t >= Tx and new_t >= Ty: return new_t # else fall back t1d_x = Tx + hx / np.sqrt(Dxx) if Dxx > 0.0 else np.inf t1d_y = Ty + hy / np.sqrt(Dyy) if Dyy > 0.0 else np.inf return min(t1d_x, t1d_y) elif has_x: if Dxx > 0.0: return Tx + hx / np.sqrt(Dxx) return np.inf else: if Dyy > 0.0: return Ty + hy / np.sqrt(Dyy) return np.inf
@njit(cache=True)
[docs] def _evaluate_distance_anisotropic_8conn(i, j, fixed, where_compute, dx_mesh, dy_mesh, times, speed_xx, speed_xy, speed_yy): """ Evaluate the arrival time for an **anisotropic** eikonal equation using an **8-connected** stencil (Ordered Upwind Method). Solves (∇T)^T D (∇T) = 1 where D is the local metric tensor. Unlike :func:`_evaluate_distance_anisotropic` which only considers the 4 cardinal neighbours, this kernel considers all 8 neighbours (N, NE, E, SE, S, SW, W, NW) and forms **triangle updates** for each pair of adjacent fixed neighbours. For each triangle (P, A, B) where A and B are two adjacent fixed neighbours, the update solves:: T(P) = min_{λ ∈ [0,1]} [ λ T_A + (1-λ) T_B + ‖λ e_A + (1-λ) e_B‖_{D⁻¹} ] where e_A = A − P, e_B = B − P in physical coordinates. This properly captures **off-diagonal anisotropy** (D_xy ≠ 0) along diagonal directions, which a 4-connected stencil cannot resolve. Reference: Sethian & Vladimirsky (2003), "Ordered Upwind Methods for Static Hamilton–Jacobi Equations: Theory and Algorithms", SIAM J. Numer. Anal. :param speed_xx: 2D array, D_xx component of the metric tensor. :param speed_xy: 2D array, D_xy component (off-diagonal, symmetric). :param speed_yy: 2D array, D_yy component. :return: The new arrival time (float), or np.inf if no update. """ rows = fixed.shape[0] cols = fixed.shape[1] Dxx = speed_xx[i, j] Dxy = speed_xy[i, j] Dyy = speed_yy[i, j] # --- Invert the metric: M = D⁻¹ --- det_D = Dxx * Dyy - Dxy * Dxy if det_D <= 1e-30: return np.inf inv_det = 1.0 / det_D Mxx = Dyy * inv_det Mxy = -Dxy * inv_det Myy = Dxx * inv_det # --- 8 neighbours (clockwise): N, NE, E, SE, S, SW, W, NW --- # index k: 0 1 2 3 4 5 6 7 # row offset di: -1 -1 0 1 1 1 0 -1 # col offset dj: 0 1 1 1 0 -1 -1 -1 nb_di = np.empty(8, dtype=np.int64) nb_dj = np.empty(8, dtype=np.int64) nb_di[0] = -1; nb_di[1] = -1; nb_di[2] = 0; nb_di[3] = 1 nb_di[4] = 1; nb_di[5] = 1; nb_di[6] = 0; nb_di[7] = -1 nb_dj[0] = 0; nb_dj[1] = 1; nb_dj[2] = 1; nb_dj[3] = 1 nb_dj[4] = 0; nb_dj[5] = -1; nb_dj[6] = -1; nb_dj[7] = -1 # Gather validity and times for each neighbour nb_valid = np.zeros(8, dtype=np.int64) nb_t = np.full(8, np.inf) for k in range(8): ni = i + nb_di[k] nj = j + nb_dj[k] if 0 <= ni < rows and 0 <= nj < cols: if fixed[ni, nj] and not where_compute[ni, nj]: nb_valid[k] = 1 nb_t[k] = times[ni, nj] best = np.inf # --- 1D (single-neighbour) updates --- for k in range(8): if nb_valid[k]: # Physical edge vector from P to neighbour k ex = np.float64(nb_dj[k]) * dx_mesh ey = np.float64(nb_di[k]) * dy_mesh cost_sq = Mxx * ex * ex + 2.0 * Mxy * ex * ey + Myy * ey * ey if cost_sq > 0.0: cand = nb_t[k] + np.sqrt(cost_sq) if cand < best: best = cand # --- 2D triangle updates for adjacent neighbour pairs --- for k in range(8): k2 = (k + 1) % 8 if not (nb_valid[k] and nb_valid[k2]): continue Ta = nb_t[k] Tb = nb_t[k2] # Edge vectors from P to neighbours A (k) and B (k2) e1x = np.float64(nb_dj[k]) * dx_mesh e1y = np.float64(nb_di[k]) * dy_mesh e2x = np.float64(nb_dj[k2]) * dx_mesh e2y = np.float64(nb_di[k2]) * dy_mesh # Bilinear form coefficients with the slowness metric M = D⁻¹ # p = e₁ᵀ M e₁, q = e₁ᵀ M e₂, r = e₂ᵀ M e₂ p = Mxx * e1x * e1x + 2.0 * Mxy * e1x * e1y + Myy * e1y * e1y q = Mxx * e1x * e2x + Mxy * (e1x * e2y + e1y * e2x) + Myy * e1y * e2y r = Mxx * e2x * e2x + 2.0 * Mxy * e2x * e2y + Myy * e2y * e2y delta = Ta - Tb # T_A - T_B alpha = p - 2.0 * q + r # coefficient of λ² in Q(λ) beta = q - r # adjusted linear term pr_q2 = p * r - q * q # ≥ 0 by Cauchy-Schwarz if alpha < 1e-14 or pr_q2 < 0.0: continue # degenerate triangle in the metric d2 = delta * delta if d2 >= alpha: continue # no valid interior minimiser exists # Solve the optimality equation for λ: # λ = (-β ± |Δ|√(pr - q²) / √(α - Δ²)) / α sqrt_term = abs(delta) * np.sqrt(pr_q2 / (alpha - d2)) for s in range(2): if s == 0: lam = (-beta - sqrt_term) / alpha else: lam = (-beta + sqrt_term) / alpha if lam < 0.0 or lam > 1.0: continue # Evaluate Q(λ) = α λ² + 2 β λ + r Q_val = alpha * lam * lam + 2.0 * beta * lam + r if Q_val <= 0.0: continue d_lam = np.sqrt(Q_val) # Verify sign consistency (squaring introduced a spurious root) # Original: Δ · d(λ) = -(α λ + β) lhs = delta * d_lam rhs = -(alpha * lam + beta) tol = 1e-6 * (abs(lhs) + abs(rhs) + 1e-30) if abs(lhs - rhs) > tol: continue cand = Tb + lam * delta + d_lam if cand < best: best = cand return best
@njit(cache=True)
[docs] def _heap_push(heap_t, heap_i, heap_j, heap_size, t, row, col): """Push (t, row, col) onto the binary min-heap. Returns updated size. Uses a sift-up strategy: the new element is placed at the end then bubbled up until the heap invariant is restored. """ pos = heap_size heap_t[pos] = t heap_i[pos] = row heap_j[pos] = col while pos > 0: parent = (pos - 1) >> 1 if heap_t[parent] > heap_t[pos]: heap_t[parent], heap_t[pos] = heap_t[pos], heap_t[parent] heap_i[parent], heap_i[pos] = heap_i[pos], heap_i[parent] heap_j[parent], heap_j[pos] = heap_j[pos], heap_j[parent] pos = parent else: break return heap_size + 1
@njit(cache=True)
[docs] def _heap_pop(heap_t, heap_i, heap_j, heap_size): """Pop the minimum element from the binary min-heap. Returns (new_size, t, row, col). The last leaf replaces the root and is sifted down until the heap invariant holds. """ t = heap_t[0] row = heap_i[0] col = heap_j[0] heap_size -= 1 heap_t[0] = heap_t[heap_size] heap_i[0] = heap_i[heap_size] heap_j[0] = heap_j[heap_size] pos = np.int64(0) while True: left = 2 * pos + 1 right = 2 * pos + 2 smallest = pos if left < heap_size and heap_t[left] < heap_t[smallest]: smallest = left if right < heap_size and heap_t[right] < heap_t[smallest]: smallest = right if smallest != pos: heap_t[pos], heap_t[smallest] = heap_t[smallest], heap_t[pos] heap_i[pos], heap_i[smallest] = heap_i[smallest], heap_i[pos] heap_j[pos], heap_j[smallest] = heap_j[smallest], heap_j[pos] pos = smallest else: break return heap_size, t, row, col
@njit(cache=True)
[docs] def __solve_eikonal_with_data(sources, where_compute, base_data, test_data, speed, dx_mesh, dy_mesh): """Solve the Eikonal equation using the Fast Marching Method (FMM). Fully @njit-compiled version using a manual binary min-heap (three parallel numpy arrays: times, rows, cols) instead of Python's heapq module. This eliminates the CPython interpreter overhead on the hot FMM loop (heap operations + neighbor evaluation), typically yielding a x5-x20 speed-up on large grids. The first call incurs a one-time Numba compilation cost (~2-5 s); subsequent calls run at native speed. :param sources: 2D int64 array of shape (N, 2) — source (row, col). """ rows, cols = where_compute.shape # --- Fixed-point bookkeeping --- fixed = np.zeros((rows, cols), dtype=np.uint8) fixed[:, 0] = 1 fixed[:, -1] = 1 fixed[0, :] = 1 fixed[-1, :] = 1 time = np.full((rows, cols), np.inf) time[:, 0] = 0.0 time[:, -1] = 0.0 time[0, :] = 0.0 time[-1, :] = 0.0 # --- Binary min-heap (pre-allocated numpy arrays) --- # Capacity: each cell can be pushed at most once per cardinal # neighbour (lazy-deletion FMM), plus the initial sources. capacity = 4 * rows * cols + sources.shape[0] heap_t = np.empty(capacity, dtype=np.float64) heap_i = np.empty(capacity, dtype=np.int64) heap_j = np.empty(capacity, dtype=np.int64) heap_size = np.int64(0) # Push all source cells for k in range(sources.shape[0]): si = sources[k, 0] sj = sources[k, 1] time[si, sj] = 0.0 fixed[si, sj] = 1 heap_size = _heap_push(heap_t, heap_i, heap_j, heap_size, 0.0, si, sj) # --- Main FMM loop --- while heap_size > 0: heap_size, t, i, j = _heap_pop(heap_t, heap_i, heap_j, heap_size) fixed[i, j] = 1 where_compute[i, j] = 0.0 # this point is now fixed # Evaluate the 4 cardinal neighbours for d in range(4): if d == 0: new_i, new_j = i - 1, j elif d == 1: new_i, new_j = i + 1, j elif d == 2: new_i, new_j = i, j - 1 else: new_i, new_j = i, j + 1 if new_i < 0 or new_i >= rows or new_j < 0 or new_j >= cols: continue if fixed[new_i, new_j] or not where_compute[new_i, new_j]: continue new_t, new_data = _evaluate_distance_and_data_first_order_iso( new_i, new_j, fixed, where_compute, dx_mesh, dy_mesh, time, base_data, test_data, speed[new_i, new_j]) if new_t < time[new_i, new_j]: time[new_i, new_j] = new_t base_data[new_i, new_j] = new_data heap_size = _heap_push(heap_t, heap_i, heap_j, heap_size, new_t, new_i, new_j) return time
[docs] def _solve_eikonal_with_data(sources:list[tuple[int,int]], where_compute:np.ndarray=None, base_data:np.ndarray=None, test_data:np.ndarray=None, speed:np.ndarray = None, dx_mesh:float = 1.0, dy_mesh:float = 1.0) -> np.ndarray: """ Solve the Eikonal equation using the Fast Marching Method (FMM). :param sources: (list of tuples): The coordinates of the source points. :param where_compute: (2D numpy array, optional): A boolean array where True indicates cells to be included in computation. :param base_data: (2D numpy array, optional): The base data that will propagate. :param test_data: (2D numpy array, optional): The test data that will be used to validate propagation. :param speed: (2D numpy array): The speed function. :param dx_mesh: (float, optional): The mesh size in x direction. :param dy_mesh: (float, optional): The mesh size in y direction. :return: 2D numpy array The solution to the Eikonal equation. """ rows, cols = where_compute.shape assert base_data.shape == where_compute.shape assert test_data.shape == where_compute.shape if len(sources) == 0: logging.error("No sources provided") return np.zeros((rows, cols)) if speed is None: speed = np.ones((rows, cols)) # Convert sources to a contiguous int64 array for the @njit function. sources_arr = np.array(sources, dtype=np.int64).reshape(-1, 2) return __solve_eikonal_with_data(sources_arr, where_compute, base_data, test_data, speed, dx_mesh, dy_mesh)
@njit(cache=True)
[docs] def __solve_eikonal_with_data_second_order(sources, where_compute, base_data, test_data, speed, dx_mesh, dy_mesh): """Solve the Eikonal equation with **second-order** spatial accuracy. Same structure as :func:`__solve_eikonal_with_data` but calls :func:`_evaluate_distance_and_data_second_order_iso` which uses a quadratic upwind stencil (Sethian, 1999) whenever two consecutive upwind neighbours are available, falling back to first-order otherwise. :param sources: 2D int64 array of shape (N, 2) — source (row, col). """ rows, cols = where_compute.shape fixed = np.zeros((rows, cols), dtype=np.uint8) fixed[:, 0] = 1; fixed[:, -1] = 1 fixed[0, :] = 1; fixed[-1, :] = 1 time = np.full((rows, cols), np.inf) time[:, 0] = 0.0; time[:, -1] = 0.0 time[0, :] = 0.0; time[-1, :] = 0.0 capacity = 4 * rows * cols + sources.shape[0] heap_t = np.empty(capacity, dtype=np.float64) heap_i = np.empty(capacity, dtype=np.int64) heap_j = np.empty(capacity, dtype=np.int64) heap_size = np.int64(0) for k in range(sources.shape[0]): si, sj = sources[k, 0], sources[k, 1] time[si, sj] = 0.0 fixed[si, sj] = 1 heap_size = _heap_push(heap_t, heap_i, heap_j, heap_size, 0.0, si, sj) while heap_size > 0: heap_size, t, i, j = _heap_pop(heap_t, heap_i, heap_j, heap_size) fixed[i, j] = 1 where_compute[i, j] = 0.0 for d in range(4): if d == 0: new_i, new_j = i - 1, j elif d == 1: new_i, new_j = i + 1, j elif d == 2: new_i, new_j = i, j - 1 else: new_i, new_j = i, j + 1 if new_i < 0 or new_i >= rows or new_j < 0 or new_j >= cols: continue if fixed[new_i, new_j] or not where_compute[new_i, new_j]: continue new_t, new_data = _evaluate_distance_and_data_second_order_iso( new_i, new_j, fixed, where_compute, dx_mesh, dy_mesh, time, base_data, test_data, speed[new_i, new_j]) if new_t < time[new_i, new_j]: time[new_i, new_j] = new_t base_data[new_i, new_j] = new_data heap_size = _heap_push(heap_t, heap_i, heap_j, heap_size, new_t, new_i, new_j) return time
[docs] def _solve_eikonal_with_data_second_order(sources:list[tuple[int,int]], where_compute:np.ndarray=None, base_data:np.ndarray=None, test_data:np.ndarray=None, speed:np.ndarray = None, dx_mesh:float = 1.0, dy_mesh:float = 1.0) -> np.ndarray: """ Solve the Eikonal equation with second-order spatial accuracy. API-compatible with :func:`_solve_eikonal_with_data`. Uses a quadratic upwind stencil when two consecutive upwind neighbours are available, falling back to first order otherwise. :param sources: (list of tuples): The coordinates of the source points. :param where_compute: (2D numpy array): A boolean array where True indicates cells to be included in computation. :param base_data: (2D numpy array): The base data that will propagate. :param test_data: (2D numpy array): The test data that will be used to validate propagation. :param speed: (2D numpy array): The speed function. :param dx_mesh: (float, optional): The mesh size in x direction. :param dy_mesh: (float, optional): The mesh size in y direction. :return: 2D numpy array The solution to the Eikonal equation. """ rows, cols = where_compute.shape assert base_data.shape == where_compute.shape assert test_data.shape == where_compute.shape if len(sources) == 0: logging.error("No sources provided") return np.zeros((rows, cols)) if speed is None: speed = np.ones((rows, cols)) sources_arr = np.array(sources, dtype=np.int64).reshape(-1, 2) return __solve_eikonal_with_data_second_order(sources_arr, where_compute, base_data, test_data, speed, dx_mesh, dy_mesh)
@njit(cache=True)
[docs] def __solve_eikonal_anisotropic(sources, where_compute, speed_xx, speed_xy, speed_yy, dx_mesh, dy_mesh): """Solve the **anisotropic** Eikonal equation using FMM. Computes arrival times for (∇T)^T D (∇T) = 1 where D is a spatially-varying 2×2 symmetric positive-definite metric tensor given component-wise by ``speed_xx``, ``speed_xy``, ``speed_yy``. Uses an **8-connected** stencil (Ordered Upwind Method) with triangle updates to properly capture off-diagonal anisotropy. When a cell is fixed, all 8 neighbours are notified; each neighbour's update considers the 8 adjacent-pair triangles in :func:`_evaluate_distance_anisotropic_8conn`. For the isotropic case, set ``speed_xx = speed_yy = speed²`` and ``speed_xy = 0``. This variant does **not** propagate data — it only computes T. :param sources: 2D int64 array of shape (N, 2) — source (row, col). :param speed_xx: 2D array, D_xx component. :param speed_xy: 2D array, D_xy component (symmetric off-diagonal). :param speed_yy: 2D array, D_yy component. """ rows, cols = where_compute.shape fixed = np.zeros((rows, cols), dtype=np.uint8) fixed[:, 0] = 1; fixed[:, -1] = 1 fixed[0, :] = 1; fixed[-1, :] = 1 time = np.full((rows, cols), np.inf) time[:, 0] = 0.0; time[:, -1] = 0.0 time[0, :] = 0.0; time[-1, :] = 0.0 # 8 neighbour offsets (same order as in the kernel) exp_di = np.empty(8, dtype=np.int64) exp_dj = np.empty(8, dtype=np.int64) exp_di[0] = -1; exp_di[1] = -1; exp_di[2] = 0; exp_di[3] = 1 exp_di[4] = 1; exp_di[5] = 1; exp_di[6] = 0; exp_di[7] = -1 exp_dj[0] = 0; exp_dj[1] = 1; exp_dj[2] = 1; exp_dj[3] = 1 exp_dj[4] = 0; exp_dj[5] = -1; exp_dj[6] = -1; exp_dj[7] = -1 capacity = 8 * rows * cols + sources.shape[0] heap_t = np.empty(capacity, dtype=np.float64) heap_i = np.empty(capacity, dtype=np.int64) heap_j = np.empty(capacity, dtype=np.int64) heap_size = np.int64(0) for k in range(sources.shape[0]): si, sj = sources[k, 0], sources[k, 1] time[si, sj] = 0.0 fixed[si, sj] = 1 heap_size = _heap_push(heap_t, heap_i, heap_j, heap_size, 0.0, si, sj) while heap_size > 0: heap_size, t, i, j = _heap_pop(heap_t, heap_i, heap_j, heap_size) fixed[i, j] = 1 where_compute[i, j] = 0.0 # Expand to all 8 neighbours for d in range(8): new_i = i + exp_di[d] new_j = j + exp_dj[d] if new_i < 0 or new_i >= rows or new_j < 0 or new_j >= cols: continue if fixed[new_i, new_j] or not where_compute[new_i, new_j]: continue new_t = _evaluate_distance_anisotropic_8conn( new_i, new_j, fixed, where_compute, dx_mesh, dy_mesh, time, speed_xx, speed_xy, speed_yy) if new_t < time[new_i, new_j]: time[new_i, new_j] = new_t heap_size = _heap_push(heap_t, heap_i, heap_j, heap_size, new_t, new_i, new_j) return time
[docs] def _solve_eikonal_anisotropic(sources:list[tuple[int,int]], where_compute:np.ndarray, speed_xx:np.ndarray, speed_xy:np.ndarray, speed_yy:np.ndarray, dx_mesh:float = 1.0, dy_mesh:float = 1.0) -> np.ndarray: """ Solve the **anisotropic** Eikonal equation using the Fast Marching Method. Computes arrival times T such that:: (∇T)^T D (∇T) = 1 where D(x,y) is a spatially-varying 2×2 symmetric positive-definite metric tensor. The three upper-triangle components are given as separate 2-D arrays. For direction-dependent speed, build D from the speed ellipse: if the speed along angle θ is ``s_max`` and perpendicular is ``s_min``, then:: D = R(θ) diag(s_max², s_min²) R(θ)^T where R is the rotation matrix. :param sources: list of (row, col) source locations. :param where_compute: boolean 2D array — True where propagation is allowed. :param speed_xx: 2D array, D_xx component of metric tensor. :param speed_xy: 2D array, D_xy component (off-diagonal). :param speed_yy: 2D array, D_yy component. :param dx_mesh: mesh spacing in x (column) direction. :param dy_mesh: mesh spacing in y (row) direction. :return: 2D array of arrival times T. Example — isotropic equivalent:: speed = 2.0 # m/s Dxx = Dyy = speed**2 * np.ones(shape) Dxy = np.zeros(shape) T = solve_eikonal_anisotropic(sources, mask, Dxx, Dxy, Dyy) Example — preferential direction:: theta = np.pi / 4 # 45 degrees s_max, s_min = 3.0, 1.0 # m/s c, s = np.cos(theta), np.sin(theta) Dxx = (s_max*c)**2 + (s_min*s)**2 Dxy = (s_max**2 - s_min**2) * c * s Dyy = (s_max*s)**2 + (s_min*c)**2 T = solve_eikonal_anisotropic(sources, mask, Dxx, Dxy, Dyy) """ rows, cols = where_compute.shape assert speed_xx.shape == (rows, cols) assert speed_xy.shape == (rows, cols) assert speed_yy.shape == (rows, cols) if len(sources) == 0: logging.error("No sources provided") return np.zeros((rows, cols)) sources_arr = np.array(sources, dtype=np.int64).reshape(-1, 2) return __solve_eikonal_anisotropic(sources_arr, where_compute, speed_xx, speed_xy, speed_yy, dx_mesh, dy_mesh)
@njit(cache=True)
[docs] def __solve_eikonal_anisotropic_4conn(sources, where_compute, speed_xx, speed_xy, speed_yy, dx_mesh, dy_mesh): """Solve the anisotropic Eikonal equation using the legacy 4-connected stencil. Kept for comparison with the 8-connected OUM solver. See :func:`__solve_eikonal_anisotropic` for the full docstring. """ rows, cols = where_compute.shape fixed = np.zeros((rows, cols), dtype=np.uint8) fixed[:, 0] = 1; fixed[:, -1] = 1 fixed[0, :] = 1; fixed[-1, :] = 1 time = np.full((rows, cols), np.inf) time[:, 0] = 0.0; time[:, -1] = 0.0 time[0, :] = 0.0; time[-1, :] = 0.0 capacity = 4 * rows * cols + sources.shape[0] heap_t = np.empty(capacity, dtype=np.float64) heap_i = np.empty(capacity, dtype=np.int64) heap_j = np.empty(capacity, dtype=np.int64) heap_size = np.int64(0) for k in range(sources.shape[0]): si, sj = sources[k, 0], sources[k, 1] time[si, sj] = 0.0 fixed[si, sj] = 1 heap_size = _heap_push(heap_t, heap_i, heap_j, heap_size, 0.0, si, sj) while heap_size > 0: heap_size, t, i, j = _heap_pop(heap_t, heap_i, heap_j, heap_size) fixed[i, j] = 1 where_compute[i, j] = 0.0 for d in range(4): if d == 0: new_i, new_j = i - 1, j elif d == 1: new_i, new_j = i + 1, j elif d == 2: new_i, new_j = i, j - 1 else: new_i, new_j = i, j + 1 if new_i < 0 or new_i >= rows or new_j < 0 or new_j >= cols: continue if fixed[new_i, new_j] or not where_compute[new_i, new_j]: continue new_t = _evaluate_distance_anisotropic( new_i, new_j, fixed, where_compute, dx_mesh, dy_mesh, time, speed_xx, speed_xy, speed_yy) if new_t < time[new_i, new_j]: time[new_i, new_j] = new_t heap_size = _heap_push(heap_t, heap_i, heap_j, heap_size, new_t, new_i, new_j) return time
[docs] def _solve_eikonal_anisotropic_4conn(sources:list[tuple[int,int]], where_compute:np.ndarray, speed_xx:np.ndarray, speed_xy:np.ndarray, speed_yy:np.ndarray, dx_mesh:float = 1.0, dy_mesh:float = 1.0) -> np.ndarray: """Solve anisotropic Eikonal with the legacy **4-connected** stencil. Kept for benchmarking and comparison with the 8-connected OUM solver :func:`solve_eikonal_anisotropic`. Same parameters as :func:`solve_eikonal_anisotropic`. """ rows, cols = where_compute.shape if len(sources) == 0: return np.zeros((rows, cols)) sources_arr = np.array(sources, dtype=np.int64).reshape(-1, 2) return __solve_eikonal_anisotropic_4conn(sources_arr, where_compute, speed_xx, speed_xy, speed_yy, dx_mesh, dy_mesh)
# ============================================================ # Anisotropic 8-connected solver **with data propagation** # ============================================================ @njit(cache=True)
[docs] def _evaluate_distance_and_data_anisotropic_8conn( i, j, fixed, where_compute, dx_mesh, dy_mesh, times, base_data, test_data, speed_xx, speed_xy, speed_yy): """Evaluate arrival time **and** propagated data for an anisotropic eikonal equation on an 8-connected stencil. Combines the triangle-update logic of :func:`_evaluate_distance_anisotropic_8conn` with the data-propagation mechanism of :func:`_evaluate_distance_and_data_first_order_iso`. Data propagation rule: * Only fixed neighbours whose ``base_data > test_data[i, j]`` are eligible (same filter as the isotropic data kernel). * The propagated data is the **inverse-distance–weighted average** of the eligible neighbours' ``base_data``, where the distance is the physical edge length to that neighbour. This gives more weight to closer (cardinal) neighbours than to diagonal ones. * If no eligible neighbour exists, ``(inf, test_data[i, j])`` is returned so that the update is rejected. :return: ``(new_time, new_data)`` """ rows = fixed.shape[0] cols = fixed.shape[1] Dxx = speed_xx[i, j] Dxy = speed_xy[i, j] Dyy = speed_yy[i, j] # --- Invert the metric: M = D⁻¹ --- det_D = Dxx * Dyy - Dxy * Dxy if det_D <= 1e-30: return np.inf, test_data[i, j] inv_det = 1.0 / det_D Mxx = Dyy * inv_det Mxy = -Dxy * inv_det Myy = Dxx * inv_det # --- 8 neighbours --- nb_di = np.empty(8, dtype=np.int64) nb_dj = np.empty(8, dtype=np.int64) nb_di[0] = -1; nb_di[1] = -1; nb_di[2] = 0; nb_di[3] = 1 nb_di[4] = 1; nb_di[5] = 1; nb_di[6] = 0; nb_di[7] = -1 nb_dj[0] = 0; nb_dj[1] = 1; nb_dj[2] = 1; nb_dj[3] = 1 nb_dj[4] = 0; nb_dj[5] = -1; nb_dj[6] = -1; nb_dj[7] = -1 nb_valid = np.zeros(8, dtype=np.int64) nb_t = np.full(8, np.inf) # Track data eligibility separately (test_data filter) nb_data_ok = np.zeros(8, dtype=np.int64) for k in range(8): ni = i + nb_di[k] nj = j + nb_dj[k] if 0 <= ni < rows and 0 <= nj < cols: if fixed[ni, nj] and not where_compute[ni, nj]: nb_valid[k] = 1 nb_t[k] = times[ni, nj] if base_data[ni, nj] > test_data[i, j]: nb_data_ok[k] = 1 # --- Data: inverse-distance-weighted average of eligible neighbours --- sum_w = 0.0 sum_wd = 0.0 for k in range(8): if nb_valid[k] and nb_data_ok[k]: ni = i + nb_di[k] nj = j + nb_dj[k] edge_len = np.sqrt((nb_dj[k] * dx_mesh)**2 + (nb_di[k] * dy_mesh)**2) w = 1.0 / edge_len sum_w += w sum_wd += w * base_data[ni, nj] if sum_w == 0.0: return np.inf, test_data[i, j] new_data = sum_wd / sum_w if new_data < test_data[i, j]: return np.inf, test_data[i, j] # --- Time: same algorithm as _evaluate_distance_anisotropic_8conn --- # Uses ALL fixed neighbours for the time update (not filtered by data). best = np.inf # 1D updates for k in range(8): if nb_valid[k]: ex = np.float64(nb_dj[k]) * dx_mesh ey = np.float64(nb_di[k]) * dy_mesh cost_sq = Mxx * ex * ex + 2.0 * Mxy * ex * ey + Myy * ey * ey if cost_sq > 0.0: cand = nb_t[k] + np.sqrt(cost_sq) if cand < best: best = cand # 2D triangle updates for k in range(8): k2 = (k + 1) % 8 if not (nb_valid[k] and nb_valid[k2]): continue Ta = nb_t[k] Tb = nb_t[k2] e1x = np.float64(nb_dj[k]) * dx_mesh e1y = np.float64(nb_di[k]) * dy_mesh e2x = np.float64(nb_dj[k2]) * dx_mesh e2y = np.float64(nb_di[k2]) * dy_mesh p = Mxx * e1x * e1x + 2.0 * Mxy * e1x * e1y + Myy * e1y * e1y q = Mxx * e1x * e2x + Mxy * (e1x * e2y + e1y * e2x) + Myy * e1y * e2y r = Mxx * e2x * e2x + 2.0 * Mxy * e2x * e2y + Myy * e2y * e2y delta = Ta - Tb alpha = p - 2.0 * q + r beta = q - r pr_q2 = p * r - q * q if alpha < 1e-14 or pr_q2 < 0.0: continue d2 = delta * delta if d2 >= alpha: continue sqrt_term = abs(delta) * np.sqrt(pr_q2 / (alpha - d2)) for s in range(2): if s == 0: lam = (-beta - sqrt_term) / alpha else: lam = (-beta + sqrt_term) / alpha if lam < 0.0 or lam > 1.0: continue Q_val = alpha * lam * lam + 2.0 * beta * lam + r if Q_val <= 0.0: continue d_lam = np.sqrt(Q_val) lhs = delta * d_lam rhs = -(alpha * lam + beta) tol = 1e-6 * (abs(lhs) + abs(rhs) + 1e-30) if abs(lhs - rhs) > tol: continue cand = Tb + lam * delta + d_lam if cand < best: best = cand return best, new_data
@njit(cache=True)
[docs] def __solve_eikonal_anisotropic_with_data(sources, where_compute, base_data, test_data, speed_xx, speed_xy, speed_yy, dx_mesh, dy_mesh): """Solve the anisotropic Eikonal equation with data propagation. Combines the 8-connected OUM FMM loop of :func:`__solve_eikonal_anisotropic` with data propagation as in :func:`__solve_eikonal_with_data`. ``base_data`` is modified **in-place**: at each newly fixed cell, the propagated data value replaces the previous content. :param sources: 2D int64 array of shape (N, 2) — source (row, col). :param base_data: 2D array — data to propagate (modified in-place). :param test_data: 2D array — validation threshold for propagation. """ rows, cols = where_compute.shape fixed = np.zeros((rows, cols), dtype=np.uint8) fixed[:, 0] = 1; fixed[:, -1] = 1 fixed[0, :] = 1; fixed[-1, :] = 1 time = np.full((rows, cols), np.inf) time[:, 0] = 0.0; time[:, -1] = 0.0 time[0, :] = 0.0; time[-1, :] = 0.0 # 8 neighbour offsets exp_di = np.empty(8, dtype=np.int64) exp_dj = np.empty(8, dtype=np.int64) exp_di[0] = -1; exp_di[1] = -1; exp_di[2] = 0; exp_di[3] = 1 exp_di[4] = 1; exp_di[5] = 1; exp_di[6] = 0; exp_di[7] = -1 exp_dj[0] = 0; exp_dj[1] = 1; exp_dj[2] = 1; exp_dj[3] = 1 exp_dj[4] = 0; exp_dj[5] = -1; exp_dj[6] = -1; exp_dj[7] = -1 capacity = 8 * rows * cols + sources.shape[0] heap_t = np.empty(capacity, dtype=np.float64) heap_i = np.empty(capacity, dtype=np.int64) heap_j = np.empty(capacity, dtype=np.int64) heap_size = np.int64(0) for k in range(sources.shape[0]): si, sj = sources[k, 0], sources[k, 1] time[si, sj] = 0.0 fixed[si, sj] = 1 heap_size = _heap_push(heap_t, heap_i, heap_j, heap_size, 0.0, si, sj) while heap_size > 0: heap_size, t, i, j = _heap_pop(heap_t, heap_i, heap_j, heap_size) fixed[i, j] = 1 where_compute[i, j] = 0.0 for d in range(8): new_i = i + exp_di[d] new_j = j + exp_dj[d] if new_i < 0 or new_i >= rows or new_j < 0 or new_j >= cols: continue if fixed[new_i, new_j] or not where_compute[new_i, new_j]: continue new_t, new_data = _evaluate_distance_and_data_anisotropic_8conn( new_i, new_j, fixed, where_compute, dx_mesh, dy_mesh, time, base_data, test_data, speed_xx, speed_xy, speed_yy) if new_t < time[new_i, new_j]: time[new_i, new_j] = new_t base_data[new_i, new_j] = new_data heap_size = _heap_push(heap_t, heap_i, heap_j, heap_size, new_t, new_i, new_j) return time
[docs] def _solve_eikonal_anisotropic_with_data( sources: list[tuple[int, int]], where_compute: np.ndarray, base_data: np.ndarray, test_data: np.ndarray, speed_xx: np.ndarray, speed_xy: np.ndarray, speed_yy: np.ndarray, dx_mesh: float = 1.0, dy_mesh: float = 1.0) -> np.ndarray: """Solve the **anisotropic** Eikonal equation with **data propagation**. Computes arrival times T such that:: (∇T)^T D (∇T) = 1 and simultaneously propagates ``base_data`` outward from the sources along the FMM wavefront. At each newly reached cell, the data value is set to the inverse-distance–weighted average of its eligible fixed neighbours (those with ``base_data[neighbour] > test_data[cell]``). ``base_data`` is modified **in-place**. :param sources: list of (row, col) source locations. :param where_compute: boolean 2D array — 1.0 where propagation is allowed, 0.0 for frozen cells (sources, obstacles). :param base_data: 2D array of data to propagate (modified in-place). :param test_data: 2D array — propagation is accepted only when the candidate data value exceeds ``test_data[i, j]``. :param speed_xx: 2D array, D_xx component of metric tensor. :param speed_xy: 2D array, D_xy component (off-diagonal). :param speed_yy: 2D array, D_yy component. :param dx_mesh: mesh spacing in x (column) direction. :param dy_mesh: mesh spacing in y (row) direction. :return: 2D array of arrival times T. """ rows, cols = where_compute.shape assert base_data.shape == (rows, cols) assert test_data.shape == (rows, cols) assert speed_xx.shape == (rows, cols) assert speed_xy.shape == (rows, cols) assert speed_yy.shape == (rows, cols) if len(sources) == 0: logging.error("No sources provided") return np.zeros((rows, cols)) sources_arr = np.array(sources, dtype=np.int64).reshape(-1, 2) return __solve_eikonal_anisotropic_with_data( sources_arr, where_compute, base_data, test_data, speed_xx, speed_xy, speed_yy, dx_mesh, dy_mesh)
[docs] def _extract_patch_slices_with_margin(shape, labels, margin= 2): """ Extract the slices of the patches with a margin around the labels. :param shape: (tuple): The shape of the array. :param labels: (2D numpy array): The labels of the patches. :param margin: (int, optional): The margin around the labels. """ slices = scipy.ndimage.find_objects(labels) patches_slices = [] for s in slices: sl1 = slice(max(0, s[0].start - margin), min(shape[0], s[0].stop + margin)) sl2 = slice(max(0, s[1].start - margin), min(shape[1], s[1].stop + margin)) patches_slices.append((sl1, sl2)) return patches_slices
[docs] def _process_submatrix(args): """ Function to process a submatrix in a multiprocess. """ id_label, speed, where_compute, base_data, test_data, labels, dx, dy, NoData = args # Ignore the border labels[:,0] = -1 labels[:,-1] = -1 labels[0,:] = -1 labels[-1,:] = -1 sources = np.argwhere(np.logical_and(labels == 0, base_data != NoData)) if len(sources) == 0: return (None, None) return (args, _solve_eikonal_with_data(sources, where_compute, base_data, test_data, speed, dx, dy))
[docs] def _solve_eikonal_with_value_on_submatrices(where_compute:np.ndarray, base_data:np.ndarray, test_data:np.ndarray = None, speed:np.ndarray = None, multiprocess:bool = False, dx:float = 1., dy:float = 1., ignore_last_patches:int = 1, NoDataValue:float = 0.) -> np.ndarray: """ Propagate data inside the mask using the Fast Marching Method (FMM). "base_data" will be updated with the new values. :param where_compute: (2D numpy array): A boolean array where True indicates cells to be included in computation. :param base_data: (2D numpy array): The base data that will propagate. :param test_data: (2D numpy array, optional): The test data that will be used to validate propagation (we used upstream data only if base_data > test_data). :param speed: (2D numpy array, optional): The isotropic propagation speed. :param multiprocess: (bool, optional): Whether to use multiprocessing. :param dx: (float, optional): The mesh size in x direction. :param dy: (float, optional): The mesh size in y direction. :return: 2D numpy array The solution to the Eikonal equation. """ # Labelling the patches labels, numfeatures = scipy.ndimage.label(where_compute) if ignore_last_patches > 0: # counts the number of cells in each patch sizes = np.bincount(labels.ravel()) # get the indices of the patches sorted by size indices = np.argsort(sizes[1:])[::-1] # ignore the last patches for idx in indices[:ignore_last_patches]: where_compute[labels == idx+1] = 0 # idx +1 because np.argsort starts at 1, so "indices" are shifted by 1 # relabel the patches labels, numfeatures = scipy.ndimage.label(where_compute) logging.info(f"Ignoring {ignore_last_patches} last patches.") logging.info(f"Number of patches: {numfeatures}") if numfeatures == 0: logging.warning("No patch to compute.") return np.zeros_like(where_compute) if speed is None: logging.debug("Speed not provided. Using default value of 1.") speed = np.ones_like(where_compute) if test_data is None: logging.debug("Test data not provided. Using -inf.") test_data = np.full_like(base_data, -np.inf) # Extract the slices of the patches with a margin of 2 cells around the labels. # 2 cells are added to avoid test during computation. # The external border will be ignored. patch_slices = _extract_patch_slices_with_margin(where_compute.shape, labels) # Prepare the submatrices to be processed subs = [(idx+1, speed[cur], where_compute[cur], base_data[cur], test_data[cur], labels[cur].copy(), dx, dy, NoDataValue) for idx, cur in enumerate(patch_slices)] if multiprocess: # In multiprocess mode, the base_data is updated in a local copy of the submatrix. # We need to merge the results at the end. with mp.Pool(processes=max(min(mp.cpu_count(), numfeatures),1)) as pool: results = pool.map(_process_submatrix, subs) time = np.zeros_like(where_compute) for slice, (sub, result) in zip(patch_slices, results): if result is None: continue useful_result = sub[3] != NoDataValue base_data[slice][useful_result] = sub[3][useful_result] time[slice][useful_result] = result[useful_result] else: # In single process mode, the base_data is updated in place. # We do not need to merge the results but only the time. # results = [_process_submatrix(sub) for sub in subs] results = [] for sub in tqdm(subs): results.append(_process_submatrix(sub)) time = np.zeros_like(where_compute) for slice, (sub, result) in zip(patch_slices, results): if result is None: continue useful_result = result != NoDataValue time[slice][useful_result] = result[useful_result] return time
[docs] def count_holes(mask:np.ndarray = None): """ Count the number of holes in the mask. """ labels, numfeatures = scipy.ndimage.label(mask) return numfeatures
[docs] def inpaint_array(data:np.ndarray | np.ma.MaskedArray, where_compute:np.ndarray, test:np.ndarray, ignore_last_patches:int = 1, inplace:bool = True, dx:float = 1., dy:float = 1., NoDataValue:float = 0., multiprocess:bool = True) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """ Inpaint the array using the Fast Marching Method (FMM). Main idea: - We have a 2D array "data" that we want to inpaint. - We have a 2D array "test" that will be used to validate the inpainting. - We have a 2D array "where_compute" that indicates where to compute the inpainting. - We will use the FMM to propagate the data inside the mask. - We will update the data only if the new value is greater than the test data. - (We can ignore n greatest patches to avoid computation in some large areas.) :param data: (2D numpy array): The water level to inpaint. :param mask: (2D numpy array): The simulation's Digital Elevation Model (DEM). :param test: (2D numpy array, optional): The digital terrain model (DTM). :param ignore_last_patches: (int, optional): The number of last patches to ignore. :param inplace: (bool, optional): Whether to update the water_level in place. :param dx: (float, optional): The mesh size in x direction. :param dy: (float, optional): The mesh size in y direction. :param NoDataValue: (float, optional): The NoDataValue, used if mask is not explicitly provided (mask atribute or water_level as a Numpy MaskedArray). Default is 0. :param multiprocess: (bool, optional): Whether to use multiprocessing. """ if inplace: if isinstance(data, np.ma.MaskedArray): base_data = data.data else: base_data = data else: if isinstance(data, np.ma.MaskedArray): base_data = data.data.copy() else: base_data = data.copy() assert where_compute.shape == base_data.shape assert test.shape == base_data.shape time = _solve_eikonal_with_value_on_submatrices(where_compute, base_data, test, speed=None, dx=dx, dy=dy, ignore_last_patches=ignore_last_patches, NoDataValue=NoDataValue, multiprocess=multiprocess) if inplace: if isinstance(data, np.ma.MaskedArray): data.mask[:,:] = base_data == NoDataValue extra = np.ma.masked_array(base_data - test, mask=data.mask.copy()) extra.data[extra.data <= 0.] = NoDataValue extra.mask[extra.data == NoDataValue] = True else: extra = base_data - test extra[extra <= 0.] = NoDataValue else: if isinstance(data, np.ma.MaskedArray): data = np.ma.masked_array(base_data, mask=base_data == NoDataValue) extra = np.ma.masked_array(base_data - test, mask=data.mask) extra.data[extra.data <= 0.] = NoDataValue extra.mask[extra.data == NoDataValue] = True else: extra = base_data - test extra[extra <= 0.] = NoDataValue return time, data, extra
[docs] def inpaint_waterlevel(water_level:np.ndarray | np.ma.MaskedArray, dem:np.ndarray, dtm:np.ndarray, ignore_last_patches:int = 1, inplace:bool = True, dx:float = 1., dy:float = 1., NoDataValue:float = 0., multiprocess:bool = True, epsilon:float = 1e-3) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """ Inpaint the water level using the Fast Marching Method (FMM). Similar to the HOLES.EXE Fortran program. Assumptions: - The simulations are in a steady state. - The flood extent is limited by: - natural topography (where DEM == DTM) - buildings or blocks of buildings - protective structures The goal is to propagate the free surface elevations into the buildings. We calculate the difference between the DEM (including buildings and walls) and the DTM to identify where the buildings are. Specifically: - if it is natural topography, the differences will be zero or almost zero - if it is a building or anthropogenic structure, the differences are significant We set the elevations of the cells where the difference is zero to a value unreachable by the flood. Thus, only buildings in contact with the flood will be affected and filled. HOLES.EXE vs Python code: - In "holes.exe", we must provide "in", "mask", and "dem" files: - "in" is the water level - "dem" is the digital terrain model (DTM) associated with the simulation's topo-bathymetry (not the topo-bathymetry itself) - "mask" is the DTM where the buildings are identified and a large value (above the maximum water level) if the cell is not inside a building. - The patches are identified by the water level "0.0" in the "in" file. - FMM is used and new water levels are calculated and retained if greater than the "mask" value. - The DTM is only used in the final part when evaluating the new water depths (Z-DTM). - In Python, we must provide the water level, the DEM, and the DTM: - The patches will be identified by the buildings array (filtered DEM - DTM). - FMM is used and new water levels are calculated and retained if greater than the DTM value. - FMM is only propagated in the cells where "buildings = True". - Final results must be the same even if the algorithm is a little bit different. - Fortran code demands the "mask" file to be provided (pre-computed/modified by the user), but in Python, we calculate it automatically from the DEM and DTM. - We can use "inpaint_array" to be more flexible... by manually providing a "where_compute" and a "test" arrays :param water_level: (2D numpy array): The water level to inpaint. :param dem: (2D numpy array): The simulation's Digital Elevation Model (DEM). :param dtm: (2D numpy array, optional): The digital terrain model (DTM). :param ignore_last_patches: (int, optional): The number of last patches to ignore. :param inplace: (bool, optional): Whether to update the water_level in place. :param dx: (float, optional): The mesh size in x direction. :param dy: (float, optional): The mesh size in y direction. :param NoDataValue: (float, optional): The NoDataValue, used if mask is not explicitly provided (mask attribute or water_level as a Numpy MaskedArray). Default is 0. :param multiprocess: (bool, optional): Whether to use multiprocessing. :param epsilon: (float, optional): The minimum value to consider that a water height is present. """ if inplace: if isinstance(water_level, np.ma.MaskedArray): base_data = water_level.data else: base_data = water_level else: if isinstance(water_level, np.ma.MaskedArray): base_data = water_level.data.copy() else: base_data = water_level.copy() assert dem.shape == base_data.shape assert dtm.shape == base_data.shape # Create the mask where we can fill the water level # first we identify the buildings by the difference between the DEM and the DTM buildings = dem - dtm # If DTM is above DEM, we set the value to 0 if np.any(buildings < 0.) > 0: logging.warning("Some cells in the DTM are above the DEM.") logging.info("Setting these values to 0.") buildings[buildings < 0.] = 0. # If DTM is below DEM, we set the value to 1 buildings[buildings <= epsilon] = 0. buildings[buildings > 0.] = 1. # We interpolate only if building cells are not already in the water_level comp = np.logical_and(buildings == 1., base_data != NoDataValue) if np.any(comp): logging.warning("Some building cells are already flooded.") logging.info("Ignoring these cells in the interpolation.") buildings = np.logical_and(buildings, base_data == NoDataValue) time = _solve_eikonal_with_value_on_submatrices(buildings, base_data, dtm, speed=None, dx=dx, dy=dy, ignore_last_patches=ignore_last_patches, NoDataValue=NoDataValue, multiprocess=multiprocess) if inplace: if isinstance(water_level, np.ma.MaskedArray): water_level.mask[:,:] = base_data == NoDataValue water_height = np.ma.masked_array(base_data - dtm, mask=water_level.mask.copy()) water_height.data[water_height.data <= 0.] = NoDataValue water_height.mask[water_height.data == NoDataValue] = True else: water_height = base_data - dtm water_height[water_height <= 0.] = NoDataValue else: if isinstance(water_level, np.ma.MaskedArray): water_level = np.ma.masked_array(base_data, mask=base_data == NoDataValue) water_height = np.ma.masked_array(base_data - dtm, mask=water_level.mask) water_height.data[water_height.data <= 0.] = NoDataValue water_height.mask[water_height.data == NoDataValue] = True else: water_height = base_data - dtm water_height[water_height <= 0.] = NoDataValue return time, water_level, water_height
# ===================================================================== # ANISOTROPIC EIKONAL WITH DRIFT (Riemannian + advection) # ===================================================================== # # Solves: sqrt( (∇T)^T M (∇T) ) + v · ∇T = 1 # # where M = D⁻¹ is the inverse metric and v = (vx, vy) is a # spatially-varying drift/advection field. # # This produces NON-RECIPROCAL propagation: downstream is faster # than upstream. The FMM remains valid as long as ||v||_D < 1 # (sub-critical drift), i.e. the effective speed remains positive # in all directions. # # The 1D single-neighbour update for edge vector e = (ex, ey): # # T(P) = T(A) + [ sqrt(e^T M e - (v·e)²) + v·e ] / (1 - ||v||²_D) # ... solved via the standard quadratic with linear term. # # Simplification: let c = e^T M e, d = v · e. # Then T(P) = T(A) + sqrt(c) * [sqrt(1 - d²/c) + d/sqrt(c)] # but we use the direct approach below. # ===================================================================== @njit(cache=True)
[docs] def _evaluate_distance_aniso_drift_8conn( i, j, fixed, where_compute, dx_mesh, dy_mesh, times, speed_xx, speed_xy, speed_yy, drift_vx, drift_vy): """Evaluate arrival time for anisotropic eikonal with drift on 8-conn stencil. Solves sqrt( (∇T)^T M (∇T) ) + v · ∇T = 1 For a single edge from fixed neighbour A to point P with edge vector e = A - P (in physical coords): ∇T ≈ (T_P - T_A) / ||e||² * e (1D approximation) Substituting into the equation and solving for T_P: T_P = T_A + [ -v·e + sqrt( e^T M e - (v·e)² + e^T M e * (v·e / ||e||_M)² ) ] ... which simplifies (see derivation) to solving a 1D quadratic for each edge direction. For triangle updates (A, B adjacent fixed neighbours), we use the parametric approach T = λ T_A + (1-λ) T_B + cost(λ) where cost includes the drift correction. :param drift_vx: 2D array, x-component of drift velocity at each cell. :param drift_vy: 2D array, y-component of drift velocity at each cell. :return: new arrival time (float), or np.inf if no valid update. """ rows = fixed.shape[0] cols = fixed.shape[1] Dxx = speed_xx[i, j] Dxy = speed_xy[i, j] Dyy = speed_yy[i, j] vx = drift_vx[i, j] vy = drift_vy[i, j] # Check D is positive-definite det_D = Dxx * Dyy - Dxy * Dxy if det_D <= 1e-30: return np.inf # Use D directly for the norm √(∇T^T D ∇T) (same convention as base aniso) # 8 neighbours nb_di = np.empty(8, dtype=np.int64) nb_dj = np.empty(8, dtype=np.int64) nb_di[0] = -1; nb_di[1] = -1; nb_di[2] = 0; nb_di[3] = 1 nb_di[4] = 1; nb_di[5] = 1; nb_di[6] = 0; nb_di[7] = -1 nb_dj[0] = 0; nb_dj[1] = 1; nb_dj[2] = 1; nb_dj[3] = 1 nb_dj[4] = 0; nb_dj[5] = -1; nb_dj[6] = -1; nb_dj[7] = -1 nb_valid = np.zeros(8, dtype=np.int64) nb_t = np.full(8, np.inf) for k in range(8): ni = i + nb_di[k] nj = j + nb_dj[k] if 0 <= ni < rows and 0 <= nj < cols: if fixed[ni, nj] and not where_compute[ni, nj]: nb_valid[k] = 1 nb_t[k] = times[ni, nj] best = np.inf # --- 1D updates with drift --- # For edge e = (ex, ey) from P to neighbour A: # ∇T ≈ (T_P - T_A) * e / (e^T e) # sqrt((T_P-T_A)² * e^T M e / (e^T e)²) + (T_P-T_A) * v·e / (e^T e) = 1 # Let τ = T_P - T_A > 0, c = e^T M e, s = e^T e, d = v·e # c = e^T D e, s = e^T e, d = v·e # τ = s / (√c - d) # Valid only if √c - d > 0 (sub-critical in this direction) for k in range(8): if not nb_valid[k]: continue ex = np.float64(nb_dj[k]) * dx_mesh ey = np.float64(nb_di[k]) * dy_mesh c = Dxx * ex * ex + 2.0 * Dxy * ex * ey + Dyy * ey * ey if c <= 0.0: continue s = ex * ex + ey * ey # ||e||² d = vx * ex + vy * ey # v · e denom = np.sqrt(c) - d if denom <= 1e-30: continue # super-critical in this direction tau = s / denom if tau > 0.0: cand = nb_t[k] + tau if cand < best: best = cand # --- 2D triangle updates with drift --- # Interpolated edge e(λ) = λ e1 + (1-λ) e2 # τ = ||e(λ)||² / (||e(λ)||_D - v · e(λ)) for k in range(8): k2 = (k + 1) % 8 if not (nb_valid[k] and nb_valid[k2]): continue Ta = nb_t[k] Tb = nb_t[k2] e1x = np.float64(nb_dj[k]) * dx_mesh e1y = np.float64(nb_di[k]) * dy_mesh e2x = np.float64(nb_dj[k2]) * dx_mesh e2y = np.float64(nb_di[k2]) * dy_mesh best_tri = np.inf for step in range(22): if step < 21: lam = step / 20.0 else: p = Dxx * e1x * e1x + 2.0 * Dxy * e1x * e1y + Dyy * e1y * e1y q_val = Dxx * e1x * e2x + Dxy * (e1x * e2y + e1y * e2x) + Dyy * e1y * e2y r = Dxx * e2x * e2x + 2.0 * Dxy * e2x * e2y + Dyy * e2y * e2y delta = Ta - Tb alpha = p - 2.0 * q_val + r beta = q_val - r if alpha > 1e-14: lam = min(1.0, max(0.0, (-beta) / alpha)) else: continue # Interpolated edge ex = lam * e1x + (1.0 - lam) * e2x ey = lam * e1y + (1.0 - lam) * e2y # ||e||²_D cost_sq = Dxx * ex * ex + 2.0 * Dxy * ex * ey + Dyy * ey * ey if cost_sq <= 0.0: continue cost = np.sqrt(cost_sq) drift_dot = vx * ex + vy * ey denom = cost - drift_dot if denom <= 1e-30: continue e_sq = ex * ex + ey * ey tau = e_sq / denom cand = lam * Ta + (1.0 - lam) * Tb + tau if cand < best_tri: best_tri = cand if best_tri < best: best = best_tri return best
@njit(cache=True)
[docs] def __solve_eikonal_aniso_drift(sources, where_compute, speed_xx, speed_xy, speed_yy, drift_vx, drift_vy, dx_mesh, dy_mesh): """Solve anisotropic Eikonal with drift using FMM (8-connected). sqrt( (∇T)^T M (∇T) ) + v · ∇T = 1 This variant does NOT propagate data — only computes T. :param drift_vx: 2D array, x-component of drift velocity. :param drift_vy: 2D array, y-component of drift velocity. """ rows, cols = where_compute.shape fixed = np.zeros((rows, cols), dtype=np.uint8) fixed[:, 0] = 1; fixed[:, -1] = 1 fixed[0, :] = 1; fixed[-1, :] = 1 time = np.full((rows, cols), np.inf) time[:, 0] = 0.0; time[:, -1] = 0.0 time[0, :] = 0.0; time[-1, :] = 0.0 exp_di = np.empty(8, dtype=np.int64) exp_dj = np.empty(8, dtype=np.int64) exp_di[0] = -1; exp_di[1] = -1; exp_di[2] = 0; exp_di[3] = 1 exp_di[4] = 1; exp_di[5] = 1; exp_di[6] = 0; exp_di[7] = -1 exp_dj[0] = 0; exp_dj[1] = 1; exp_dj[2] = 1; exp_dj[3] = 1 exp_dj[4] = 0; exp_dj[5] = -1; exp_dj[6] = -1; exp_dj[7] = -1 capacity = 8 * rows * cols + sources.shape[0] heap_t = np.empty(capacity, dtype=np.float64) heap_i = np.empty(capacity, dtype=np.int64) heap_j = np.empty(capacity, dtype=np.int64) heap_size = np.int64(0) for k in range(sources.shape[0]): si, sj = sources[k, 0], sources[k, 1] time[si, sj] = 0.0 fixed[si, sj] = 1 heap_size = _heap_push(heap_t, heap_i, heap_j, heap_size, 0.0, si, sj) while heap_size > 0: heap_size, t, i, j = _heap_pop(heap_t, heap_i, heap_j, heap_size) fixed[i, j] = 1 where_compute[i, j] = 0.0 for d in range(8): new_i = i + exp_di[d] new_j = j + exp_dj[d] if new_i < 0 or new_i >= rows or new_j < 0 or new_j >= cols: continue if fixed[new_i, new_j] or not where_compute[new_i, new_j]: continue new_t = _evaluate_distance_aniso_drift_8conn( new_i, new_j, fixed, where_compute, dx_mesh, dy_mesh, time, speed_xx, speed_xy, speed_yy, drift_vx, drift_vy) if new_t < time[new_i, new_j]: time[new_i, new_j] = new_t heap_size = _heap_push(heap_t, heap_i, heap_j, heap_size, new_t, new_i, new_j) return time
[docs] def _solve_eikonal_aniso_drift( sources: list[tuple[int, int]], where_compute: np.ndarray, speed_xx: np.ndarray, speed_xy: np.ndarray, speed_yy: np.ndarray, drift_vx: np.ndarray, drift_vy: np.ndarray, dx_mesh: float = 1.0, dy_mesh: float = 1.0) -> np.ndarray: """Solve the **anisotropic Eikonal equation with drift** using FMM. Computes arrival times T such that:: sqrt( (∇T)^T D⁻¹ (∇T) ) + v · ∇T = 1 where D is a 2×2 symmetric positive-definite metric tensor and v = (vx, vy) is a spatially-varying drift (advection) velocity. The drift breaks the symmetry: propagation is faster **downstream** (in the direction of v) and slower upstream. **Sub-critical condition:** The FMM is valid as long as, at every cell, the drift speed does not exceed the intrinsic wavefront speed in any direction. In practice, ``||v||_D = sqrt(v^T D v) < 1`` should hold everywhere. This function does NOT propagate data — only arrival times. For data propagation with drift, use :func:`_solve_eikonal_aniso_drift_with_data`. :param sources: list of (row, col) source locations. :param where_compute: 2D float array — 1.0 where propagation is allowed, 0.0 for frozen cells. :param speed_xx: 2D array, D_xx component of metric tensor. :param speed_xy: 2D array, D_xy component (off-diagonal). :param speed_yy: 2D array, D_yy component. :param drift_vx: 2D array, x-component of drift velocity. :param drift_vy: 2D array, y-component of drift velocity. :param dx_mesh: mesh spacing in x (column) direction. :param dy_mesh: mesh spacing in y (row) direction. :return: 2D array of arrival times T. Example — uniform flow along x:: speed = 2.0 Dxx = Dyy = speed**2 * np.ones(shape) Dxy = np.zeros(shape) vx = 0.3 * np.ones(shape) # drift along +x vy = np.zeros(shape) T = _solve_eikonal_aniso_drift(sources, mask, Dxx, Dxy, Dyy, vx, vy) """ rows, cols = where_compute.shape assert speed_xx.shape == (rows, cols) assert speed_xy.shape == (rows, cols) assert speed_yy.shape == (rows, cols) assert drift_vx.shape == (rows, cols) assert drift_vy.shape == (rows, cols) if len(sources) == 0: logging.error("No sources provided") return np.zeros((rows, cols)) sources_arr = np.array(sources, dtype=np.int64).reshape(-1, 2) return __solve_eikonal_aniso_drift(sources_arr, where_compute, speed_xx, speed_xy, speed_yy, drift_vx, drift_vy, dx_mesh, dy_mesh)
# ===================================================================== # ANISOTROPIC EIKONAL WITH DRIFT + DATA PROPAGATION # ===================================================================== @njit(cache=True)
[docs] def _evaluate_distance_and_data_aniso_drift_8conn( i, j, fixed, where_compute, dx_mesh, dy_mesh, times, base_data, test_data, speed_xx, speed_xy, speed_yy, drift_vx, drift_vy): """Evaluate arrival time AND propagated data for anisotropic eikonal with drift on an 8-connected stencil. Combines the drift-aware time update of :func:`_evaluate_distance_aniso_drift_8conn` with the inverse-distance–weighted data propagation of :func:`_evaluate_distance_and_data_anisotropic_8conn`. :return: ``(new_time, new_data)`` """ rows = fixed.shape[0] cols = fixed.shape[1] Dxx = speed_xx[i, j] Dxy = speed_xy[i, j] Dyy = speed_yy[i, j] vx = drift_vx[i, j] vy = drift_vy[i, j] det_D = Dxx * Dyy - Dxy * Dxy if det_D <= 1e-30: return np.inf, test_data[i, j] # Use D directly (same convention as base aniso solver) nb_di = np.empty(8, dtype=np.int64) nb_dj = np.empty(8, dtype=np.int64) nb_di[0] = -1; nb_di[1] = -1; nb_di[2] = 0; nb_di[3] = 1 nb_di[4] = 1; nb_di[5] = 1; nb_di[6] = 0; nb_di[7] = -1 nb_dj[0] = 0; nb_dj[1] = 1; nb_dj[2] = 1; nb_dj[3] = 1 nb_dj[4] = 0; nb_dj[5] = -1; nb_dj[6] = -1; nb_dj[7] = -1 nb_valid = np.zeros(8, dtype=np.int64) nb_t = np.full(8, np.inf) nb_data_ok = np.zeros(8, dtype=np.int64) for k in range(8): ni = i + nb_di[k] nj = j + nb_dj[k] if 0 <= ni < rows and 0 <= nj < cols: if fixed[ni, nj] and not where_compute[ni, nj]: nb_valid[k] = 1 nb_t[k] = times[ni, nj] if base_data[ni, nj] > test_data[i, j]: nb_data_ok[k] = 1 # Data: inverse-distance-weighted average of eligible neighbours sum_w = 0.0 sum_wd = 0.0 for k in range(8): if nb_valid[k] and nb_data_ok[k]: ni = i + nb_di[k] nj = j + nb_dj[k] edge_len = np.sqrt((nb_dj[k] * dx_mesh)**2 + (nb_di[k] * dy_mesh)**2) w = 1.0 / edge_len sum_w += w sum_wd += w * base_data[ni, nj] if sum_w == 0.0: return np.inf, test_data[i, j] new_data = sum_wd / sum_w if new_data < test_data[i, j]: return np.inf, test_data[i, j] # Time: same drift-aware algorithm as _evaluate_distance_aniso_drift_8conn best = np.inf # 1D updates for k in range(8): if not nb_valid[k]: continue ex = np.float64(nb_dj[k]) * dx_mesh ey = np.float64(nb_di[k]) * dy_mesh c = Dxx * ex * ex + 2.0 * Dxy * ex * ey + Dyy * ey * ey if c <= 0.0: continue s = ex * ex + ey * ey d = vx * ex + vy * ey denom = np.sqrt(c) - d if denom <= 1e-30: continue tau = s / denom if tau > 0.0: cand = nb_t[k] + tau if cand < best: best = cand # 2D triangle updates with drift (sampled) for k in range(8): k2 = (k + 1) % 8 if not (nb_valid[k] and nb_valid[k2]): continue Ta = nb_t[k] Tb = nb_t[k2] e1x = np.float64(nb_dj[k]) * dx_mesh e1y = np.float64(nb_di[k]) * dy_mesh e2x = np.float64(nb_dj[k2]) * dx_mesh e2y = np.float64(nb_di[k2]) * dy_mesh best_tri = np.inf for step in range(22): if step < 21: lam = step / 20.0 else: p = Dxx * e1x * e1x + 2.0 * Dxy * e1x * e1y + Dyy * e1y * e1y q_val = Dxx * e1x * e2x + Dxy * (e1x * e2y + e1y * e2x) + Dyy * e1y * e2y r = Dxx * e2x * e2x + 2.0 * Dxy * e2x * e2y + Dyy * e2y * e2y alpha = p - 2.0 * q_val + r beta = q_val - r if alpha > 1e-14: lam = min(1.0, max(0.0, (-beta) / alpha)) else: continue ex = lam * e1x + (1.0 - lam) * e2x ey = lam * e1y + (1.0 - lam) * e2y cost_sq = Dxx * ex * ex + 2.0 * Dxy * ex * ey + Dyy * ey * ey if cost_sq <= 0.0: continue cost = np.sqrt(cost_sq) drift_dot = vx * ex + vy * ey denom = cost - drift_dot if denom <= 1e-30: continue e_sq = ex * ex + ey * ey tau = e_sq / denom cand = lam * Ta + (1.0 - lam) * Tb + tau if cand < best_tri: best_tri = cand if best_tri < best: best = best_tri return best, new_data
@njit(cache=True)
[docs] def __solve_eikonal_aniso_drift_with_data(sources, where_compute, base_data, test_data, speed_xx, speed_xy, speed_yy, drift_vx, drift_vy, dx_mesh, dy_mesh): """Solve anisotropic Eikonal with drift and data propagation (FMM, 8-conn). ``base_data`` is modified **in-place**. """ rows, cols = where_compute.shape fixed = np.zeros((rows, cols), dtype=np.uint8) fixed[:, 0] = 1; fixed[:, -1] = 1 fixed[0, :] = 1; fixed[-1, :] = 1 time = np.full((rows, cols), np.inf) time[:, 0] = 0.0; time[:, -1] = 0.0 time[0, :] = 0.0; time[-1, :] = 0.0 exp_di = np.empty(8, dtype=np.int64) exp_dj = np.empty(8, dtype=np.int64) exp_di[0] = -1; exp_di[1] = -1; exp_di[2] = 0; exp_di[3] = 1 exp_di[4] = 1; exp_di[5] = 1; exp_di[6] = 0; exp_di[7] = -1 exp_dj[0] = 0; exp_dj[1] = 1; exp_dj[2] = 1; exp_dj[3] = 1 exp_dj[4] = 0; exp_dj[5] = -1; exp_dj[6] = -1; exp_dj[7] = -1 capacity = 8 * rows * cols + sources.shape[0] heap_t = np.empty(capacity, dtype=np.float64) heap_i = np.empty(capacity, dtype=np.int64) heap_j = np.empty(capacity, dtype=np.int64) heap_size = np.int64(0) for k in range(sources.shape[0]): si, sj = sources[k, 0], sources[k, 1] time[si, sj] = 0.0 fixed[si, sj] = 1 heap_size = _heap_push(heap_t, heap_i, heap_j, heap_size, 0.0, si, sj) while heap_size > 0: heap_size, t, i, j = _heap_pop(heap_t, heap_i, heap_j, heap_size) fixed[i, j] = 1 where_compute[i, j] = 0.0 for d in range(8): new_i = i + exp_di[d] new_j = j + exp_dj[d] if new_i < 0 or new_i >= rows or new_j < 0 or new_j >= cols: continue if fixed[new_i, new_j] or not where_compute[new_i, new_j]: continue new_t, new_data = _evaluate_distance_and_data_aniso_drift_8conn( new_i, new_j, fixed, where_compute, dx_mesh, dy_mesh, time, base_data, test_data, speed_xx, speed_xy, speed_yy, drift_vx, drift_vy) if new_t < time[new_i, new_j]: time[new_i, new_j] = new_t base_data[new_i, new_j] = new_data heap_size = _heap_push(heap_t, heap_i, heap_j, heap_size, new_t, new_i, new_j) return time
[docs] def _solve_eikonal_aniso_drift_with_data( sources: list[tuple[int, int]], where_compute: np.ndarray, base_data: np.ndarray, test_data: np.ndarray, speed_xx: np.ndarray, speed_xy: np.ndarray, speed_yy: np.ndarray, drift_vx: np.ndarray, drift_vy: np.ndarray, dx_mesh: float = 1.0, dy_mesh: float = 1.0) -> np.ndarray: """Solve the **anisotropic Eikonal equation with drift and data propagation**. Computes arrival times T such that:: sqrt( (∇T)^T D⁻¹ (∇T) ) + v · ∇T = 1 and simultaneously propagates ``base_data`` outward from the sources along the FMM wavefront. ``base_data`` is modified **in-place**. :param sources: list of (row, col) source locations. :param where_compute: 2D float array — 1.0 where propagation is allowed, 0.0 for frozen cells. :param base_data: 2D array of data to propagate (modified in-place). :param test_data: 2D array — propagation accepted only when candidate data > test_data[i, j]. :param speed_xx: 2D array, D_xx component of metric tensor. :param speed_xy: 2D array, D_xy component (off-diagonal). :param speed_yy: 2D array, D_yy component. :param drift_vx: 2D array, x-component of drift velocity. :param drift_vy: 2D array, y-component of drift velocity. :param dx_mesh: mesh spacing in x (column) direction. :param dy_mesh: mesh spacing in y (row) direction. :return: 2D array of arrival times T. """ rows, cols = where_compute.shape assert base_data.shape == (rows, cols) assert test_data.shape == (rows, cols) assert speed_xx.shape == (rows, cols) assert speed_xy.shape == (rows, cols) assert speed_yy.shape == (rows, cols) assert drift_vx.shape == (rows, cols) assert drift_vy.shape == (rows, cols) if len(sources) == 0: logging.error("No sources provided") return np.zeros((rows, cols)) sources_arr = np.array(sources, dtype=np.int64).reshape(-1, 2) return __solve_eikonal_aniso_drift_with_data( sources_arr, where_compute, base_data, test_data, speed_xx, speed_xy, speed_yy, drift_vx, drift_vy, dx_mesh, dy_mesh)
# --------------------------------------------------------------------------- # Utility functions for building eikonal inputs from hydrodynamic fields # ---------------------------------------------------------------------------
[docs] def diffusion_tensor_from_velocity(Ux: np.ndarray, Uy: np.ndarray, D_longitudinal, D_transverse, min_speed: float = 1e-10 ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: r"""Build an anisotropic diffusion tensor aligned with a velocity field. The tensor is constructed as: .. math:: D = D_L \;(\mathbf{n} \otimes \mathbf{n}) + D_T \;(\mathbf{t} \otimes \mathbf{t}) where :math:`\mathbf{n} = (U_x, U_y) / |U|` is the unit flow direction and :math:`\mathbf{t} = (-n_y, n_x)` the perpendicular. In component form: - :math:`D_{xx} = D_L\, n_x^2 + D_T\, n_y^2` - :math:`D_{xy} = (D_L - D_T)\, n_x\, n_y` - :math:`D_{yy} = D_L\, n_y^2 + D_T\, n_x^2` At cells where :math:`|U| <` *min_speed*, the tensor falls back to isotropic: :math:`D = D_T\, I`. If *Ux* or *Uy* are :class:`numpy.ma.MaskedArray`, the returned arrays are also masked with the **union** of input masks. Masked cells are filled with *D_T* (isotropic) for Dxx/Dyy and 0 for Dxy. *D_longitudinal* and *D_transverse* may be **scalars** (uniform) or **2-D arrays** of the same shape as *Ux* / *Uy* (spatially variable). :param Ux: Velocity component in the **x** (column) direction (2-D array, rows × cols). :type Ux: numpy.ndarray or numpy.ma.MaskedArray :param Uy: Velocity component in the **y** (row) direction (2-D array, rows × cols). :type Uy: numpy.ndarray or numpy.ma.MaskedArray :param D_longitudinal: Diffusion coefficient along the flow direction (speed² units). Scalar or 2-D array. :type D_longitudinal: float or numpy.ndarray :param D_transverse: Diffusion coefficient perpendicular to the flow (speed² units). Scalar or 2-D array. :type D_transverse: float or numpy.ndarray :param min_speed: Minimum velocity magnitude below which the tensor is isotropic. Default ``1e-10``. :type min_speed: float, optional :return: ``(Dxx, Dxy, Dyy)`` — components of the symmetric 2×2 diffusion tensor, ready for :func:`_solve_eikonal_anisotropic` or :func:`_solve_eikonal_aniso_drift`. :rtype: tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray] Example:: Dxx, Dxy, Dyy = diffusion_tensor_from_velocity(Ux, Uy, D_L=9.0, D_T=1.0) T = _solve_eikonal_aniso_drift(sources, wc, Dxx, Dxy, Dyy, Ux, Uy, dx, dy) """ is_masked = isinstance(Ux, np.ma.MaskedArray) or isinstance(Uy, np.ma.MaskedArray) if is_masked: combined_mask = np.ma.getmaskarray(Ux) | np.ma.getmaskarray(Uy) Ux_data = np.ma.filled(Ux, 0.0).astype(np.float64) Uy_data = np.ma.filled(Uy, 0.0).astype(np.float64) else: Ux_data = np.asarray(Ux, dtype=np.float64) Uy_data = np.asarray(Uy, dtype=np.float64) speed = np.sqrt(Ux_data**2 + Uy_data**2) slow = speed < min_speed # Unit direction (safe division) nx = np.where(slow, 0.0, Ux_data / np.where(slow, 1.0, speed)) ny = np.where(slow, 0.0, Uy_data / np.where(slow, 1.0, speed)) # Broadcast D_L, D_T to the velocity field shape D_L = np.broadcast_to(np.asarray(D_longitudinal, dtype=np.float64), Ux_data.shape) D_T = np.broadcast_to(np.asarray(D_transverse, dtype=np.float64), Ux_data.shape) Dxx = D_L * nx**2 + D_T * ny**2 Dxy = (D_L - D_T) * nx * ny Dyy = D_L * ny**2 + D_T * nx**2 # Fall back to isotropic where speed ≈ 0 Dxx[slow] = D_T[slow] Dxy[slow] = 0.0 Dyy[slow] = D_T[slow] if is_masked: Dxx = np.ma.array(Dxx, mask=combined_mask) Dxy = np.ma.array(Dxy, mask=combined_mask) Dyy = np.ma.array(Dyy, mask=combined_mask) return Dxx, Dxy, Dyy
[docs] def eikonal_params_from_velocity(Ux: np.ndarray, Uy: np.ndarray, D_longitudinal, D_transverse, drift_scale: float = 1.0, min_speed: float = 1e-10 ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Prepare all eikonal parameters from a hydrodynamic velocity field. Convenience wrapper that computes both the anisotropic diffusion tensor *D* (via :func:`diffusion_tensor_from_velocity`) and the drift velocity from a single velocity field (*Ux*, *Uy*). If *Ux* or *Uy* are :class:`numpy.ma.MaskedArray`, all five returned arrays are masked arrays with the union of input masks. *D_longitudinal* and *D_transverse* may be **scalars** or **2-D arrays** (spatially variable diffusivity). :param Ux: Velocity component in the **x** (column) direction (2-D array, rows × cols). :type Ux: numpy.ndarray or numpy.ma.MaskedArray :param Uy: Velocity component in the **y** (row) direction (2-D array, rows × cols). :type Uy: numpy.ndarray or numpy.ma.MaskedArray :param D_longitudinal: Speed² along the flow direction. Scalar or 2-D array. :type D_longitudinal: float or numpy.ndarray :param D_transverse: Speed² perpendicular to the flow. Scalar or 2-D array. :type D_transverse: float or numpy.ndarray :param drift_scale: Scaling factor applied to the drift velocity (``drift = drift_scale × (Ux, Uy)``). Use 1.0 to take the raw velocity as drift. Default ``1.0``. :type drift_scale: float, optional :param min_speed: Minimum |U| for directional tensor. Default ``1e-10``. :type min_speed: float, optional :return: ``(Dxx, Dxy, Dyy, drift_vx, drift_vy)`` — diffusion tensor components and drift velocity arrays. :rtype: tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray] Example:: Dxx, Dxy, Dyy, vx, vy = eikonal_params_from_velocity( Ux, Uy, D_longitudinal=9.0, D_transverse=1.0, drift_scale=0.8) T = _solve_eikonal_aniso_drift(sources, wc, Dxx, Dxy, Dyy, vx, vy, dx, dy) """ Dxx, Dxy, Dyy = diffusion_tensor_from_velocity( Ux, Uy, D_longitudinal, D_transverse, min_speed) is_masked = isinstance(Ux, np.ma.MaskedArray) or isinstance(Uy, np.ma.MaskedArray) if is_masked: combined_mask = np.ma.getmaskarray(Ux) | np.ma.getmaskarray(Uy) drift_vx = np.ma.array(np.ma.filled(Ux, 0.0).astype(np.float64) * drift_scale, mask=combined_mask, fill_value=0.0) drift_vy = np.ma.array(np.ma.filled(Uy, 0.0).astype(np.float64) * drift_scale, mask=combined_mask, fill_value=0.0) else: drift_vx = np.asarray(Ux, dtype=np.float64) * drift_scale drift_vy = np.asarray(Uy, dtype=np.float64) * drift_scale return Dxx, Dxy, Dyy, drift_vx, drift_vy