Source code for wolfhece.hydrology.flowaccdir

"""
Author: HECE - University of Liege, Pierre Archambeau
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

from numba import njit
import numpy as np
from tqdm import tqdm
import glob
from os.path import join
import logging
from pathlib import Path
from tempfile import TemporaryDirectory, TemporaryFile
from collections import deque

from ..wolf_array import WolfArray, WolfArrayMB
from ..wolf_tiles import Tiles
from ..PyVertexvectors import vector, wolfvertex as wv
from ..wolf_vrt import create_vrt, crop_vrt
from ..pydownloader import toys_lidaxe, LIDAXE_FLOW_ACC, LIDAXE_FLOW_DIR, LIDAXE_TILES_INDEX

# Define the mapping of flow directions to neighbor indices
@njit(cache=True)
[docs] def get_direction_mapping(value): """ Get the mapping of flow direction value to neighbor indices (i, j). The array convention is WOLF array with X-axis along (i) and Y-axis along (j), with (0, 0) at the lower-left corner. The flow direction values are based on the D8 algorithm, where each value corresponds to a specific direction of flow from a cell to one of its 8 neighbors. """ if value == 1: return (1, 0) # East elif value == 2: return (1, -1) # Southeast elif value == 4: return (0, -1) # South elif value == 8: return (-1, -1) # Southwest elif value == 16: return (-1, 0) # West elif value == 32: return (-1, 1) # Northwest elif value == 64: return (0, 1) # North elif value == 128: return (1, 1) # Northeast else: return (0, 0) # No flow
@njit(cache=True)
[docs] def _is_neighbor_flows_towards_me(flow_dir, start_i, start_j, visited, watershed): """ Check if any unvisited neighbors flow towards the current cell. This function examines the 8 neighboring cells of a given position and determines if any of them have flow directions pointing towards the current cell. Neighbors that flow towards the current cell are marked in the watershed and added to the output queue. :param flow_dir: 2D array containing flow direction values for each cell. :param start_i: Row index of the current cell. :param start_j: Column index of the current cell. :param visited: 2D boolean array tracking which cells have been processed. :param watershed: 2D boolean array marking cells that belong to the watershed. :returns: A list of tuples (neighbor_i, neighbor_j) representing neighbors that flow towards the current cell. Returns an empty list if the starting cell has already been visited. """ queue = [] if visited[start_i, start_j]: return queue visited[start_i, start_j] = True for i in [-1, 0, 1]: for j in [-1, 0, 1]: if i == 0 and j == 0: continue neighbor_i = start_i + i neighbor_j = start_j + j if visited[neighbor_i, neighbor_j]: continue if (0 <= neighbor_i < flow_dir.shape[0]) and (0 <= neighbor_j < flow_dir.shape[1]): neighbor_flow_direction = flow_dir[neighbor_i, neighbor_j] incr_i, incr_j = get_direction_mapping(neighbor_flow_direction) if incr_i == -i and incr_j == -j: watershed[neighbor_i, neighbor_j] = True queue.append((neighbor_i, neighbor_j)) return queue
@njit(cache=True)
[docs] def _inverse_flow_dir_numba(flow_dir, seed_i, seed_j, n_seeds, visited, watershed): """ Fully Numba-compiled BFS for inverse flow direction. The entire BFS runs in compiled code, avoiding repeated Python-Numba boundary crossings that occur in the per-cell approach. :param flow_dir: 2D array containing flow direction values for each cell. :param seed_i: 1D int32 array of row indices for seed cells. :param seed_j: 1D int32 array of column indices for seed cells. :param n_seeds: Number of seed cells. :param visited: 2D boolean array tracking which cells have been processed. :param watershed: 2D boolean array marking cells that belong to the watershed. :returns: A 2D boolean array where True values indicate cells that belong to the watershed defined by the flow direction. """ rows, cols = flow_dir.shape queue_i = np.empty(rows * cols, dtype=np.int32) queue_j = np.empty(rows * cols, dtype=np.int32) head = 0 tail = 0 for k in range(n_seeds): si, sj = seed_i[k], seed_j[k] if not visited[si, sj]: watershed[si, sj] = True visited[si, sj] = True queue_i[tail] = si queue_j[tail] = sj tail += 1 while head < tail: ci = queue_i[head] cj = queue_j[head] head += 1 for di in range(-1, 2): for dj in range(-1, 2): if di == 0 and dj == 0: continue ni = ci + di nj = cj + dj if ni < 0 or ni >= rows or nj < 0 or nj >= cols: continue if visited[ni, nj]: continue incr_i, incr_j = get_direction_mapping(flow_dir[ni, nj]) if incr_i == -di and incr_j == -dj: watershed[ni, nj] = True visited[ni, nj] = True queue_i[tail] = ni queue_j[tail] = nj tail += 1 return watershed
[docs] def inverse_flow_dir(flow_dir:np.ndarray, queue:deque, visited:np.ndarray, watershed:np.ndarray): """ Inverse flow direction to create watershed. :param flow_dir: 2D array containing flow direction values for each cell. :param queue: A deque containing tuples of (i, j) indices for cells to process. Initially contains the starting cell. :param visited: 2D boolean array tracking which cells have been processed. :param watershed: 2D boolean array marking cells that belong to the watershed. :returns: A 2D boolean array where True values indicate cells that belong to the watershed defined by the flow direction. """ if not queue: return watershed seed_i = np.array([int(p[0]) for p in queue], dtype=np.int32) seed_j = np.array([int(p[1]) for p in queue], dtype=np.int32) queue.clear() return _inverse_flow_dir_numba(flow_dir, seed_i, seed_j, len(seed_i), visited, watershed)
@njit(cache=True)
[docs] def _get_largest_neighbor_flowing_towards_me(flow_dir, accumul, start_i, start_j): """ Find the largest neighbor cell that flows towards the current cell. This function examines the 8 neighboring cells of a given position and determines if any of them have flow directions pointing towards the current cell. The neighbor with the highest accumulation value is selected. :param flow_dir: 2D array containing flow direction values for each cell. :param accumul: 2D array containing accumulation values for each cell. :param start_i: Row index of the current cell. :param start_j: Column index of the current cell. :param visited: 2D boolean array tracking which cells have been processed. :returns: A tuple (neighbor_i, neighbor_j) representing the neighbor that flows towards the current cell and has the highest accumulation value. Returns an empty tuple if no such neighbor exists or if the starting cell has already been visited. """ max_accumul = -1 max_i, max_j = -1, -1 for i in [-1, 0, 1]: for j in [-1, 0, 1]: if i == 0 and j == 0: continue neighbor_i = start_i + i neighbor_j = start_j + j neighbor_flow_direction = flow_dir[neighbor_i, neighbor_j] incr_i, incr_j = get_direction_mapping(neighbor_flow_direction) if incr_i == -i and incr_j == -j: if accumul[neighbor_i, neighbor_j] > max_accumul: max_accumul = accumul[neighbor_i, neighbor_j] max_i, max_j = neighbor_i, neighbor_j return (max_i, max_j)
[docs] class FlowAcc(Tiles): """ Class LIDAXE flow accumulation """ def __init__(self, filename = None, data_dir = None): super().__init__(filename, linked_data_dir= data_dir)
[docs] self._search_length = 500.
@property
[docs] def search_length(self): return self._search_length
@search_length.setter def search_length(self, value): self._search_length = value
[docs] def _extract_data(self, bounds_vec:vector, buffer:float = 0.0): """ Extract flow accumulation data for the given bounds vector. This method checks for the presence of the required tiles on disk, downloads them if necessary, creates a VRT file, and crops it to the bounding box defined by the bounds vector. The resulting cropped raster is returned as a WolfArray. :param bounds_vec: A vector object defining the bounding area for which flow accumulation data is to be extracted. :param buffer: A buffer distance in meters to expand the bounding box. This can be used to ensure that the extracted data includes a margin around the area of interest. :returns: A WolfArray containing the flow accumulation data for the specified bounds. """ if buffer > 0: bounds_vec = bounds_vec.buffer(buffer, inplace=False) dirdata = Path(self.linked_data_dir) # find intersection between bounds_vec and the tiles, if no intersection, return an empty array for tile_zone in self.myzones: for tile in tile_zone.myvectors: if tile.polygon.intersects(bounds_vec.polygon): # check if the file is present on disk, if not, dowload it tile_file = dirdata / f"{tile.myname}" if not tile_file.exists(): logging.warning(f"Tile {tile.myname} not found, downloading...") toys_lidaxe(LIDAXE_FLOW_ACC, tile.myname) if (dirdata / 'tmp.vrt').exists(): (dirdata / 'tmp.vrt').unlink() file_vrt = 'tmp.vrt' create_vrt(dirdata, fout=file_vrt) glob_vrt = list(dirdata.glob('*.vrt'))[0] if not glob_vrt.exists(): raise FileNotFoundError(f"VRT file {glob_vrt} does not exist") bounds_vec.find_minmax() bbox = bounds_vec.get_bounds_xx_yy() with TemporaryFile() as file: crop_vrt(str(glob_vrt), bbox, fout=str(file.name)) return WolfArray(Path(file.name).with_suffix('.tif'))
[docs] def get_accumulation_from_catchment(self, catchment:WolfArray) -> WolfArray: """ Get accumulation from a catchment. :param catchment: A WolfArray representing the catchment area for which flow accumulation is to be calculated. :returns: A WolfArray containing the flow accumulation values for the specified catchment area. """ bounds = catchment.get_bounds() vec = vector() vec.add_vertices_from_array(np.array([[bounds[0][0], bounds[1][0]], [bounds[0][0], bounds[1][1]], [bounds[0][1], bounds[1][1]], [bounds[0][1], bounds[1][0]]])) vec.force_to_close() flow_acc = self._extract_data(vec) flow_acc.array.mask[:,:] = catchment.array.mask flow_acc.nullvalue = 0 flow_acc.set_nullvalue_in_mask() flow_acc.count() return flow_acc
[docs] def get_accumulation_around(self, x:float, y:float) -> WolfArray: """ Get accumulation from flow accumulation. :param x: X coordinate of the point around which flow accumulation is to be calculated. :param y: Y coordinate of the point around which flow accumulation is to be calculated. :returns: A WolfArray containing the flow accumulation values for the area around the specified point. """ vec = vector() vec.add_vertices_from_array(np.array([[x-self._search_length, y-self._search_length], [x+self._search_length, y-self._search_length], [x+self._search_length, y+self._search_length], [x-self._search_length, y+self._search_length]])) vec.force_to_close() flow_acc = self._extract_data(vec) return flow_acc
[docs] class FlowDir(Tiles): """ Class to inverse flow direction to create watershed. """ def __init__(self, filename = None, data_dir = None): super().__init__(filename, linked_data_dir= data_dir)
[docs] self._search_length = 500.
""" The search length is the distance in meters that will be used to search for the flow direction tiles around the point of interest. If the catchment touches the border of the extracted flow direction array, it means that the catchment is not complete and we need to search for a larger area. The search length will be increased by this value until we find a sufficiently large area that contains the entire catchment.""" @property
[docs] def search_length(self): return self._search_length
@search_length.setter def search_length(self, value): self._search_length = value
[docs] def _extract_data(self, bounds_vec:vector, buffer:float = 0.0): """ Extract flow direction data for the given bounds vector. :param bounds_vec: A vector object defining the bounding area for which flow direction data is to be extracted. :param buffer: A buffer distance in meters to expand the bounding box. :returns: A WolfArray containing the flow direction data for the specified bounds. """ if buffer > 0: bounds_vec = bounds_vec.buffer(buffer, inplace=False) dirdata = Path(self.linked_data_dir) # find intersection between bounds_vec and the tiles, if no intersection, return an empty array for tile_zone in self.myzones: for tile in tile_zone.myvectors: if tile.polygon.intersects(bounds_vec.polygon): # check if the file is present on disk, if not, dowload it tile_file = dirdata / f"{tile.myname}" if not tile_file.exists(): logging.warning(f"Tile {tile.myname} not found, downloading...") toys_lidaxe(LIDAXE_FLOW_DIR, tile.myname) if (dirdata / 'tmp.vrt').exists(): (dirdata / 'tmp.vrt').unlink() file_vrt = 'tmp.vrt' create_vrt(dirdata, fout=file_vrt) glob_vrt = list(dirdata.glob('*.vrt'))[0] if not glob_vrt.exists(): raise FileNotFoundError(f"VRT file {glob_vrt} does not exist") bounds_vec.find_minmax() bbox = bounds_vec.get_bounds_xx_yy() with TemporaryFile() as file: crop_vrt(str(glob_vrt), bbox, fout=str(file.name)) return WolfArray(Path(file.name).with_suffix('.tif'))
[docs] def get_catchment(self, x:float, y:float) -> WolfArray: """ Get catchment from flow direction. :param x: X coordinate of the point for which catchment is to be calculated. :param y: Y coordinate of the point for which catchment is to be calculated. :returns: A WolfArray containing the catchment area for the specified point. """ vec = vector() vec.add_vertices_from_array(np.array([[x-self._search_length, y-self._search_length], [x+self._search_length, y-self._search_length], [x+self._search_length, y+self._search_length], [x-self._search_length, y+self._search_length]])) vec.force_to_close() flow_dir = self._extract_data(vec) catchment, flow_dir = self._inverse_flow_dir(flow_dir, x, y) return catchment, flow_dir
[docs] def _inverse_flow_dir(self, flow_dir:WolfArray, x:float, y:float) -> tuple[WolfArray, WolfArray]: """ Inverse flow direction to create watershed. Begin with a quite small area around the point of interest, extract the flow direction data for this area, and inverse the flow direction to create the watershed. If the catchment touches the border of the extracted flow direction array, it means that the catchment is not complete and we need to search for a larger area. The search length will be increased by the value of self._search_length until we find a sufficiently large area that contains the entire catchment. :param flow_dir: A WolfArray containing the flow direction data for the area around the point of interest. :param x: X coordinate of the point for which catchment is to be calculated. :param y: Y coordinate of the point for which catchment is to be calculated. :returns: A tuple of WolfArrays containing the catchment area and flow direction for the specified point. """ sufficiently_large = False visited = np.zeros(flow_dir.array.shape, dtype=bool) watershed = np.zeros(flow_dir.array.shape, dtype=bool) queue = deque() queue.append((flow_dir.get_ij_from_xy(x, y))) while not sufficiently_large: catchment = inverse_flow_dir(flow_dir.array.data, queue, visited, watershed) # test if catchment touches the border of the array, if yes, it means that the catchment is not complete and we need to search for a larger area if np.any(catchment[0, :]) or np.any(catchment[-1, :]) or np.any(catchment[:, 0]) or np.any(catchment[:, -1]): logging.warning("Catchment touches the border of the array, searching for a larger area...") to_left = np.any(catchment[0, :]) to_right = np.any(catchment[-1, :]) to_bottom = np.any(catchment[:, 0]) to_top = np.any(catchment[:, -1]) [[xmin, xmax], [ymin, ymax]] = flow_dir.get_bounds() xmin = xmin - self._search_length if to_left else xmin xmax = xmax + self._search_length if to_right else xmax ymin = ymin - self._search_length if to_bottom else ymin ymax = ymax + self._search_length if to_top else ymax vec = vector() vec.add_vertices_from_array(np.array([[xmin, ymin], [xmax, ymin], [xmax, ymax], [xmin, ymax]])) vec.force_to_close() new_flow_dir = self._extract_data(vec) old_origin_i, old_origin_j = new_flow_dir.get_ij_from_xy(flow_dir.origx, flow_dir.origy) # extend visited and watershed arrays to the new size new_visited = np.zeros(new_flow_dir.array.shape, dtype=bool) new_watershed = np.zeros(new_flow_dir.array.shape, dtype=bool) new_visited[old_origin_i:old_origin_i+visited.shape[0], old_origin_j:old_origin_j+visited.shape[1]] = visited new_watershed[old_origin_i:old_origin_i+watershed.shape[0], old_origin_j:old_origin_j+watershed.shape[1]] = watershed queue_left = list(np.argwhere(watershed[0, :] == True)) queue_right = list(np.argwhere(watershed[-1, :] == True)) queue_bottom = list(np.argwhere(watershed[:, 0] == True)) queue_top = list(np.argwhere(watershed[:, -1] == True)) queue = [(old_origin_i, j + old_origin_j) for j in queue_left] + \ [(old_origin_i + watershed.shape[0]-1, j + old_origin_j) for j in queue_right] + \ [(i + old_origin_i, old_origin_j) for i in queue_bottom] + \ [(i + old_origin_i, old_origin_j + watershed.shape[1]-1) for i in queue_top] queue = deque(queue) visited = new_visited watershed = new_watershed flow_dir = new_flow_dir for i,j in queue: visited[i, j] = False else: sufficiently_large = True catchment_wa = WolfArray(mold=flow_dir) catchment_wa.array.data[:,:] = catchment.astype(np.int8) catchment_wa.mask_data(0) return catchment_wa, flow_dir
[docs] class Lidaxe(): """ Class to handle LIDAXE flow direction and flow accumulation data. """ def __init__(self, flow_direction:Path | None = None, flow_accumulation:Path | None = None): """ Initialize the Lidaxe class. :param flow_direction: Path to the flow direction data. If None, it will be downloaded from the Lidaxe dataset. :param flow_accumulation: Path to the flow accumulation data. If None, it will be downloaded from the Lidaxe dataset. """ if flow_direction is None: # flow_direction = toys_lidaxe(FLOW_DIR, 'tmp.vrt') flow_direction = toys_lidaxe(LIDAXE_FLOW_DIR, LIDAXE_TILES_INDEX) if flow_accumulation is None: # flow_accumulation = toys_lidaxe(FLOW_ACC, 'tmp.vrt') flow_accumulation = toys_lidaxe(LIDAXE_FLOW_ACC, LIDAXE_TILES_INDEX) flow_direction = Path(flow_direction) flow_accumulation = Path(flow_accumulation) if not flow_direction.exists(): logging.error(f"Flow direction file {flow_direction} does not exist") return if not flow_accumulation.exists(): logging.error(f"Flow accumulation file {flow_accumulation} does not exist") return
[docs] self.flow_dir = FlowDir(filename=str(flow_direction), data_dir=str(flow_direction.parent))
[docs] self.flow_acc = FlowAcc(filename=str(flow_accumulation), data_dir=str(flow_accumulation.parent))
[docs] def snap_to_max(self, x:float, y:float, snap_length:float=10.0) -> tuple[float, float]: """ Snap the point to the nearest maximum flow accumulation value within a certain distance. :param x: X coordinate of the point to be snapped. :param y: Y coordinate of the point to be snapped. :param snap_length: The distance in meters within which to search for the maximum flow accumulation value to snap to. :returns: A tuple containing the new X and Y coordinates of the snapped point. """ vec = vector() vec.add_vertices_from_array(np.array([[x-snap_length, y-snap_length], [x+snap_length, y-snap_length], [x+snap_length, y+snap_length], [x-snap_length, y+snap_length]])) vec.force_to_close() acc = self.flow_acc._extract_data(vec) ij = np.argmax(acc.array.data) x,y = acc.get_xy_from_ij(*np.unravel_index(ij, acc.array.shape)) return x,y
[docs] def get_catchment(self, x:float, y:float, snap_to_max:bool=True, snap_length:float=10.0) -> tuple[WolfArray, WolfArray, WolfArray]: """ Get catchment from flow direction and flow accumulation. :param x: X coordinate of the point for which catchment is to be calculated. :param y: Y coordinate of the point for which catchment is to be calculated. :param snap_to_max: If True, the point will be snapped to the nearest maximum flow accumulation value within a certain distance defined by snap_length. :param snap_length: The distance in meters within which to search for the maximum flow accumulation value to snap to if snap_to_max is True. :returns: A tuple of WolfArrays containing the catchment area, flow accumulation, and flow direction for the specified point. """ if snap_to_max: x,y = self.snap_to_max(x, y, snap_length=snap_length) catchment, flow_dir = self.flow_dir.get_catchment(x, y) acc = self.flow_acc.get_accumulation_from_catchment(catchment) acc.mypal.distribute_values(0., 10_000.) acc.mypal.automatic = False return catchment, acc, flow_dir
[docs] def get_catchments(self, working_area: vector, buffer:float = 2.0) -> tuple[WolfArrayMB, WolfArrayMB, WolfArrayMB]: """ Get catchments from flow direction and flow accumulation for a working area. :param working_area: A vector object defining the working area for which catchments are to be calculated. :param buffer: A buffer distance in meters to expand the bounding box of the working polyline. This can be used to ensure that the extracted data includes a margin around the area of interest. :returns: a tuple of three WolfArrayMB, the first one containing the catchments, the second one containing the flow accumulations, and the third one containing the flow directions for each catchment. """ catchments = [] flowdir = self.flow_dir._extract_data(working_area, buffer=buffer) flowacc = self.flow_acc._extract_data(working_area, buffer=buffer) ij = flowacc.get_ij_inside_polygon(working_area,).tolist() ij_along = flowacc.get_ij_under_polyline(working_area,).tolist() # Create a set of unique (i, j) indices from both lists ij = list(set(tuple(idx) for idx in ij + ij_along)) # Create a sorted list of unique (i, j) indices based on flow accumulation values in descending order sorted_ij = sorted(ij, key=lambda idx: flowacc.array[idx[0], idx[1]], reverse=True) # Find catchments for each unique (i, j) index if (i, j) is not in the catchment of a previously processed index idx = 1 while sorted_ij: i, j = sorted_ij.pop(0) if flowacc.array.mask[i, j]: break x,y = flowacc.get_xy_from_ij(i, j) treated = False for catchment, acc, flow_dir in catchments: loci, locj = acc.get_ij_from_xy(x, y) if not acc.array.mask[loci, locj]: treated = True break if treated: continue catchments.append(self.get_catchment(x, y, snap_to_max= False)) catchments[-1][0].array[~catchments[-1][0].array.mask] = idx idx += 1 acc_MB = WolfArrayMB.make_multiblocks([acc for catchment, acc, flow_dir in catchments]) dir_MB = WolfArrayMB.make_multiblocks([flow_dir for catchment, acc, flow_dir in catchments]) catchment_MB = WolfArrayMB.make_multiblocks([catchment for catchment, acc, flow_dir in catchments]) catchment_MB.mypal.distribute_values(0., idx) catchment_MB.mypal.automatic = False acc_MB.mypal.distribute_values(0., 10_000.) acc_MB.mypal.automatic = False return catchment_MB, acc_MB, dir_MB
[docs] def get_catchments_on_border(self, working_polyline: vector, exclude_interior:bool = True, buffer:float = 2.0, ignore_smaller_than:float = 100.0) -> tuple[WolfArrayMB, WolfArrayMB, WolfArrayMB]: """ Get catchments from flow direction and flow accumulation for a working polyline. :param working_polyline: A vector object defining the working polyline for which catchments are to be calculated. :param exclude_interior: If True, exclude interior points from the catchment calculation. :param buffer: A buffer distance in meters to expand the bounding box of the working polyline. This can be used to ensure that the extracted data includes a margin around the area of interest. :param ignore_smaller_than: Minimum area in square meters for a catchment to be included in the results. Catchments smaller than this threshold will be ignored. :returns: a tuple of three WolfArrayMB, the first one containing the catchments, the second one containing the flow accumulations, and the third one containing the flow directions for each catchment. """ catchments = [] flowdir = self.flow_dir._extract_data(working_polyline, buffer=buffer) flowacc = self.flow_acc._extract_data(working_polyline, buffer=buffer) ij = flowacc.get_ij_under_polyline(working_polyline,).tolist() if exclude_interior: ij_interior = flowacc.get_ij_inside_polygon(working_polyline,).tolist() to_remove = [] for idx, (i,j) in enumerate(ij[-1::-1]): direction = flowdir.array[i, j] i_down, j_down = get_direction_mapping(direction) if [i + i_down, j + j_down] not in ij_interior: to_remove.append(len(ij) - 1 - idx) for idx in to_remove: ij.pop(idx) # Create a sorted list of unique (i, j) indices based on flow accumulation values in descending order sorted_ij = sorted(ij, key=lambda idx: flowacc.array[idx[0], idx[1]], reverse=True) # Find catchments for each unique (i, j) index if (i, j) is not in the catchment of a previously processed index idx = 1 while sorted_ij: i, j = sorted_ij.pop(0) if flowacc.array.mask[i, j]: break x,y = flowacc.get_xy_from_ij(i, j) treated = False for catchment, acc, flow_dir in catchments: loci, locj = acc.get_ij_from_xy(x, y) if not acc.array.mask[loci, locj]: treated = True break if treated: continue catch_loc = self.get_catchment(x, y, snap_to_max= False) if catch_loc[0].nbnotnull > int(ignore_smaller_than / (flowacc.dx * flowacc.dy)): catchments.append(catch_loc) catchments[-1][0].array[~catchments[-1][0].array.mask] = idx else: logging.warning(f"Catchment for point ({x}, {y}) is smaller than the threshold, skipping...") idx += 1 acc_MB = WolfArrayMB.make_multiblocks([acc for catchment, acc, flow_dir in catchments]) dir_MB = WolfArrayMB.make_multiblocks([flow_dir for catchment, acc, flow_dir in catchments]) catchment_MB = WolfArrayMB.make_multiblocks([catchment for catchment, acc, flow_dir in catchments]) catchment_MB.mypal.distribute_values(0., idx) catchment_MB.mypal.automatic = False acc_MB.mypal.distribute_values(0., 10_000.) acc_MB.mypal.automatic = False return catchment_MB, acc_MB, dir_MB
[docs] def get_longest_flow_path(self, x:float, y:float, snap_to_max:bool=True, snap_length:float=10.0, prepared_catchment:list[WolfArray] | tuple[WolfArray | None] = None) -> vector: """ Find the longest flow path in the catchment. :param x: X coordinate of the point for which the longest flow path is to be calculated. :param y: Y coordinate of the point for which the longest flow path is to be calculated. :param snap_to_max: If True, the point will be snapped to the nearest maximum flow accumulation value within a certain distance defined by snap_length before calculating the longest flow path. :param snap_length: The distance in meters within which to search for the maximum flow accumulation value to snap to if snap_to_max is True. :param prepared_catchment: A list or tuple containing pre-extracted catchment, accumulation, and flow direction arrays. If provided, these will be used instead of extracting them again from the flow direction and flow accumulation data. The expected format is (catchment: WolfArray, accumulation: WolfArray, flow_direction: WolfArray). If any of these is None, it will be extracted as needed. :returns: A vector containing the vertices of the longest flow path in the catchment. """ if prepared_catchment is not None: assert len(prepared_catchment) == 3, "prepared_catchment must be a list or tuple of length 3 containing (catchment, accumulation, flow_direction)" assert all(isinstance(arr, WolfArray) or arr is None for arr in prepared_catchment), "All elements of prepared_catchment must be of type WolfArray or None" catchment, accumulation, flow_dir = prepared_catchment else: catchment, accumulation, flow_dir = self.get_catchment(x, y, snap_to_max=snap_to_max, snap_length=snap_length) outflow = np.argmax(accumulation.array.data) i, j = np.unravel_index(outflow, accumulation.array.shape) path = vector() while i != -1 and j != -1: x, y = accumulation.get_xy_from_ij(i, j) path.add_vertex(wv(x, y)) i, j = _get_largest_neighbor_flowing_towards_me(flow_dir.array.data, accumulation.array.data, i, j) path.update_lengths() return path
[docs] def get_accumulation_array(self, vect:vector) -> WolfArray: """ Get accumulation array from flow accumulation. """ flow_acc = self.flow_acc._extract_data(vect) return flow_acc
[docs] def get_direction_array(self, vect:vector) -> WolfArray: """ Get direction array from flow direction. """ flow_dir = self.flow_dir._extract_data(vect) return flow_dir