Source code for wolfhece.multicriteria.salmon

"""
This module contains the objects needed to perform
a multi-criteria analysis for the seaward migration
of Atlantic salmons on spatially distributed hydrodynamic data.

The module tests are located in the folder:
tests/multicriteria/test_analysis.py in the HECEPython repository.

Authors: Utashi Ciraane,
HECE, University of Liège, Belgium.
"""
# **************************************************************
# Libraries used throughout the module
# **************************************************************

# To stay inside the HECEPython repository
# ----------------------------------------

# Standard libraries
# -------------------
import logging
import numpy as np
import os

# Cherry-picked objects from standard libraries
# ---------------------------------------------
from enum import Enum
from pathlib import Path
from tqdm import tqdm
from typing import Literal, Union

# Cherry-picked objects from wolfhece
# ------------------------------------
from wolfhece.wolf_array import WolfArray, header_wolf, wolfpalette
from wolfhece.multicriteria.analysis import MulticriteriAnalysis, Input, Operator

[docs] VELOCITY = "velocity"
[docs] DEPTH = "depth"
[docs] TKE = "tke"
[docs] SVG_X = "svg_x"
[docs] SVG_Y = "svg_y"
[docs] SVG_NORM = "svg_norm"
[docs] class Smolt: """ This class performs a multi-criteria analysis to assess the suitability of a river site to the downstream migration of Atlantic salmon smolt. """ def __init__(self, velocity: WolfArray, depth: WolfArray, tke: WolfArray, svg_x : WolfArray = None, svg_y : WolfArray = None, svg_norm : WolfArray = None, threshold_velocity: float = None, threshold_depth: float = None, threshold_tke: float = None, threshold_svg: float = None, ) -> None : THRESHOLD_VELOCITY = 0.15 # Mean and max body lengths from Renardy et al (0.161 & 0.185 m) # m/s @ Ciraane et al., 2025 (abstract), below disorient smolts THRESHOLD_DEPTH = 0.15 # Common sense (1 Body length of a smolt ~ 15 cm), can't swim where there is almot no water. THRESHOLD_TKE = 0.03 # m2/s2 @ Silva et al., 2020 (abstract), befit swimming performances below this value THRESHOLD_SVG = 1 # 1 cm/s/cm @ Enders et al., 2012 (abstract), avoid high spatial velocity gradients
[docs] self._velocity = None
[docs] self._depth = None
[docs] self._tke = None
[docs] self._svg_x = None
[docs] self._svg_y = None
[docs] self._svg_norm = None
[docs] self._threshold_velocity = None
[docs] self._threshold_depth = None
[docs] self._threshold_tke = None
[docs] self._threshold_svg = None
# Set initial values self.velocity = velocity self.depth = depth self.tke = tke self.svg_x = svg_x self.svg_y = svg_y self.svg_norm = svg_norm self.threshold_velocity = threshold_velocity if threshold_velocity is not None else THRESHOLD_VELOCITY self.threshold_depth = threshold_depth if threshold_depth is not None else THRESHOLD_DEPTH self.threshold_tke = threshold_tke if threshold_tke is not None else THRESHOLD_TKE self.threshold_svg = threshold_svg if threshold_svg is not None else THRESHOLD_SVG
[docs] self.inputs = [self.input_velocity, self.input_depth, self.input_tke, # self.input_svg_x, # self.input_svg_y, self.input_svg_norm ]
# Prepare
[docs] self._analysis = None
self.analyse_site() # transformations as inputs @property
[docs] def threshold_velocity(self) -> float: return self._threshold_velocity
@threshold_velocity.setter def threshold_velocity(self, value: float) -> None: self._threshold_velocity = value @property
[docs] def threshold_depth(self) -> float: return self._threshold_depth
@threshold_depth.setter def threshold_depth(self, value: float) -> None: self._threshold_depth = value @property
[docs] def threshold_tke(self) -> float: return self._threshold_tke
@threshold_tke.setter def threshold_tke(self, value: float) -> None: self._threshold_tke = value @property
[docs] def threshold_svg(self) -> float: return self._threshold_svg
@threshold_svg.setter def threshold_svg(self, value: float) -> None: self._threshold_svg = value @property
[docs] def analysis(self) -> MulticriteriAnalysis: if self._analysis is None: self.analyse_site() return self._analysis
@ property
[docs] def velocity(self) -> WolfArray: return self._velocity
@ velocity.setter def velocity(self, value: WolfArray) -> None: self._velocity = value @ property
[docs] def depth(self) -> WolfArray: return self._depth
@ depth.setter def depth(self, value: WolfArray) -> None: self._depth = value @ property
[docs] def tke(self) -> WolfArray: return self._tke
@ tke.setter def tke(self, value: WolfArray) -> None: self._tke = value @ property
[docs] def svg_x(self) -> WolfArray: if self._svg_x is None: self.compute_spatial_velocity_gradient() return self._svg_x
@ svg_x.setter def svg_x(self, value: WolfArray) -> None: self._svg_x = value @ property
[docs] def svg_y(self) -> WolfArray: if self._svg_y is None: self.compute_spatial_velocity_gradient() return self._svg_y
@ svg_y.setter def svg_y(self, value: WolfArray) -> None: self._svg_y = value @ property
[docs] def svg_norm(self) -> WolfArray: if self._svg_norm is None: if self._svg_x is None or self._svg_y is None: self.compute_spatial_velocity_gradient() svg_norm = WolfArray(mold=self.velocity) svg_norm.array[:] = np.sqrt((np.square(self._svg_x.array))+(np.square(self._svg_y.array))) self._svg_norm = svg_norm return self._svg_norm
@ svg_norm.setter def svg_norm(self, value: WolfArray) -> None: self._svg_norm = value @ property
[docs] def input_velocity(self) -> Input: return Input(name=VELOCITY, array=self.velocity, condition = Operator.SUPERIOR_OR_EQUAL.value, threshold= self.threshold_velocity) # meters per second
@ property
[docs] def input_depth(self) -> Input: return Input(name=DEPTH, array=self.depth, condition = Operator.SUPERIOR_OR_EQUAL.value, threshold= self.threshold_depth) # meters
@ property
[docs] def input_tke(self) -> Input: return Input(name=TKE, array=self.tke, condition = Operator.INFERIOR.value, threshold= self.threshold_tke) # m2/s2
@ property
[docs] def input_svg_x(self) -> Input: return Input(name=SVG_X, array=self.svg_x, condition = Operator.BETWEEN_STRICT.value, threshold= (-self.threshold_svg,self.threshold_svg)) # cm/s/cm
@ property
[docs] def input_svg_y(self) -> Input: return Input(name=SVG_Y, array=self.svg_y, condition = Operator.BETWEEN_STRICT.value, threshold= (-self.threshold_svg,self.threshold_svg)) # cm/s/cm
@ property
[docs] def input_svg_norm(self) -> Input: return Input(name=SVG_NORM, array=self.svg_norm, condition = Operator.INFERIOR.value, threshold= self.threshold_svg) # cm/s/cm
@property
[docs] def result(self) -> WolfArray: """Return the result of the multi-criteria analysis as a WolfArray.""" return self.analysis.results
[docs] def compute_spatial_velocity_gradient(self) -> None: """Compute the spatial velocity gradient (SVG) in both x and y directions.""" svg_x_values, svg_y_values = np.gradient(self.velocity.array, self.velocity.dx, self.velocity.dy) svg_x = WolfArray(mold = self.velocity) svg_y = WolfArray(mold = self.velocity) svg_x.array[:] = svg_x_values svg_y.array[:] = svg_y_values self._svg_x = svg_x self._svg_y = svg_y
[docs] def analyse_site(self, method: Literal["percentage", "average", "sum"] = "sum", write_to: str|Path = None) -> WolfArray: """Perform the multi-criteria analysis to assess the suitability of the river site for smolt migration.""" assert self.velocity is not None, "Velocity data is not set." assert self.depth is not None, "Depth data is not set." assert self.tke is not None, "TKE data is not set." assert self.svg_x is not None, "SVG X data is not set." assert self.svg_y is not None, "SVG Y data is not set." if isinstance(write_to, str): write_to = Path(write_to) self._analysis = MulticriteriAnalysis(inputs=self.inputs, method=method, dtype = self.velocity.dtype, mold = self.velocity, write_to=write_to)
[docs] def get_score(self, variable:Literal["Velocity", "Depth", "TKE", "SVG_X", "SVG_Y"]) -> np.ma.MaskedArray: """ Return the score for a given variable as a numpy array. :param variable: The variable for which to get the score. Must be one of "Velocity", "Depth", "TKE", "SVG_X", "SVG_Y". :return: The score as a numpy array. """ return self.analysis.scores.scores[variable].score
[docs] def get_score_as_WolfArray(self, variable:Literal["Velocity", "Depth", "TKE", "SVG_X", "SVG_Y"]) -> WolfArray: """ Return the score for a given variable as a WolfArray. The array is georeferenced and masked, with a discrete color palette (0: red, 1: cyan) to easily visualize the suitability of the site for smolt migration. :param variable: The variable for which to get the score. Must be one of "Velocity", "Depth", "TKE", "SVG_X", "SVG_Y". :return: The score as a WolfArray. """ score = self.get_score(variable) mask = score.mask score_array = WolfArray(mold=self.analysis.mold) score_array.nullvalue = -99999 score_array.set_nullvalue_in_mask() score_array.masknull = True score_array.array[mask == False] = score[mask == False] palette = wolfpalette() palette.set_discrete_values_colors(values=[0, 1, 1.000001], colors= ["#FF0000","#00D9FF","#00D9FF"]) score_array.mypal = palette return score_array
[docs] def get_variable(self, variable:Literal["velocity", "depth", "tke", "svg_x", "svg_y", "svg_norm"]) -> WolfArray: """Return the hydrodynamic variable used in the analysis as a WolfArray. :param variable: The variable to return. Must be one of "velocity", "depth", "tke", "svg_x", "svg_y", "svg_norm". :return: The variable as a WolfArray. """ if variable.lower() == VELOCITY.lower(): return self.velocity elif variable.lower() == DEPTH.lower(): return self.depth elif variable.lower() == TKE.lower(): return self.tke elif variable.lower() == SVG_X.lower(): return self.svg_x elif variable.lower() == SVG_Y.lower(): return self.svg_y elif variable.lower() == SVG_NORM.lower(): return self.svg_norm else: raise ValueError(f"Variable '{variable}' is not recognized.\ Valid variables are: {VELOCITY}, {DEPTH}, {TKE}, {SVG_X}, {SVG_Y}, {SVG_NORM}.")