"""
Author: HECE - University of Liege, Pierre Archambeau
Date: 2024
Copyright (c) 2024 University of Liege. All rights reserved.
This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""
from numba import njit
import numpy as np
from tqdm import tqdm
import glob
from os.path import join
import logging
from pathlib import Path
from tempfile import TemporaryDirectory, TemporaryFile
from collections import deque
from ..wolf_array import WolfArray, WolfArrayMB
from ..wolf_tiles import Tiles
from ..PyVertexvectors import vector, wolfvertex as wv
from ..wolf_vrt import create_vrt, crop_vrt
from ..pydownloader import toys_lidaxe, LIDAXE_FLOW_ACC, LIDAXE_FLOW_DIR, LIDAXE_TILES_INDEX
# Define the mapping of flow directions to neighbor indices
@njit(cache=True)
[docs]
def get_direction_mapping(value):
""" Get the mapping of flow direction value to neighbor indices (i, j).
The array convention is WOLF array with X-axis along (i) and Y-axis along (j),
with (0, 0) at the lower-left corner.
The flow direction values are based on the D8 algorithm, where each value
corresponds to a specific direction of flow from a cell to one of its 8 neighbors.
"""
if value == 1:
return (1, 0) # East
elif value == 2:
return (1, -1) # Southeast
elif value == 4:
return (0, -1) # South
elif value == 8:
return (-1, -1) # Southwest
elif value == 16:
return (-1, 0) # West
elif value == 32:
return (-1, 1) # Northwest
elif value == 64:
return (0, 1) # North
elif value == 128:
return (1, 1) # Northeast
else:
return (0, 0) # No flow
@njit(cache=True)
[docs]
def _is_neighbor_flows_towards_me(flow_dir, start_i, start_j, visited, watershed):
"""
Check if any unvisited neighbors flow towards the current cell.
This function examines the 8 neighboring cells of a given position and determines
if any of them have flow directions pointing towards the current cell. Neighbors
that flow towards the current cell are marked in the watershed and added to the
output queue.
:param flow_dir: 2D array containing flow direction values for each cell.
:param start_i: Row index of the current cell.
:param start_j: Column index of the current cell.
:param visited: 2D boolean array tracking which cells have been processed.
:param watershed: 2D boolean array marking cells that belong to the watershed.
:returns: A list of tuples (neighbor_i, neighbor_j) representing neighbors
that flow towards the current cell. Returns an empty list if the
starting cell has already been visited.
"""
queue = []
if visited[start_i, start_j]:
return queue
visited[start_i, start_j] = True
for i in [-1, 0, 1]:
for j in [-1, 0, 1]:
if i == 0 and j == 0:
continue
neighbor_i = start_i + i
neighbor_j = start_j + j
if visited[neighbor_i, neighbor_j]:
continue
if (0 <= neighbor_i < flow_dir.shape[0]) and (0 <= neighbor_j < flow_dir.shape[1]):
neighbor_flow_direction = flow_dir[neighbor_i, neighbor_j]
incr_i, incr_j = get_direction_mapping(neighbor_flow_direction)
if incr_i == -i and incr_j == -j:
watershed[neighbor_i, neighbor_j] = True
queue.append((neighbor_i, neighbor_j))
return queue
@njit(cache=True)
[docs]
def _inverse_flow_dir_numba(flow_dir, seed_i, seed_j, n_seeds, visited, watershed):
"""
Fully Numba-compiled BFS for inverse flow direction.
The entire BFS runs in compiled code, avoiding repeated Python-Numba
boundary crossings that occur in the per-cell approach.
:param flow_dir: 2D array containing flow direction values for each cell.
:param seed_i: 1D int32 array of row indices for seed cells.
:param seed_j: 1D int32 array of column indices for seed cells.
:param n_seeds: Number of seed cells.
:param visited: 2D boolean array tracking which cells have been processed.
:param watershed: 2D boolean array marking cells that belong to the watershed.
:returns: A 2D boolean array where True values indicate cells that belong
to the watershed defined by the flow direction.
"""
rows, cols = flow_dir.shape
queue_i = np.empty(rows * cols, dtype=np.int32)
queue_j = np.empty(rows * cols, dtype=np.int32)
head = 0
tail = 0
for k in range(n_seeds):
si, sj = seed_i[k], seed_j[k]
if not visited[si, sj]:
watershed[si, sj] = True
visited[si, sj] = True
queue_i[tail] = si
queue_j[tail] = sj
tail += 1
while head < tail:
ci = queue_i[head]
cj = queue_j[head]
head += 1
for di in range(-1, 2):
for dj in range(-1, 2):
if di == 0 and dj == 0:
continue
ni = ci + di
nj = cj + dj
if ni < 0 or ni >= rows or nj < 0 or nj >= cols:
continue
if visited[ni, nj]:
continue
incr_i, incr_j = get_direction_mapping(flow_dir[ni, nj])
if incr_i == -di and incr_j == -dj:
watershed[ni, nj] = True
visited[ni, nj] = True
queue_i[tail] = ni
queue_j[tail] = nj
tail += 1
return watershed
[docs]
def inverse_flow_dir(flow_dir:np.ndarray, queue:deque, visited:np.ndarray, watershed:np.ndarray):
"""
Inverse flow direction to create watershed.
:param flow_dir: 2D array containing flow direction values for each cell.
:param queue: A deque containing tuples of (i, j) indices for cells to
process. Initially contains the starting cell.
:param visited: 2D boolean array tracking which cells have been processed.
:param watershed: 2D boolean array marking cells that belong to the watershed.
:returns: A 2D boolean array where True values indicate cells that belong
to the watershed defined by the flow direction.
"""
if not queue:
return watershed
seed_i = np.array([int(p[0]) for p in queue], dtype=np.int32)
seed_j = np.array([int(p[1]) for p in queue], dtype=np.int32)
queue.clear()
return _inverse_flow_dir_numba(flow_dir, seed_i, seed_j, len(seed_i), visited, watershed)
@njit(cache=True)
[docs]
def _get_largest_neighbor_flowing_towards_me(flow_dir, accumul, start_i, start_j):
"""
Find the largest neighbor cell that flows towards the current cell.
This function examines the 8 neighboring cells of a given position and determines
if any of them have flow directions pointing towards the current cell. The neighbor
with the highest accumulation value is selected.
:param flow_dir: 2D array containing flow direction values for each cell.
:param accumul: 2D array containing accumulation values for each cell.
:param start_i: Row index of the current cell.
:param start_j: Column index of the current cell.
:param visited: 2D boolean array tracking which cells have been processed.
:returns: A tuple (neighbor_i, neighbor_j) representing the neighbor that flows
towards the current cell and has the highest accumulation value. Returns
an empty tuple if no such neighbor exists or if the starting cell has already been visited.
"""
max_accumul = -1
max_i, max_j = -1, -1
for i in [-1, 0, 1]:
for j in [-1, 0, 1]:
if i == 0 and j == 0:
continue
neighbor_i = start_i + i
neighbor_j = start_j + j
neighbor_flow_direction = flow_dir[neighbor_i, neighbor_j]
incr_i, incr_j = get_direction_mapping(neighbor_flow_direction)
if incr_i == -i and incr_j == -j:
if accumul[neighbor_i, neighbor_j] > max_accumul:
max_accumul = accumul[neighbor_i, neighbor_j]
max_i, max_j = neighbor_i, neighbor_j
return (max_i, max_j)
[docs]
class FlowAcc(Tiles):
"""
Class LIDAXE flow accumulation
"""
def __init__(self, filename = None, data_dir = None):
super().__init__(filename, linked_data_dir= data_dir)
[docs]
self._search_length = 500.
@property
[docs]
def search_length(self):
return self._search_length
@search_length.setter
def search_length(self, value):
self._search_length = value
[docs]
def get_accumulation_from_catchment(self, catchment:WolfArray) -> WolfArray:
"""
Get accumulation from a catchment.
:param catchment: A WolfArray representing the catchment area for which flow accumulation is to be calculated.
:returns: A WolfArray containing the flow accumulation values for the specified catchment area.
"""
bounds = catchment.get_bounds()
vec = vector()
vec.add_vertices_from_array(np.array([[bounds[0][0], bounds[1][0]],
[bounds[0][0], bounds[1][1]],
[bounds[0][1], bounds[1][1]],
[bounds[0][1], bounds[1][0]]]))
vec.force_to_close()
flow_acc = self._extract_data(vec)
flow_acc.array.mask[:,:] = catchment.array.mask
flow_acc.nullvalue = 0
flow_acc.set_nullvalue_in_mask()
flow_acc.count()
return flow_acc
[docs]
def get_accumulation_around(self, x:float, y:float) -> WolfArray:
"""
Get accumulation from flow accumulation.
:param x: X coordinate of the point around which flow accumulation is to be calculated.
:param y: Y coordinate of the point around which flow accumulation is to be calculated.
:returns: A WolfArray containing the flow accumulation values for the area around the specified point.
"""
vec = vector()
vec.add_vertices_from_array(np.array([[x-self._search_length, y-self._search_length],
[x+self._search_length, y-self._search_length],
[x+self._search_length, y+self._search_length],
[x-self._search_length, y+self._search_length]]))
vec.force_to_close()
flow_acc = self._extract_data(vec)
return flow_acc
[docs]
class FlowDir(Tiles):
"""
Class to inverse flow direction to create watershed.
"""
def __init__(self, filename = None, data_dir = None):
super().__init__(filename, linked_data_dir= data_dir)
[docs]
self._search_length = 500.
""" The search length is the distance in meters that will be used to search for the flow direction tiles around the point of interest.
If the catchment touches the border of the extracted flow direction array, it means that the catchment is not complete and we need to
search for a larger area. The search length will be increased by this value until we find a sufficiently large area that contains the
entire catchment."""
@property
[docs]
def search_length(self):
return self._search_length
@search_length.setter
def search_length(self, value):
self._search_length = value
[docs]
def get_catchment(self, x:float, y:float) -> WolfArray:
"""
Get catchment from flow direction.
:param x: X coordinate of the point for which catchment is to be calculated.
:param y: Y coordinate of the point for which catchment is to be calculated.
:returns: A WolfArray containing the catchment area for the specified point.
"""
vec = vector()
vec.add_vertices_from_array(np.array([[x-self._search_length, y-self._search_length],
[x+self._search_length, y-self._search_length],
[x+self._search_length, y+self._search_length],
[x-self._search_length, y+self._search_length]]))
vec.force_to_close()
flow_dir = self._extract_data(vec)
catchment, flow_dir = self._inverse_flow_dir(flow_dir, x, y)
return catchment, flow_dir
[docs]
def _inverse_flow_dir(self, flow_dir:WolfArray, x:float, y:float) -> tuple[WolfArray, WolfArray]:
"""
Inverse flow direction to create watershed.
Begin with a quite small area around the point of interest, extract the flow direction data for this area,
and inverse the flow direction to create the watershed.
If the catchment touches the border of the extracted flow direction array, it means that the catchment
is not complete and we need to search for a larger area.
The search length will be increased by the value of self._search_length until we find a sufficiently large area
that contains the entire catchment.
:param flow_dir: A WolfArray containing the flow direction data for the area around the point of interest.
:param x: X coordinate of the point for which catchment is to be calculated.
:param y: Y coordinate of the point for which catchment is to be calculated.
:returns: A tuple of WolfArrays containing the catchment area and flow direction for the specified point.
"""
sufficiently_large = False
visited = np.zeros(flow_dir.array.shape, dtype=bool)
watershed = np.zeros(flow_dir.array.shape, dtype=bool)
queue = deque()
queue.append((flow_dir.get_ij_from_xy(x, y)))
while not sufficiently_large:
catchment = inverse_flow_dir(flow_dir.array.data, queue, visited, watershed)
# test if catchment touches the border of the array, if yes, it means that the catchment is not complete and we need to search for a larger area
if np.any(catchment[0, :]) or np.any(catchment[-1, :]) or np.any(catchment[:, 0]) or np.any(catchment[:, -1]):
logging.warning("Catchment touches the border of the array, searching for a larger area...")
to_left = np.any(catchment[0, :])
to_right = np.any(catchment[-1, :])
to_bottom = np.any(catchment[:, 0])
to_top = np.any(catchment[:, -1])
[[xmin, xmax], [ymin, ymax]] = flow_dir.get_bounds()
xmin = xmin - self._search_length if to_left else xmin
xmax = xmax + self._search_length if to_right else xmax
ymin = ymin - self._search_length if to_bottom else ymin
ymax = ymax + self._search_length if to_top else ymax
vec = vector()
vec.add_vertices_from_array(np.array([[xmin, ymin],
[xmax, ymin],
[xmax, ymax],
[xmin, ymax]]))
vec.force_to_close()
new_flow_dir = self._extract_data(vec)
old_origin_i, old_origin_j = new_flow_dir.get_ij_from_xy(flow_dir.origx, flow_dir.origy)
# extend visited and watershed arrays to the new size
new_visited = np.zeros(new_flow_dir.array.shape, dtype=bool)
new_watershed = np.zeros(new_flow_dir.array.shape, dtype=bool)
new_visited[old_origin_i:old_origin_i+visited.shape[0], old_origin_j:old_origin_j+visited.shape[1]] = visited
new_watershed[old_origin_i:old_origin_i+watershed.shape[0], old_origin_j:old_origin_j+watershed.shape[1]] = watershed
queue_left = list(np.argwhere(watershed[0, :] == True))
queue_right = list(np.argwhere(watershed[-1, :] == True))
queue_bottom = list(np.argwhere(watershed[:, 0] == True))
queue_top = list(np.argwhere(watershed[:, -1] == True))
queue = [(old_origin_i, j + old_origin_j) for j in queue_left] + \
[(old_origin_i + watershed.shape[0]-1, j + old_origin_j) for j in queue_right] + \
[(i + old_origin_i, old_origin_j) for i in queue_bottom] + \
[(i + old_origin_i, old_origin_j + watershed.shape[1]-1) for i in queue_top]
queue = deque(queue)
visited = new_visited
watershed = new_watershed
flow_dir = new_flow_dir
for i,j in queue:
visited[i, j] = False
else:
sufficiently_large = True
catchment_wa = WolfArray(mold=flow_dir)
catchment_wa.array.data[:,:] = catchment.astype(np.int8)
catchment_wa.mask_data(0)
return catchment_wa, flow_dir
[docs]
class Lidaxe():
""" Class to handle LIDAXE flow direction and flow accumulation data. """
def __init__(self, flow_direction:Path | None = None, flow_accumulation:Path | None = None):
""" Initialize the Lidaxe class.
:param flow_direction: Path to the flow direction data. If None, it will be downloaded from the Lidaxe dataset.
:param flow_accumulation: Path to the flow accumulation data. If None, it will be downloaded from the Lidaxe dataset.
"""
if flow_direction is None:
# flow_direction = toys_lidaxe(FLOW_DIR, 'tmp.vrt')
flow_direction = toys_lidaxe(LIDAXE_FLOW_DIR, LIDAXE_TILES_INDEX)
if flow_accumulation is None:
# flow_accumulation = toys_lidaxe(FLOW_ACC, 'tmp.vrt')
flow_accumulation = toys_lidaxe(LIDAXE_FLOW_ACC, LIDAXE_TILES_INDEX)
flow_direction = Path(flow_direction)
flow_accumulation = Path(flow_accumulation)
if not flow_direction.exists():
logging.error(f"Flow direction file {flow_direction} does not exist")
return
if not flow_accumulation.exists():
logging.error(f"Flow accumulation file {flow_accumulation} does not exist")
return
[docs]
self.flow_dir = FlowDir(filename=str(flow_direction), data_dir=str(flow_direction.parent))
[docs]
self.flow_acc = FlowAcc(filename=str(flow_accumulation), data_dir=str(flow_accumulation.parent))
[docs]
def snap_to_max(self, x:float, y:float, snap_length:float=10.0) -> tuple[float, float]:
""" Snap the point to the nearest maximum flow accumulation value within a certain distance.
:param x: X coordinate of the point to be snapped.
:param y: Y coordinate of the point to be snapped.
:param snap_length: The distance in meters within which to search for the maximum flow accumulation value to snap to.
:returns: A tuple containing the new X and Y coordinates of the snapped point.
"""
vec = vector()
vec.add_vertices_from_array(np.array([[x-snap_length, y-snap_length],
[x+snap_length, y-snap_length],
[x+snap_length, y+snap_length],
[x-snap_length, y+snap_length]]))
vec.force_to_close()
acc = self.flow_acc._extract_data(vec)
ij = np.argmax(acc.array.data)
x,y = acc.get_xy_from_ij(*np.unravel_index(ij, acc.array.shape))
return x,y
[docs]
def get_catchment(self, x:float, y:float, snap_to_max:bool=True, snap_length:float=10.0) -> tuple[WolfArray, WolfArray, WolfArray]:
""" Get catchment from flow direction and flow accumulation.
:param x: X coordinate of the point for which catchment is to be calculated.
:param y: Y coordinate of the point for which catchment is to be calculated.
:param snap_to_max: If True, the point will be snapped to the nearest maximum flow accumulation value within a certain distance defined by snap_length.
:param snap_length: The distance in meters within which to search for the maximum flow accumulation value to snap to if snap_to_max is True.
:returns: A tuple of WolfArrays containing the catchment area, flow accumulation, and flow direction for the specified point.
"""
if snap_to_max:
x,y = self.snap_to_max(x, y, snap_length=snap_length)
catchment, flow_dir = self.flow_dir.get_catchment(x, y)
acc = self.flow_acc.get_accumulation_from_catchment(catchment)
acc.mypal.distribute_values(0., 10_000.)
acc.mypal.automatic = False
return catchment, acc, flow_dir
[docs]
def get_catchments(self, working_area: vector, buffer:float = 2.0) -> tuple[WolfArrayMB, WolfArrayMB, WolfArrayMB]:
""" Get catchments from flow direction and flow accumulation for a working area.
:param working_area: A vector object defining the working area for which catchments are to be calculated.
:param buffer: A buffer distance in meters to expand the bounding box of the working polyline. This can be used to ensure that the extracted data includes a margin around the area of interest.
:returns: a tuple of three WolfArrayMB, the first one containing the catchments, the second one containing the flow accumulations, and the third one containing the flow directions for each catchment.
"""
catchments = []
flowdir = self.flow_dir._extract_data(working_area, buffer=buffer)
flowacc = self.flow_acc._extract_data(working_area, buffer=buffer)
ij = flowacc.get_ij_inside_polygon(working_area,).tolist()
ij_along = flowacc.get_ij_under_polyline(working_area,).tolist()
# Create a set of unique (i, j) indices from both lists
ij = list(set(tuple(idx) for idx in ij + ij_along))
# Create a sorted list of unique (i, j) indices based on flow accumulation values in descending order
sorted_ij = sorted(ij, key=lambda idx: flowacc.array[idx[0], idx[1]], reverse=True)
# Find catchments for each unique (i, j) index if (i, j) is not in the catchment of a previously processed index
idx = 1
while sorted_ij:
i, j = sorted_ij.pop(0)
if flowacc.array.mask[i, j]:
break
x,y = flowacc.get_xy_from_ij(i, j)
treated = False
for catchment, acc, flow_dir in catchments:
loci, locj = acc.get_ij_from_xy(x, y)
if not acc.array.mask[loci, locj]:
treated = True
break
if treated:
continue
catchments.append(self.get_catchment(x, y, snap_to_max= False))
catchments[-1][0].array[~catchments[-1][0].array.mask] = idx
idx += 1
acc_MB = WolfArrayMB.make_multiblocks([acc for catchment, acc, flow_dir in catchments])
dir_MB = WolfArrayMB.make_multiblocks([flow_dir for catchment, acc, flow_dir in catchments])
catchment_MB = WolfArrayMB.make_multiblocks([catchment for catchment, acc, flow_dir in catchments])
catchment_MB.mypal.distribute_values(0., idx)
catchment_MB.mypal.automatic = False
acc_MB.mypal.distribute_values(0., 10_000.)
acc_MB.mypal.automatic = False
return catchment_MB, acc_MB, dir_MB
[docs]
def get_catchments_on_border(self, working_polyline: vector,
exclude_interior:bool = True,
buffer:float = 2.0,
ignore_smaller_than:float = 100.0) -> tuple[WolfArrayMB, WolfArrayMB, WolfArrayMB]:
""" Get catchments from flow direction and flow accumulation for a working polyline.
:param working_polyline: A vector object defining the working polyline for which catchments are to be calculated.
:param exclude_interior: If True, exclude interior points from the catchment calculation.
:param buffer: A buffer distance in meters to expand the bounding box of the working polyline. This can be used to ensure that the extracted data includes a margin around the area of interest.
:param ignore_smaller_than: Minimum area in square meters for a catchment to be included in the results. Catchments smaller than this threshold will be ignored.
:returns: a tuple of three WolfArrayMB, the first one containing the catchments, the second one containing the flow accumulations, and the third one containing the flow directions for each catchment.
"""
catchments = []
flowdir = self.flow_dir._extract_data(working_polyline, buffer=buffer)
flowacc = self.flow_acc._extract_data(working_polyline, buffer=buffer)
ij = flowacc.get_ij_under_polyline(working_polyline,).tolist()
if exclude_interior:
ij_interior = flowacc.get_ij_inside_polygon(working_polyline,).tolist()
to_remove = []
for idx, (i,j) in enumerate(ij[-1::-1]):
direction = flowdir.array[i, j]
i_down, j_down = get_direction_mapping(direction)
if [i + i_down, j + j_down] not in ij_interior:
to_remove.append(len(ij) - 1 - idx)
for idx in to_remove:
ij.pop(idx)
# Create a sorted list of unique (i, j) indices based on flow accumulation values in descending order
sorted_ij = sorted(ij, key=lambda idx: flowacc.array[idx[0], idx[1]], reverse=True)
# Find catchments for each unique (i, j) index if (i, j) is not in the catchment of a previously processed index
idx = 1
while sorted_ij:
i, j = sorted_ij.pop(0)
if flowacc.array.mask[i, j]:
break
x,y = flowacc.get_xy_from_ij(i, j)
treated = False
for catchment, acc, flow_dir in catchments:
loci, locj = acc.get_ij_from_xy(x, y)
if not acc.array.mask[loci, locj]:
treated = True
break
if treated:
continue
catch_loc = self.get_catchment(x, y, snap_to_max= False)
if catch_loc[0].nbnotnull > int(ignore_smaller_than / (flowacc.dx * flowacc.dy)):
catchments.append(catch_loc)
catchments[-1][0].array[~catchments[-1][0].array.mask] = idx
else:
logging.warning(f"Catchment for point ({x}, {y}) is smaller than the threshold, skipping...")
idx += 1
acc_MB = WolfArrayMB.make_multiblocks([acc for catchment, acc, flow_dir in catchments])
dir_MB = WolfArrayMB.make_multiblocks([flow_dir for catchment, acc, flow_dir in catchments])
catchment_MB = WolfArrayMB.make_multiblocks([catchment for catchment, acc, flow_dir in catchments])
catchment_MB.mypal.distribute_values(0., idx)
catchment_MB.mypal.automatic = False
acc_MB.mypal.distribute_values(0., 10_000.)
acc_MB.mypal.automatic = False
return catchment_MB, acc_MB, dir_MB
[docs]
def get_longest_flow_path(self, x:float, y:float,
snap_to_max:bool=True, snap_length:float=10.0,
prepared_catchment:list[WolfArray] | tuple[WolfArray | None] = None) -> vector:
""" Find the longest flow path in the catchment.
:param x: X coordinate of the point for which the longest flow path is to be calculated.
:param y: Y coordinate of the point for which the longest flow path is to be calculated.
:param snap_to_max: If True, the point will be snapped to the nearest maximum flow accumulation value within a certain distance defined by snap_length before calculating the longest flow path.
:param snap_length: The distance in meters within which to search for the maximum flow accumulation value to snap to if snap_to_max is True.
:param prepared_catchment: A list or tuple containing pre-extracted catchment, accumulation, and flow direction arrays. If provided, these will be used instead of extracting them again from the flow direction and flow accumulation data. The expected format is (catchment: WolfArray, accumulation: WolfArray, flow_direction: WolfArray). If any of these is None, it will be extracted as needed.
:returns: A vector containing the vertices of the longest flow path in the catchment.
"""
if prepared_catchment is not None:
assert len(prepared_catchment) == 3, "prepared_catchment must be a list or tuple of length 3 containing (catchment, accumulation, flow_direction)"
assert all(isinstance(arr, WolfArray) or arr is None for arr in prepared_catchment), "All elements of prepared_catchment must be of type WolfArray or None"
catchment, accumulation, flow_dir = prepared_catchment
else:
catchment, accumulation, flow_dir = self.get_catchment(x, y, snap_to_max=snap_to_max, snap_length=snap_length)
outflow = np.argmax(accumulation.array.data)
i, j = np.unravel_index(outflow, accumulation.array.shape)
path = vector()
while i != -1 and j != -1:
x, y = accumulation.get_xy_from_ij(i, j)
path.add_vertex(wv(x, y))
i, j = _get_largest_neighbor_flowing_towards_me(flow_dir.array.data,
accumulation.array.data,
i, j)
path.update_lengths()
return path
[docs]
def get_accumulation_array(self, vect:vector) -> WolfArray:
""" Get accumulation array from flow accumulation. """
flow_acc = self.flow_acc._extract_data(vect)
return flow_acc
[docs]
def get_direction_array(self, vect:vector) -> WolfArray:
""" Get direction array from flow direction. """
flow_dir = self.flow_dir._extract_data(vect)
return flow_dir