Source code for wolfhece.wolfresults_2D

"""
Author: HECE - University of Liege, Pierre Archambeau
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

import sys
import wx
from os.path import dirname, exists, join, splitext
from math import floor
from pathlib import Path

import numpy.ma as ma
import numpy as np
import matplotlib.path as mpltPath
import matplotlib.pyplot as plt
from enum import Enum
from typing import Literal, Union
import logging
from tqdm import tqdm
from datetime import datetime as dt, timedelta

try:
    from OpenGL.GL import *
except:
[docs] msg=_('Error importing OpenGL library')
msg+=_(' Python version : ' + sys.version) msg+=_(' Please check your version of opengl32.dll -- conflict may exist between different files present on your desktop') raise Exception(msg) from .drawing_obj import Element_To_Draw from .PyPalette import wolfpalette from .PyTranslate import _ from .gpuview import GRID_N, Rectangle, VectorField from .pyshields import get_d_cr, get_d_cr_susp, izbach_d_cr, get_Shields_2D_Manning, get_friction_slope_2D_Manning, get_shear_velocity_2D_Manning from .pyviews import WolfViews from .mesh2d.wolf2dprev import prev_parameters_simul, blocks_file from .GraphNotebook import PlotPanel from .CpGrid import CpGrid try: from .libs import wolfpy except Exception as ex: # This convoluted error handling is here to catch an issue # which was difficult to track down: wolfpy was there # but its DLL were not available. from importlib.util import find_spec
[docs] s = find_spec('wolfhece.libs.wolfpy')
# Not too sure about this find_spec. If the root # directory is not the good one, the import search may # end up in the site packages, loading the wrong wolfpy. base = Path(__file__).parent.parts package = Path(s.origin).parent.parts is_submodule = (len(base) <= len(package)) and all(i==j for i, j in zip(base, package)) if is_submodule: msg = _("wolfpy was found but we were not able to load it. It may be an issue with its DLL dependencies") msg += _("The actual error was: {}").format(str(ex)) raise Exception(msg) from .wolf_array import WolfArray, getkeyblock, header_wolf, WolfArrayMB, WolfArrayMNAP, header_wolf, SelectionData, SelectionDataMB, VERSION_RGB from .mesh2d import wolf2dprev from .PyVertexvectors import vector, zone, Zones
[docs] def outside_domain(val): """ Test if a value is outside the calculated domain """ return val[0][0] is np.ma.masked or val[1][0] =='-'
[docs] def q_splitting(q_left, q_right): """ Splitting of the normal flow between two nodes """ prod_q = q_left * q_right sum_q = q_left + q_right if prod_q > 0.: if q_left > 0.: return q_left else: return q_right elif prod_q < 0.: if sum_q > 0.: return q_left elif sum_q < 0.: return q_right else: return 0. else: if q_left<0.: return 0. elif q_right<0.: return 0. else: return sum_q / 2.
[docs] def u_splitting(q_left, q_right, h_left, h_right): """ Splitting of the normal flow velocity between two nodes """ prod_q = q_left * q_right sum_q = q_left + q_right if prod_q > 0.: if q_left > 0.: return q_left/h_left else: return q_right/h_right elif prod_q < 0.: if sum_q > 0.: return q_left/h_left elif sum_q < 0.: return q_right/h_right else: return 0. else: if q_left<0.: return 0. elif q_right<0.: return 0. else: return (q_left/h_left + q_right/h_right) / 2.
[docs] class Props_Res_2D(wx.Frame): """ Fenêtre de propriétésd'un Wolfresults_2D """ def __init__(self, parent:"Wolfresults_2D", mapviewer=None): self._parent:Wolfresults_2D self._parent = parent from .PyDraw import WolfMapViewer self._mapviewer:WolfMapViewer self._mapviewer = mapviewer self.wx_exists = wx.App.Get() is not None self.active_vector:vector = None self.active_zone:zone = None self.active_array:WolfArray = self._parent self.myzones = Zones(parent=self) self.myzonetmp = zone(name='tmp') self.vectmp = vector(name='tmp') self.fnsave = '' self.myzonetmp.add_vector(self.vectmp) self.myzones.add_zone(self.myzonetmp) if self.wx_exists: self.set_GUI()
[docs] def get_mapviewer(self): """ Retourne une instance WolfMapViewer """ return self._mapviewer
[docs] def get_linked_arrays(self): """ Pour compatibilité avec la gestion de vecteur et WolfMapViewer """ return {self._parent.idx: self._parent}
[docs] def set_GUI(self): """Set the wx GUI""" super(Props_Res_2D, self).__init__(None, title=_('Properties'), size=(600, 700), style=wx.DEFAULT_FRAME_STYLE | wx.TAB_TRAVERSAL) # GUI self.Bind(wx.EVT_CLOSE, self.onclose) self.Bind(wx.EVT_SHOW, self.onshow) self.SetSizeHints(wx.DefaultSize, wx.DefaultSize) # GUI Notebook self._notebook = wx.Notebook(self, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize) # panel Selection # self.selection = wx.Panel(self._notebook, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL) # self._notebook.AddPage(self.selection, _("Selection"), True) # panel Operations # self.operation = wx.Panel(self._notebook, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL) # self._notebook.AddPage(self.operation, _("Operators"), False) # panel Mask # self.mask = wx.Panel(self._notebook, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL) # self._notebook.AddPage(self.mask, _("Mask"), False) # panel Interpolation # self.Interpolation = wx.Panel(self._notebook, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL) # self._notebook.AddPage(self.Interpolation, _("Interpolation"), False) # panel Tools/Misc self._panel_colormap = PlotPanel(self._notebook, wx.ID_ANY, toolbar=False) self._notebook.AddPage(self._panel_colormap, _("Palette"), False) self._tools = wx.Panel(self._notebook, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL) self._notebook.AddPage(self._tools, _("Miscellaneous"), False) # panel PALETTE de couleurs self._cm_grid = CpGrid(self._panel_colormap, wx.ID_ANY, style=wx.WANTS_CHARS | wx.TE_CENTER) self._cm_apply = wx.Button(self._panel_colormap, wx.ID_ANY, _("Apply"), wx.DefaultPosition, wx.DefaultSize, 0) self._cm_apply.SetToolTip(_('Apply changes in memory')) self._cm_grid.CreateGrid(16, 4) self._cm_auto = wx.CheckBox(self._panel_colormap, wx.ID_ANY, _("Automatic"), wx.DefaultPosition, wx.DefaultSize, style=wx.CHK_CHECKED) self._cm_auto.SetToolTip(_('Activating/Deactivating automatic colormap values distribution')) self._cm_uniform_or_lerp = wx.CheckBox(self._panel_colormap, wx.ID_ANY, _("Uniform in parts"), wx.DefaultPosition, wx.DefaultSize, style=wx.CHK_UNCHECKED) self._cm_uniform_or_lerp.SetToolTip(_('Activating/Deactivating linear interpolation')) self._cm_alpha = wx.CheckBox(self._panel_colormap, wx.ID_ANY, _("Opacity"), wx.DefaultPosition, wx.DefaultSize, style=wx.CHK_CHECKED) self._cm_alpha.SetToolTip(_('Activating/Deactivating transparency of the array')) self._cm_hillshade = wx.CheckBox(self._panel_colormap, wx.ID_ANY, _("Hillshade"), wx.DefaultPosition, wx.DefaultSize, style=wx.CHK_CHECKED) self._cm_hillshade.SetToolTip(_('Activating/Deactivating hillshade on colors and create if necessary a gray map')) self._cm_slider_alpha = wx.Slider(self._panel_colormap, wx.ID_ANY, 100, 0, 100, wx.DefaultPosition, wx.DefaultSize, wx.SL_HORIZONTAL, name='palslider') self._cm_slider_alpha.SetToolTip(_('Global opacity (transparent --> opaque)')) self._cm_slider_hillshade = wx.Slider(self._panel_colormap, wx.ID_ANY, 100, 0, 100, wx.DefaultPosition, wx.DefaultSize, wx.SL_HORIZONTAL, name='palalphaslider') self._cm_slider_hillshade.SetToolTip(_('Hillshade transparency (transparent-->opaque)')) self._cm_slider_azimuth = wx.Slider(self._panel_colormap, wx.ID_ANY, 315, 0, 360, wx.DefaultPosition, wx.DefaultSize, wx.SL_HORIZONTAL, name='palazimuthslider') self._cm_slider_azimuth.SetToolTip(_('Hillshade azimuth (0-->360)')) self._cm_slider_altitude = wx.Slider(self._panel_colormap, wx.ID_ANY, 0, 0, 90, wx.DefaultPosition, wx.DefaultSize, wx.SL_HORIZONTAL, name='palaltitudeslider') self._cm_slider_altitude.SetToolTip(_('Hillshade altitude (0-->90)')) self._cm_save = wx.Button(self._panel_colormap, wx.ID_ANY, _("Save to file"), wx.DefaultPosition, wx.DefaultSize, 0) self._cm_save.SetToolTip(_('Save colormap on .pal file')) self._cm_load = wx.Button(self._panel_colormap, wx.ID_ANY, _("Load from file"), wx.DefaultPosition, wx.DefaultSize, 0) self._cm_load.SetToolTip(_('Load colormap from .pal file')) self._cm_default_pal = wx.Button(self._panel_colormap, wx.ID_ANY, _("Load precomputed"), wx.DefaultPosition, wx.DefaultSize, 0) self._cm_default_pal.SetToolTip(_('Load a default colormap available in the software')) self._cm_toimage = wx.Button(self._panel_colormap, wx.ID_ANY, _("Create image"), wx.DefaultPosition, wx.DefaultSize, 0) self._cm_toimage.SetToolTip(_('Generate colormap image (horizontal, vertical or both) and save to disk')) self._cm_distribute = wx.Button(self._panel_colormap, wx.ID_ANY, _("Distribute"), wx.DefaultPosition, wx.DefaultSize, 0) self._cm_distribute.SetToolTip(_('Set colormap values based on minimum+maximum or minimum+step')) if self._parent.mypal.automatic: self._cm_auto.SetValue(1) else: self._cm_auto.SetValue(0) if self._parent.mypal.interval_cst: self._cm_uniform_or_lerp.SetValue(1) else: self._cm_uniform_or_lerp.SetValue(0) self._cm_alpha.SetValue(1) self._cm_choose_color = wx.Button(self._panel_colormap, wx.ID_ANY, _("Choose color for current value"), wx.DefaultPosition, wx.DefaultSize) self._cm_choose_color.SetToolTip(_('Color dialog box for the current selected value in the grid')) self._default16 = wx.Button(self._panel_colormap, wx.ID_ANY, _("Default 16 colors"), wx.DefaultPosition, wx.DefaultSize, 0) self._defaultblue = wx.Button(self._panel_colormap, wx.ID_ANY, _("Default blue colors"), wx.DefaultPosition, wx.DefaultSize, 0) self._default_grey = wx.Button(self._panel_colormap, wx.ID_ANY, _("Default grey colors"), wx.DefaultPosition, wx.DefaultSize, 0) self._default16.Bind(wx.EVT_BUTTON, self.OnDefault_cmap) self._defaultblue.Bind(wx.EVT_BUTTON, self.OnDefault_cmap) self._default_grey.Bind(wx.EVT_BUTTON, self.OnDefault_cmap) sizer_default_cmap = wx.BoxSizer(wx.HORIZONTAL) sizer_default_cmap.Add(self._default16, 1, wx.EXPAND) sizer_default_cmap.Add(self._defaultblue, 1, wx.EXPAND) sizer_default_cmap.Add(self._default_grey, 1, wx.EXPAND) # Sizer Colormap # -------------- self._panel_colormap.sizerfig.Add(self._cm_grid, 1, wx.EXPAND) self._panel_colormap.sizer.Add(self._cm_auto, 1, wx.EXPAND) self._panel_colormap.sizer.Add(self._cm_uniform_or_lerp, 1, wx.EXPAND) self._panel_colormap.sizer.Add(self._cm_alpha, 1, wx.EXPAND) self._panel_colormap.sizer.Add(self._cm_slider_alpha, 1, wx.EXPAND) # self._panel_colormap.sizer.Add(self._cm_hillshade, 1, wx.EXPAND) # self._panel_colormap.sizer.Add(self._cm_slider_hillshade, 1, wx.EXPAND) # self._panel_colormap.sizer.Add(self._cm_slider_azimuth, 1, wx.EXPAND) # self._panel_colormap.sizer.Add(self._cm_slider_altitude, 1, wx.EXPAND) self._panel_colormap.sizer.Add(self._cm_choose_color, 1, wx.EXPAND) self._panel_colormap.sizer.Add(self._cm_apply, 1, wx.EXPAND) sizer_loadpal = wx.BoxSizer(wx.HORIZONTAL) self._panel_colormap.sizer.Add(sizer_loadpal, 1, wx.EXPAND) sizer_loadpal.Add(self._cm_load, 1, wx.EXPAND) sizer_loadpal.Add(self._cm_default_pal, 1, wx.EXPAND) self._panel_colormap.sizer.Add(self._cm_save, 1, wx.EXPAND) self._panel_colormap.sizer.Add(self._cm_toimage, 1, wx.EXPAND) self._panel_colormap.sizer.Add(self._cm_distribute, 1 , wx.EXPAND) self._panel_colormap.sizer.Add(sizer_default_cmap, 1, wx.EXPAND) # HISTOGRAMMES self._panel_histo = PlotPanel(self._notebook, wx.ID_ANY, toolbar=True) self._histo_update = wx.Button(self._panel_histo, wx.ID_ANY, _("All data..."), wx.DefaultPosition, wx.DefaultSize, 0) self._histo_updatezoom = wx.Button(self._panel_histo, wx.ID_ANY, _("On zoom..."), wx.DefaultPosition, wx.DefaultSize, 0) self._histo_updateerase = wx.Button(self._panel_histo, wx.ID_ANY, _("Erase"), wx.DefaultPosition, wx.DefaultSize, 0) # Sizer Histogram # --------------- self._panel_histo.sizer.Add(self._histo_update, 0, wx.EXPAND) self._panel_histo.sizer.Add(self._histo_updatezoom, 0, wx.EXPAND) self._panel_histo.sizer.Add(self._histo_updateerase, 0, wx.EXPAND) self._notebook.AddPage(self._panel_histo, _("Histogram"), False) # # LINKS # self.links = wx.Panel(self._notebook, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL) # self._notebook.AddPage(self.links, _("Links"), False) # # Interpolation # gSizer1 = wx.GridSizer(0, 2, 0, 0) # self.interp2D = wx.Button(self.Interpolation, wx.ID_ANY, _("2D Interpolation on selection"), wx.DefaultPosition, # wx.DefaultSize, 0) # self.interp2D.SetToolTip(_('Spatial interpolation based on nodes stored in named groups. \n The interpolation apply only on the current selection.')) # gSizer1.Add(self.interp2D, 0, wx.EXPAND) # self.interp2D.Bind(wx.EVT_BUTTON, self.interpolation2D) # self.m_button7 = wx.Button(self.Interpolation, wx.ID_ANY, _("Stage/Volume/Surface evaluation"), wx.DefaultPosition, # wx.DefaultSize, 0) # self.m_button7.SetToolTip(_('Evaluate stage-volume-surface relationship. \n Results : plots and arrays saved on disk')) # gSizer1.Add(self.m_button7, 0, wx.EXPAND) # self.m_button7.Bind(wx.EVT_BUTTON, self.volumesurface) # self.m_button8 = wx.Button(self.Interpolation, wx.ID_ANY, _("Interpolation on active zone \n polygons"), # wx.DefaultPosition, wx.DefaultSize, 0) # self.m_button8.SetToolTip(_('Spatial interpolation based on all polygons in active zone')) # gSizer1.Add(self.m_button8, 0, wx.EXPAND) # self.m_button8.Bind(wx.EVT_BUTTON, self.interp2Dpolygons) # self.m_button9 = wx.Button(self.Interpolation, wx.ID_ANY, _("Interpolation on active zone \n 3D polylines"), # wx.DefaultPosition, wx.DefaultSize, 0) # self.m_button9.SetToolTip(_('Spatial interpolation based on all polylines in active zone')) # gSizer1.Add(self.m_button9, 0, wx.EXPAND) # self.m_button9.Bind(wx.EVT_BUTTON, self.interp2Dpolylines) # self.m_button10 = wx.Button(self.Interpolation, wx.ID_ANY, _("Interpolation on active vector \n polygon"), # wx.DefaultPosition, wx.DefaultSize, 0) # self.m_button10.SetToolTip(_('Spatial interpolation based on active polygon')) # gSizer1.Add(self.m_button10, 0, wx.EXPAND) # self.m_button10.Bind(wx.EVT_BUTTON, self.interp2Dpolygon) # self.m_button11 = wx.Button(self.Interpolation, wx.ID_ANY, _("Interpolation on active vector \n 3D polyline"), # wx.DefaultPosition, wx.DefaultSize, 0) # self.m_button11.SetToolTip(_('Spatial interpolation based on active polyline')) # gSizer1.Add(self.m_button11, 0, wx.EXPAND) # self.m_button11.Bind(wx.EVT_BUTTON, self.interp2Dpolyline) # self.Interpolation.SetSizer(gSizer1) # self.Interpolation.Layout() # gSizer1.Fit(self.Interpolation) # Tools # ----- Toolssizer = wx.BoxSizer(wx.VERTICAL) hbox = wx.BoxSizer(wx.HORIZONTAL) hbox2 = wx.BoxSizer(wx.HORIZONTAL) self._label_epsilon = wx.StaticText(self._tools,label=_('Epsilon')) self._txt_epsilon = wx.TextCtrl(self._tools,value=str(self._parent.epsilon), style=wx.TE_CENTER) self._txt_epsilon.SetToolTip(_('Epsilon value under which the value is not plotted')) self._label_nullvalue = wx.StaticText(self._tools,label=_('Null value')) self._txt_nullvalue = wx.TextCtrl(self._tools,value=str(self._parent.nullvalue), style=wx.TE_CENTER) self._txt_nullvalue.SetToolTip(_('Null value -- All node with this value will be masked (not plotted)')) hbox.Add(self._label_epsilon, 0, wx.EXPAND|wx.ALL) hbox.Add(self._txt_epsilon, 1, wx.EXPAND|wx.ALL) hbox2.Add(self._label_nullvalue, 0, wx.EXPAND|wx.ALL) hbox2.Add(self._txt_nullvalue, 1, wx.EXPAND|wx.ALL) self._Apply_Tools = wx.Button(self._tools, wx.ID_ANY, _("Apply modifications"), wx.DefaultPosition, wx.DefaultSize, 0) self._Apply_Tools.SetToolTip(_("Save modifications into memmory/object")) Toolssizer.Add(hbox, 0, wx.EXPAND) Toolssizer.Add(hbox2, 0, wx.EXPAND) Toolssizer.Add(self._Apply_Tools, 1, wx.EXPAND) self._tools.SetSizer(Toolssizer) self._tools.Layout() Toolssizer.Fit(self._tools) # Selection # --------- # bSizer15 = wx.BoxSizer(wx.VERTICAL) # bSizer21 = wx.BoxSizer(wx.HORIZONTAL) # bSizer16 = wx.BoxSizer(wx.VERTICAL) # selectmethodChoices = [_("by clicks"), _("inside active vector"), _("inside active zone"), # _("inside temporary vector"), _("under active vector"), _("under active zone"), # _("under temporary vector")] # self.selectmethod = wx.RadioBox(self.selection, wx.ID_ANY, _("How to select nodes?"), wx.DefaultPosition, # wx.DefaultSize, selectmethodChoices, 1, wx.RA_SPECIFY_COLS) # self.selectmethod.SetSelection(0) # self.selectmethod.SetToolTip(_("Selection mode : \n - one by one (keyboard shortcut N) \n- inside the currently activated polygon (keyboard shortcut V) \n- inside the currently activated zone (multipolygons) \n- inside a temporary polygon (keyboard shortcut B) \n- under the currently activated polyline \n- under the currently activated zone (multipolylines) \n- under a temporary polyline")) # bSizer16.Add(self.selectmethod, 0, wx.ALL, 5) # self.selectrestricttomask = wx.CheckBox(self.selection,wx.ID_ANY,_('Use mask to restrict')) # self.selectrestricttomask.SetValue(True) # self.selectrestricttomask.SetToolTip(_('If checked, the selection will be restricted by the mask data')) # bSizer16.Add(self.selectrestricttomask, 0, wx.ALL, 5) # self.LaunchSelection = wx.Button(self.selection, wx.ID_ANY, # _("Action !"), wx.DefaultPosition, # wx.DefaultSize, 0) # self.LaunchSelection.SetBackgroundColour((0,128,64,255)) # self.LaunchSelection.SetDefault() # # self.LaunchSelection.SetForegroundColour((255,255,255,255)) # font = wx.Font(12, wx.FONTFAMILY_DECORATIVE, 0, 90, underline = False,faceName ="") # self.LaunchSelection.SetFont(font) # bSizer16.Add(self.LaunchSelection, 0, wx.EXPAND) # self.AllSelection = wx.Button(self.selection, wx.ID_ANY, # _("Select all nodes"), wx.DefaultPosition, # wx.DefaultSize, 0) # self.AllSelection.SetToolTip(_("Select all nodes in one click - store 'All' in the selection list")) # bSizer16.Add(self.AllSelection, 0, wx.EXPAND) # self.MoveSelection = wx.Button(self.selection, wx.ID_ANY, # _("Move current selection to..."), wx.DefaultPosition, # wx.DefaultSize, 0) # self.MoveSelection.SetToolTip(_("Store the current selection in an indexed list -- useful for some interpolation methods")) # bSizer16.Add(self.MoveSelection, 0, wx.EXPAND) # self.ResetSelection = wx.Button(self.selection, wx.ID_ANY, # _("Reset"), wx.DefaultPosition, # wx.DefaultSize, 0) # self.ResetSelection.SetToolTip(_("Reset the current selection list (keyboard shortcut r)")) # bSizer16.Add(self.ResetSelection, 0, wx.EXPAND) # self.ResetAllSelection = wx.Button(self.selection, wx.ID_ANY, # _("Reset All"), wx.DefaultPosition, # wx.DefaultSize, 0) # self.ResetAllSelection.SetToolTip(_("Reset the current selection list and the indexed lists (keyboard shortcut R)")) # bSizer16.Add(self.ResetAllSelection, 0, wx.EXPAND) # bSizer21.Add(bSizer16, 1, wx.EXPAND, 5) # bSizer17 = wx.BoxSizer(wx.VERTICAL) # self.m_button2 = wx.Button(self.selection, wx.ID_ANY, _("Manage vectors"), wx.DefaultPosition, wx.DefaultSize, # 0) # self.m_button2.SetToolTip(_("Open the vector manager attached to the array")) # bSizer17.Add(self.m_button2, 0, wx.EXPAND) # self.active_vector_id = wx.StaticText(self.selection, wx.ID_ANY, _("Active vector"), wx.DefaultPosition, # wx.DefaultSize, 0) # self.active_vector_id.Wrap(-1) # bSizer17.Add(self.active_vector_id, 0, wx.EXPAND) # self.CurActiveparent = wx.StaticText(self.selection, wx.ID_ANY, _("Active parent"), wx.DefaultPosition, # wx.DefaultSize, 0) # self.CurActiveparent.Wrap(-1) # bSizer17.Add(self.CurActiveparent, 0, wx.EXPAND) # self.loadvec = wx.Button(self.selection, wx.ID_ANY, _("Read from file..."), wx.DefaultPosition, wx.DefaultSize, # 0) # self.loadvec.SetToolTip(_("Load a vector file into the vector manager")) # bSizer17.Add(self.loadvec, 0, wx.EXPAND) # self.saveas = wx.Button(self.selection, wx.ID_ANY, _("Save as..."), wx.DefaultPosition, wx.DefaultSize, 0) # bSizer17.Add(self.saveas, 0, wx.EXPAND) # self.saveas.SetToolTip(_("Save the vector manager to a new vector file")) # self.save = wx.Button(self.selection, wx.ID_ANY, _("Save"), wx.DefaultPosition, wx.DefaultSize, 0) # self.save.SetToolTip(_("Save the vector manager to the kwnown vector file")) # bSizer17.Add(self.save, 0, wx.EXPAND) # bSizer21.Add(bSizer17, 1, wx.EXPAND, 5) # bSizer15.Add(bSizer21, 1, wx.EXPAND, 5) # bSizer22 = wx.BoxSizer(wx.HORIZONTAL) # self.nbselect = wx.StaticText(self.selection, wx.ID_ANY, _("nb"), wx.DefaultPosition, wx.DefaultSize, 0) # self.nbselect.Wrap(-1) # bSizer22.Add(self.nbselect, 1, wx.EXPAND, 10) # self.minx = wx.StaticText(self.selection, wx.ID_ANY, _("xmin"), wx.DefaultPosition, wx.DefaultSize, 0) # self.minx.Wrap(-1) # self.minx.SetToolTip(_("X Mininum")) # bSizer22.Add(self.minx, 1, wx.EXPAND, 10) # self.maxx = wx.StaticText(self.selection, wx.ID_ANY, _("xmax"), wx.DefaultPosition, wx.DefaultSize, 0) # self.maxx.Wrap(-1) # self.maxx.SetToolTip(_("X Maximum")) # bSizer22.Add(self.maxx, 1, wx.EXPAND, 10) # self.miny = wx.StaticText(self.selection, wx.ID_ANY, _("ymin"), wx.DefaultPosition, wx.DefaultSize, 0) # self.miny.Wrap(-1) # self.miny.SetToolTip(_("Y Minimum")) # bSizer22.Add(self.miny, 1, wx.EXPAND, 10) # self.maxy = wx.StaticText(self.selection, wx.ID_ANY, _("ymax"), wx.DefaultPosition, wx.DefaultSize, 0) # self.maxy.Wrap(-1) # self.maxy.SetToolTip(_("Y Maximum")) # bSizer22.Add(self.maxy, 1, wx.EXPAND, 10) # bSizer15.Add(bSizer22, 0, wx.EXPAND, 5) # self.selection.SetSizer(bSizer15) # self.selection.Layout() # bSizer15.Fit(self.selection) # Mask # ---- # sizermask = wx.BoxSizer(wx.VERTICAL) # self.mask.SetSizer(sizermask) # unmaskall = wx.Button(self.mask, wx.ID_ANY, _("Unmask all"), wx.DefaultPosition, wx.DefaultSize, 0) # sizermask.Add(unmaskall, 1, wx.EXPAND) # unmaskall.Bind(wx.EVT_BUTTON, self.Unmaskall) # unmaskall.SetToolTip(_("Unmask all values in the current array")) # unmasksel = wx.Button(self.mask, wx.ID_ANY, _("Unmask selection"), wx.DefaultPosition, wx.DefaultSize, 0) # sizermask.Add(unmasksel, 1, wx.EXPAND) # unmasksel.Bind(wx.EVT_BUTTON, self.Unmasksel) # unmasksel.SetToolTip(_("Unmask all values in the current selection \n If you wish to unmask some of the currently masked data, you have to first select the desired nodes by unchecking the 'Use mask to retrict' on the 'Selection' panel, otherwise it is impossible to select these nodes")) # invertmask = wx.Button(self.mask, wx.ID_ANY, _("Invert mask"), wx.DefaultPosition, wx.DefaultSize, 0) # sizermask.Add(invertmask, 1, wx.EXPAND) # invertmask.Bind(wx.EVT_BUTTON, self.InvertMask) # invertmask.SetToolTip(_("Logical operation on mask -- mask = ~mask")) # self.mask.Layout() # sizermask.Fit(self.mask) # Operations # sizeropgen = wx.BoxSizer(wx.VERTICAL) # sepopcond = wx.BoxSizer(wx.HORIZONTAL) # sizerop = wx.BoxSizer(wx.VERTICAL) # sizercond = wx.BoxSizer(wx.VERTICAL) # # bSizer26 = wx.BoxSizer( wx.VERTICAL ) # # bSizer14.Add( bSizer26, 1, wx.EXPAND, 5 ) # sepopcond.Add(sizercond, 1, wx.EXPAND) # sepopcond.Add(sizerop, 1, wx.EXPAND) # sizeropgen.Add(sepopcond, 1, wx.EXPAND) # operationChoices = [u"+", u"-", u"*", u"/", _("replace")] # self.choiceop = wx.RadioBox(self.operation, wx.ID_ANY, # _("Operator"), wx.DefaultPosition, # wx.DefaultSize, operationChoices, 1, wx.RA_SPECIFY_COLS) # self.choiceop.SetSelection(4) # sizerop.Add(self.choiceop, 1, wx.EXPAND) # self.opvalue = wx.TextCtrl(self.operation, wx.ID_ANY, u"1", # wx.DefaultPosition, wx.DefaultSize, style=wx.TE_CENTER) # sizerop.Add(self.opvalue, 0, wx.EXPAND) # self.opvalue.SetToolTip(_('Numeric value or "Null"')) # conditionChoices = [u"<", u"<=", u"=", u">=", u">", u"isNaN"] # self.condition = wx.RadioBox(self.operation, wx.ID_ANY, _("Condition"), wx.DefaultPosition, wx.DefaultSize, # conditionChoices, 1, wx.RA_SPECIFY_COLS) # self.condition.SetSelection(2) # sizercond.Add(self.condition, 1, wx.EXPAND) # self.condvalue = wx.TextCtrl(self.operation, wx.ID_ANY, u"0", # wx.DefaultPosition, wx.DefaultSize, style=wx.TE_CENTER) # sizercond.Add(self.condvalue, 0, wx.EXPAND) # self.ApplyOp = wx.Button(self.operation, wx.ID_ANY, _("Apply math operator (Condition and Operator)"), wx.DefaultPosition, # wx.DefaultSize, 0) # sizeropgen.Add(self.ApplyOp, 1, wx.EXPAND) # self.ApplyOp.SetToolTip(_("This action will use the condition AND the operator to manipulate the selected nodes")) # self.SelectOp = wx.Button(self.operation, wx.ID_ANY, _("Select nodes (only Condition)"), wx.DefaultPosition, # wx.DefaultSize, 0) # self.SelectOp.SetToolTip(_("This action will use the condition AND NOT the operator to select some nodes")) # sizeropgen.Add(self.SelectOp, 1, wx.EXPAND) # maskdata = wx.Button(self.operation, wx.ID_ANY, _("Mask nodes (only Condition )"), wx.DefaultPosition, wx.DefaultSize, 0) # maskdata.SetToolTip(_("This action will use the condition AND NOT the operator to mask some selected nodes \n If no node is selectd --> Nothing to do !!")) # sizeropgen.Add(maskdata, 1, wx.EXPAND) # maskdata.Bind(wx.EVT_BUTTON, self.Onmask) # self.operation.SetSizer(sizeropgen) # self.operation.Layout() # sizeropgen.Fit(self.operation) gensizer = wx.BoxSizer(wx.VERTICAL) gensizer.Add(self._notebook, 1, wx.EXPAND | wx.ALL) self.SetSizer(gensizer) self.Layout() self.Centre(wx.BOTH) # Connect Events # self.LaunchSelection.Bind(wx.EVT_BUTTON, self.OnLaunchSelect) # self.AllSelection.Bind(wx.EVT_BUTTON, self.OnAllSelect) # self.MoveSelection.Bind(wx.EVT_BUTTON, self.OnMoveSelect) # self.ResetSelection.Bind(wx.EVT_BUTTON, self.OnResetSelect) # self.ResetAllSelection.Bind(wx.EVT_BUTTON, self.OnResetAllSelect) # self.m_button2.Bind(wx.EVT_BUTTON, self.OnManageVectors) # self.loadvec.Bind(wx.EVT_BUTTON, self.OnLoadvec) # self.saveas.Bind(wx.EVT_BUTTON, self.OnSaveasvec) # self.save.Bind(wx.EVT_BUTTON, self.OnSavevec) # self.ApplyOp.Bind(wx.EVT_BUTTON, self.OnApplyOpMath) # self.SelectOp.Bind(wx.EVT_BUTTON, self.OnApplyOpSelect) self._Apply_Tools.Bind(wx.EVT_BUTTON, self.OnApplyTools) self._cm_apply.Bind(wx.EVT_BUTTON, self.Onupdatepal) self._cm_save.Bind(wx.EVT_BUTTON, self.Onsavepal) self._cm_load.Bind(wx.EVT_BUTTON, self.Onloadpal) self._cm_default_pal.Bind(wx.EVT_BUTTON, self.Onloaddefaultpal) self._cm_toimage.Bind(wx.EVT_BUTTON, self.Onpalimage) self._cm_distribute.Bind(wx.EVT_BUTTON, self.Onpaldistribute) self._cm_choose_color.Bind(wx.EVT_BUTTON, self.OnClickColorPal) self._histo_update.Bind(wx.EVT_BUTTON, self.OnClickHistoUpdate) self._histo_updatezoom.Bind(wx.EVT_BUTTON, self.OnClickHistoUpdate) self._histo_updateerase.Bind(wx.EVT_BUTTON, self.OnClickHistoUpdate)
[docs] def OnDefault_cmap(self, event:wx.MouseEvent): """ Load a default colormap """ itemlabel = event.GetEventObject().GetLabel() if itemlabel == self._default16.LabelText: self._parent.default16() elif itemlabel == self._defaultblue.LabelText: self._parent.defaultblue() elif itemlabel == self._default_grey.LabelText: self._parent.defaultgrey() self.update_palette()
[docs] def onclose(self, event: wx.CloseEvent): """ Hide the window instead of closing/detsroy it """ self.Hide()
[docs] def onshow(self, event: wx.ShowEvent): """ Update the GUI when the window is shown """ if self._parent.nullvalue == np.nan: self._txt_nullvalue.Value = 'nan' else : self._txt_nullvalue.Value = str(self._parent.epsilon) self.update_palette()
[docs] def OnApplyTools(self, event:wx.MouseEvent): """ Apply modifications in the textctrl """ neweps:str = self._txt_epsilon.Value newnull:str = self._txt_nullvalue.Value if neweps.lower() == 'nan': neweps = np.nan else: neweps = float(neweps) if newnull.lower() == 'nan': newnull = np.nan else: newnull = float(newnull) to_update = False if self._parent.epsilon != neweps: self._parent.epsilon = neweps to_update = True if self._parent.nullvalue != newnull: self._parent.nullvalue = newnull to_update = True if to_update: self._parent.read_oneresult(self._parent.current_result) self._parent.set_currentview()
[docs] def update_palette(self): """ Update the colormap grid and plot """ # Update the plot self._panel_colormap.add_ax() fig, ax = self._panel_colormap.get_fig_ax() self._parent.mypal.plot(fig, ax) fig.canvas.draw() # Fill the grid self._parent.mypal.fillgrid(self._cm_grid)
[docs] def Onsavepal(self, event:wx.MouseEvent): """ Save the colormap to a file """ self._parent.mypal.savefile()
[docs] def Onloadpal(self, event): """ Read the colormap from a file """ self._parent.mypal.readfile() self._parent.mypal.automatic = False self._cm_auto.SetValue(0) self._parent.set_currentview() self.update_palette()
[docs] def Onloaddefaultpal(self, event:wx.MouseEvent): """ Load default palette """ import glob # list of all .pal file in model directory dirpal = Path(__file__).parent / 'models' listpal = glob.glob(str(dirpal) + '/*.pal') if len(listpal) == 0: logging.info('No default palette found') return listpal = [Path(i).name for i in listpal] dlg = wx.SingleChoiceDialog(None, 'Choose the default palette', 'Default palette', listpal) ret = dlg.ShowModal() if ret == wx.ID_CANCEL: dlg.Destroy() self._parent.mypal.readfile(str(dirpal / dlg.GetStringSelection())) self._parent.mypal.automatic = False self._cm_auto.SetValue(0) self.update_palette() self._parent.set_currentview()
[docs] def Onpalimage(self, event): """ Export the colormap to an image """ self._parent.mypal.export_image()
[docs] def Onpaldistribute(self, event): """ Distribute uniformly the colormap values """ self._parent.mypal.distribute_values() self._parent.mypal.automatic = False self._cm_auto.SetValue(0) self._parent.set_currentview() self.update_palette()
[docs] def Onupdatepal(self, event): """ Update the colormap from the grid """ dellists = False auto = self._cm_auto.IsChecked() uni = self._cm_uniform_or_lerp.IsChecked() oldalpha = self._parent.alpha if self._cm_alpha.IsChecked(): self._parent.alpha=1. else: self._parent.alpha = float(self._cm_slider_alpha.GetValue()) / 100. ret = self._parent.mypal.updatefromgrid(self._cm_grid) if self._parent.mypal.automatic != auto or self._parent.alpha != oldalpha or ret or auto != self._parent.mypal.automatic or uni != self._parent.mypal.interval_cst: self._parent.mypal.automatic = auto self._parent.mypal.interval_cst = uni self._parent.updatepalette(0) dellists = True shadehill = self._cm_hillshade.IsChecked() if not self._parent.shading and shadehill: self._parent.shading = True dellists = True if shadehill: azim = float(self._cm_slider_azimuth.GetValue()) alti = float(self._cm_slider_altitude.GetValue()) if self._parent.azimuthhill != azim: self._parent.azimuthhill = azim self._parent.shading = True if self._parent.altitudehill != alti: self._parent.altitudehill = alti self._parent.shading = True alpha = float(self._cm_slider_hillshade.GetValue()) / 100. if self._parent.shaded.alpha != alpha: self._parent.shaded.alpha = alpha self._parent.shading = True if dellists: self._parent.set_currentview()
[docs] def OnClickHistoUpdate(self, event: wx.MouseEvent): """ Click to update the histogram """ itemlabel = event.GetEventObject().GetLabel() fig, ax = self._panel_histo.get_fig_ax() if itemlabel == self._histo_updateerase.LabelText: ax.clear() fig.canvas.draw() return onzoom = [] if itemlabel == self._histo_updatezoom.LabelText: if self._mapviewer is not None: onzoom = [self._mapviewer.xmin, self._mapviewer.xmax, self._mapviewer.ymin, self._mapviewer.ymax] partarray = self._parent.get_working_array(onzoom).flatten(order='F') # .sort(axis=-1) ax: plt.Axes ax.hist(partarray, 200, density=True) fig.canvas.draw()
[docs] def OnClickColorPal(self, event: wx.MouseEvent): """ Click to open the color dialog box """ gridto = self._cm_grid k = gridto.GetGridCursorRow() r = int(gridto.GetCellValue(k, 1)) g = int(gridto.GetCellValue(k, 2)) b = int(gridto.GetCellValue(k, 3)) curcol = wx.ColourData() curcol.SetChooseFull(True) curcol.SetColour(wx.Colour(r, g, b)) dlg = wx.ColourDialog(None, curcol) ret = dlg.ShowModal() if ret == wx.ID_CANCEL: dlg.Destroy() return curcol = dlg.GetColourData() dlg.Destroy() rgb = curcol.GetColour() k = gridto.GetGridCursorRow() gridto.SetCellValue(k, 1, str(rgb.red)) gridto.SetCellValue(k, 2, str(rgb.green)) gridto.SetCellValue(k, 3, str(rgb.blue))
# def interpolation2D(self, event: wx.MouseEvent): # self._parent.interpolation2D() # def Unmaskall(self, event: wx.MouseEvent): # """ # Enlève le masque des tous les éléments # @author Pierre Archambeau # """ # curarray: WolfArray # curarray = self._parent # curarray.mask_reset() # curarray = self._parent # curarray.updatepalette() # curarray.delete_lists() # def Unmasksel(self,event:wx.MouseEvent): # """ # Enlève le masque des éléments sélectionnés # @author Pierre Archambeau # """ # curarray: WolfArray # curarray = self._parent # if len(curarray.mngselection.myselection) == 0: # return # else: # destxy = curarray.mngselection.myselection # destij = np.asarray([list(curarray.get_ij_from_xy(x, y)) for x, y in destxy]) # curarray.array.mask[destij[:, 0], destij[:, 1]] = False # curarray.updatepalette() # curarray.delete_lists() # def InvertMask(self, event: wx.MouseEvent): # curarray: WolfArray # curarray = self._parent # curarray.mask_invert() # curarray = self._parent # curarray.updatepalette() # curarray.delete_lists() # def interp2Dpolygons(self, event: wx.MouseEvent): # """ # Bouton d'interpolation sous tous les polygones d'une zone # cf _interp2Dpolygon # """ # choices = ["nearest", "linear", "cubic"] # dlg = wx.SingleChoiceDialog(None, "Pick an interpolate method", "Choices", choices) # ret = dlg.ShowModal() # if ret == wx.ID_CANCEL: # dlg.Destroy() # return # method = dlg.GetStringSelection() # dlg.Destroy() # actzone = self.active_zone # curarray: WolfArray # curarray = self._parent # for curvec in actzone.myvectors: # curvec: vector # self._interp2Dpolygon(curvec, method) # curarray.updatepalette() # curarray.delete_lists() # def interp2Dpolygon(self, event: wx.MouseEvent): # """ # Bouton d'interpolation sous un polygone # cf _interp2Dpolygon # """ # choices = ["nearest", "linear", "cubic"] # dlg = wx.SingleChoiceDialog(None, "Pick an interpolate method", "Choices", choices) # ret = dlg.ShowModal() # if ret == wx.ID_CANCEL: # dlg.Destroy() # return # method = dlg.GetStringSelection() # dlg.Destroy() # actzone = self.active_zone # curarray: WolfArray # curarray = self._parent # self._interp2Dpolygon(self.active_vector, method) # curarray.updatepalette() # curarray.delete_lists() # def _interp2Dpolygon(self, vect: vector, method): # """ # Interpolation sous un polygone # L'interpolation a lieu : # - uniquement dans les mailles sélectionnées si elles existent # - dans les mailles contenues dans le polygone sinon # On utilise ensuite "griddata" pour interpoler les altitudes des mailles # depuis les vertices 3D du polygone # """ # curarray: WolfArray # curarray = self._parent # if len(curarray.mngselection.myselection) == 0: # destxy = curarray.get_xy_inside_polygon(vect) # else: # destxy = curarray.mngselection.myselection # if len(destxy)==0: # return # destij = np.asarray([list(curarray.get_ij_from_xy(x, y)) for x, y in destxy]) # xyz = vect.asnparray3d() # newvalues = griddata(xyz[:, :2], xyz[:, 2], destxy, method=method, fill_value=-99999.) # locmask = np.where(newvalues != -99999.) # curarray.array.data[destij[locmask][:, 0], destij[locmask][:, 1]] = newvalues[locmask] # def interp2Dpolylines(self, event: wx.MouseEvent): # """ # Bouton d'interpolation sous toutes les polylignes de la zone # cf _interp2Dpolyline # """ # actzone = self.active_zone # curarray: WolfArray # curarray = self._parent # for curvec in actzone.myvectors: # curvec: vector # self._interp2Dpolyline(curvec) # curarray.updatepalette() # curarray.delete_lists() # def interp2Dpolyline(self, event: wx.MouseEvent): # """ # Bouton d'interpolation sous la polyligne active # cf _interp2Dpolyline # """ # curarray: WolfArray # curarray = self._parent # self._interp2Dpolyline(self.active_vector) # curarray.updatepalette() # curarray.delete_lists() # def _interp2Dpolyline(self, vect: vector, usemask=True): # """ # Interpolation sous une polyligne # L'interpolation a lieu : # - uniquement dans les mailles sélectionnées si elles existent # - dans les mailles sous la polyligne sinon # On utilise ensuite "interpolate" de shapely pour interpoler les altitudes des mailles # depuis les vertices 3D de la polyligne # """ # curarray: WolfArray # curarray = self._parent # vecls = vect.asshapely_ls() # if len(curarray.mngselection.myselection) == 0: # allij = curarray.get_ij_under_polyline(vect, usemask) # allxy = [curarray.get_xy_from_ij(cur[0], cur[1]) for cur in allij] # else: # allxy = curarray.mngselection.myselection # allij = np.asarray([curarray.get_ij_from_xy(x,y) for x,y in allxy]) # newz = np.asarray([vecls.interpolate(vecls.project(Point(x, y))).z for x, y in allxy]) # curarray.array.data[allij[:, 0], allij[:, 1]] = newz # def volumesurface(self, event): # """ # Click on evaluation of stage-storage-surface relation # """ # self._volumesurface() # def _volumesurface(self, show=True): # """ # Evaluation of stage-storage-surface relation # """ # if self._mapviewer is not None: # if self._mapviewer.linked: # array1 = self._mapviewer.linkedList[0].active_array # array2 = self._mapviewer.linkedList[1].active_array # # transfert des mailles sélectionnées dans l'autre matrice # if array1 is self._parent: # array2.mngselection.myselection = array1.mngselection.myselection.copy() # if array2 is self._parent: # array1.mngselection.myselection = array2.mngselection.myselection.copy() # if len(self._parent.mngselection.myselection) == 0 or self._parent.mngselection.myselection == 'all': # myarray = array1 # axs = myarray.volume_estimation() # myarray = array2 # axs = myarray.volume_estimation(axs) # else: # myarray = array1.mngselection.get_newarray() # axs = myarray.volume_estimation() # myarray = array2.mngselection.get_newarray() # axs = myarray.volume_estimation(axs) # else: # if len(self._parent.mngselection.myselection) == 0 or self._parent.mngselection.myselection == 'all': # myarray = self._parent # else: # myarray = self._parent.mngselection.get_newarray() # myarray.volume_estimation() # else: # if len(self._parent.mngselection.myselection) == 0 or self._parent.mngselection.myselection == 'all': # myarray = self._parent # else: # myarray = self._parent.mngselection.get_newarray() # myarray.volume_estimation() # if show: # plt.show() # def OnAllSelect(self, event): # """ # Select all --> just put "all" in "myselection" # """ # self._parent.mngselection.myselection = 'all' # self._parent.myops.nbselect.SetLabelText('All') # def OnMoveSelect(self, event): # """Transfert de la sélection courante dans un dictionnaire""" # dlg = wx.TextEntryDialog(self, 'Choose id', 'id?') # ret = dlg.ShowModal() # idtxt = dlg.GetValue() # dlg = wx.ColourDialog(self) # ret = dlg.ShowModal() # color = dlg.GetColourData() # self._parent.mngselection.move_selectionto(idtxt, color.GetColour()) # def reset_selection(self): # """ # Reset of current selection # """ # self._parent.mngselection.myselection = [] # self.nbselect.SetLabelText('0') # self.minx.SetLabelText('0') # self.miny.SetLabelText('0') # self.maxx.SetLabelText('0') # self.maxy.SetLabelText('0') # def reset_all_selection(self): # """ # Reset of current selection and stored ones # """ # self.reset_selection() # self._parent.mngselection.selections = {} # def OnResetSelect(self, event): # """ # Click on Reset of current selection # """ # self.reset_selection() # def OnResetAllSelect(self, event): # """ # Click on reset all # """ # self.reset_all_selection() # def OnApplyOpSelect(self, event): # curcond = self.condition.GetSelection() # curcondvalue = float(self.condvalue.GetValue()) # self._parent.mngselection.condition_select(curcond, curcondvalue) # def OnApplyOpMath(self, event): # curop = self.choiceop.GetSelection() # curcond = self.condition.GetSelection() # opval = self.opvalue.GetValue() # if opval.lower() == 'null' or opval.lower() == 'nan': # curopvalue = self._parent.nullvalue # else: # curopvalue = float(opval) # curcondvalue = float(self.condvalue.GetValue()) # self._parent.mngselection.treat_select(curop, curcond, curopvalue, curcondvalue) # def Onmask(self, event): # curop = self.choiceop.GetSelection() # curcond = self.condition.GetSelection() # curopvalue = float(self.opvalue.GetValue()) # curcondvalue = float(self.condvalue.GetValue()) # self._parent.mngselection.mask_condition(curop, curcond, curopvalue, curcondvalue) # self._parent.reset_plot() # pass # def OnManageVectors(self, event): # if self._mapviewer is not None: # if self._mapviewer.linked: # if self._mapviewer.link_shareopsvect: # if self.myzones.parent in self._mapviewer.linkedList: # self.myzones.showstructure() # return # self.myzones.showstructure() # def OnLoadvec(self, event): # dlg = wx.FileDialog(None, 'Select file', # wildcard='Vec file (*.vec)|*.vec|Vecz file (*.vecz)|*.vecz|Dxf file (*.dxf)|*.dxf|All (*.*)|*.*', style=wx.FD_OPEN) # ret = dlg.ShowModal() # if ret == wx.ID_CANCEL: # dlg.Destroy() # return # self.fnsave = dlg.GetPath() # dlg.Destroy() # self.myzones = Zones(self.fnsave, parent=self) # if self._mapviewer is not None: # if self._mapviewer.linked: # if not self._mapviewer.linkedList is None: # for curFrame in self._mapviewer.linkedList: # if curFrame.link_shareopsvect: # curFrame.active_array.myops.myzones = self.myzones # curFrame.active_array.myops.fnsave = self.fnsave # def OnSaveasvec(self, event): # dlg = wx.FileDialog(None, 'Select file', wildcard='Vec file (*.vec)|*.vec|Vecz file (*.vecz)|*.vecz|All (*.*)|*.*', style=wx.FD_SAVE) # ret = dlg.ShowModal() # if ret == wx.ID_CANCEL: # dlg.Destroy() # return # self.fnsave = dlg.GetPath() # dlg.Destroy() # self.myzones.saveas(self.fnsave) # if self._mapviewer is not None: # if self._mapviewer.linked: # if not self._mapviewer.linkedList is None: # for curFrame in self._mapviewer.linkedList: # if curFrame.link_shareopsvect: # curFrame.active_array.myops.fnsave = self.fnsave # def OnSavevec(self, event): # if self.fnsave == '': # return # self.myzones.saveas(self.fnsave)
[docs] def select_node_by_node(self): if self._mapviewer is not None: self._mapviewer.action = 'select node by node - results' self._mapviewer.active_res2d = self._parent
# def select_zone_inside_manager(self): # if self.active_zone is None: # wx.MessageBox('Please select an active zone !') # return # for curvec in self.active_zone.myvectors: # self._select_vector_inside_manager(curvec) # def select_vector_inside_manager(self): # if self.active_vector is None: # wx.MessageBox('Please select an active vector !') # return # self._select_vector_inside_manager(self.active_vector) # def _select_vector_inside_manager(self, vect: vector): # if len(vect.myvertices) > 2: # self._parent.mngselection.select_insidepoly(vect) # elif self._mapviewer is not None: # if len(vect.myvertices) < 3: # wx.MessageBox('Please add points to vector by clicks !') # self._mapviewer.action = 'select by vector inside' # self._mapviewer.active_array = self._parent # self.Active_vector(vect) # firstvert = wolfvertex(0., 0.) # self.vectmp.add_vertex(firstvert) # def select_zone_under_manager(self): # if self.active_zone is None: # wx.MessageBox('Please select an active zone !') # return # for curvec in self.active_zone.myvectors: # self._select_vector_under_manager(curvec) # def select_vector_under_manager(self): # if self.active_vector is None: # wx.MessageBox('Please select an active vector !') # return # self._select_vector_under_manager(self.active_vector) # def _select_vector_under_manager(self, vect: vector): # if len(vect.myvertices) > 1: # self._parent.mngselection.select_underpoly(vect) # elif self._mapviewer is not None: # if len(vect.myvertices) < 2: # wx.MessageBox('Please add points to vector by clicks !') # self._mapviewer.action = 'select by vector under' # self._mapviewer.active_array = self._parent # self.Active_vector(vect) # firstvert = wolfvertex(0., 0.) # self.vectmp.add_vertex(firstvert) # def select_vector_inside_tmp(self): # if self._mapviewer is not None: # self._mapviewer.action = 'select by tmp vector inside' # self.vectmp.reset() # self.Active_vector(self.vectmp) # self._mapviewer.active_array = self._parent # firstvert = wolfvertex(0., 0.) # self.vectmp.add_vertex(firstvert) # def select_vector_under_tmp(self): # if self._mapviewer is not None: # self._mapviewer.action = 'select by tmp vector under' # self.vectmp.reset() # self.Active_vector(self.vectmp) # self._mapviewer.active_array = self._parent # firstvert = wolfvertex(0., 0.) # self.vectmp.add_vertex(firstvert) # def OnLaunchSelect(self, event): # id = self.selectmethod.GetSelection() # if id == 0: # logging.info(_('Node selection by individual clicks')) # logging.info(_('')) # logging.info(_(' Clicks on the desired nodes...')) # logging.info(_('')) # self.select_node_by_node() # elif id == 1: # logging.info(_('Node selection inside active vector (manager)')) # self.select_vector_inside_manager() # elif id == 2: # logging.info(_('Node selection inside active zone (manager)')) # self.select_zone_inside_manager() # elif id == 3: # logging.info(_('Node selection inside temporary vector')) # logging.info(_('')) # logging.info(_(' Choose vector by clicks...')) # logging.info(_('')) # self.select_vector_inside_tmp() # elif id == 4: # logging.info(_('Node selection under active vector (manager)')) # self.select_vector_under_manager() # elif id == 5: # logging.info(_('Node selection under active zone (manager)')) # self.select_zone_under_manager() # elif id == 6: # logging.info(_('Node selection under temporary vector')) # logging.info(_('')) # logging.info(_(' Choose vector by clicks...')) # logging.info(_('')) # self.select_vector_under_tmp() # def Active_vector(self, vect: vector, copyall=True): # if vect is None: # return # self.active_vector = vect # self.active_vector_id.SetLabelText(vect.myname) # if vect.parentzone is not None: # self.active_zone = vect.parentzone # if self._mapviewer is not None and copyall: # self._mapviewer.Active_vector(vect) # def Active_zone(self, zone): # self.active_zone = zone # if self._mapviewer is not None: # self._mapviewer.Active_zone(zone)
[docs] class views_2D(Enum):
[docs] WATERDEPTH = _('Water depth [m]')
[docs] WATERLEVEL = _('Water level [m]')
[docs] TOPOGRAPHY = _('Bottom level [m]')
[docs] QX = _('Discharge X [m2s-1]')
[docs] QY = _('Discharge Y [m2s-1]')
[docs] QNORM = _('Discharge norm [m2s-1]')
[docs] UX =_('Velocity X [ms-1]')
[docs] UY = _('Velocity Y [ms-1]')
[docs] UNORM = _('Velocity norm [ms-1]')
[docs] HEAD = _('Head [m]')
[docs] FROUDE = _('Froude [-]')
[docs] KINETIC_ENERGY = _('Kinetic energy k')
[docs] EPSILON = _('Rate of dissipation e')
[docs] TURB_VISC_2D = _('Turbulent viscosity 2D')
[docs] TURB_VISC_3D = _('Turbulent viscosity 3D')
[docs] VECTOR_FIELD_Q = _('Discharge vector field')
[docs] VECTOR_FIELD_U = _('Velocity vector field')
[docs] U_SHEAR = _('Shear velocity [ms-1]')
[docs] SHIELDS_NUMBER = _('Shields number - Manning-Strickler')
[docs] CRITICAL_DIAMETER_SHIELDS = _('Critical grain diameter - Shields')
[docs] CRITICAL_DIAMETER_IZBACH = _('Critical grain diameter - Izbach')
[docs] CRITICAL_DIAMETER_SUSPENSION_50 = _('Critical grain diameter - Suspension 50%')
[docs] CRITICAL_DIAMETER_SUSPENSION_100 = _('Critical grain diameter - Suspension 100%')
[docs] QNORM_FIELD = _('Q norm + Field')
[docs] UNORM_FIELD = _('U norm + Field')
[docs] WL_Q = _('WL + Q')
[docs] WD_Q = _('WD + Q')
[docs] WL_U = _('WL + U')
[docs] WD_U = _('WD + U')
[docs] T_WL_Q = _('Top + WL + Q')
[docs] T_WD_Q = _('Top + WD + Q')
[docs] T_WD_U = _('Top + WD + U')
@classmethod
[docs] def get_view(cls, value): for cur in views_2D: if cur.value == value: return cur return None
def __str__(self): return self.value
[docs] VIEWS_SEDIMENTARY = [views_2D.SHIELDS_NUMBER, views_2D.CRITICAL_DIAMETER_SHIELDS, views_2D.CRITICAL_DIAMETER_IZBACH, views_2D.CRITICAL_DIAMETER_SUSPENSION_50, views_2D.CRITICAL_DIAMETER_SUSPENSION_100]
[docs] VIEWS_COMPLEX = [views_2D.T_WD_Q, views_2D.T_WD_U, views_2D.T_WL_Q, views_2D.WL_Q, views_2D.WD_Q, views_2D.WL_U, views_2D.WD_U, views_2D.QNORM_FIELD, views_2D.UNORM_FIELD]
[docs] VIEWS_VECTOR_FIELD = [views_2D.VECTOR_FIELD_Q, views_2D.VECTOR_FIELD_U]
[docs] class OneWolfResult: """ Stockage des résultats d'un bloc de modèle WOLF2D """ def __init__(self, idx:int=0, parent = None): self.parent = parent self.wx_exists = wx.GetApp() is not None self.blockindex = idx # index du bloc en numérotation Python --> penser à ajouter 1 si retour type VB6/Fortran self.idx = getkeyblock(idx) self.linkedvec = None self.epsilon = 0. self.waterdepth = WolfArray() self.top = WolfArray() self.qx = WolfArray() self.qy = WolfArray() self.rough_n = WolfArray() self.set_current(views_2D.WATERDEPTH) self.k = WolfArray() self.eps = WolfArray() self.ShieldsNumber = None self.critShields = None self.critIzbach = None self.critSusp50 = None self.critSusp100 = None self._vec_field = None self._min_field_size = .1 self._sedimentdiam = 1e-3 self._sedimentdensity = 2.65 self._force_update_shields = True # Force la MAJ du Shields si le diametre ou la densité change # self.mngselection = SelectionData(self.current) @property
[docs] def SelectionData(self) -> SelectionDataMB: """ Return the data of the selection """ return self.mngselection
@property
[docs] def sediment_diameter(self): """ Diamètre des particules de sédiment [m] """ return self._sedimentdiam
@sediment_diameter.setter def sediment_diameter(self, value:float): """ Diamètre des particules de sédiment [m] """ self._force_update_shields = self._sedimentdiam != value self._sedimentdiam = value # forcer la MAJ si nécessaire if self._which_current == views_2D.SHIELDS_NUMBER: self.set_current(views_2D.SHIELDS_NUMBER) @property
[docs] def sediment_density(self): """ Densité des particules de sédiment [-] """ return self._sedimentdensity
@sediment_density.setter def sediment_density(self, value:float): """ Densité des particules de sédiment [-] """ self._force_update_shields = self._sedimentdensity != value self._sedimentdensity = value # forcer la MAJ si nécessaire if self._which_current == views_2D.SHIELDS_NUMBER: self.set_current(views_2D.SHIELDS_NUMBER) @property
[docs] def current(self): """ Vue courante """ return self._current
[docs] def set_linkedvec(self, link:vector): """ Set the linked vecteor -- used for masking outside the vector """ self.linkedvec = link
[docs] def set_epsilon(self, eps:float): """ Définit la valeur de l'epsilon pour le masquage des données. Toute valeur de waterdepth inférieure à eps sera masquée. see "filter_inundation" :param eps: valeur de l'epsilon """ self.epsilon = eps
[docs] def filter_inundation(self): """ Apply filter on array : - mask data below eps - mask data outisde linkedvec """ mask = self.waterdepth.array < self.epsilon self._current.filter_inundation(mask = mask)
[docs] def filter_independent_zones(self, n_largest:int = 1): """ Apply filter on array : mask data outside n_largest independant zones """ self.waterdepth.filter_independent_zones(n_largest = n_largest) self._current.array.mask[:,:] = self.waterdepth.array.mask[:,:]
[docs] def set_current(self, which:views_2D): """ Définition de la vue courante :param which: vue courante (voir enum views_2D) """ self._which_current = which if which==views_2D.WATERDEPTH: self._current=self.waterdepth nullvalue = self.waterdepth.nullvalue elif which==views_2D.TOPOGRAPHY: self._current=self.top nullvalue = self.top.nullvalue elif which==views_2D.QX: self._current=self.qx nullvalue = self.qx.nullvalue elif which==views_2D.QY: self._current=self.qy nullvalue = self.qy.nullvalue elif which==views_2D.QNORM: self._current=(self.qx**2.+self.qy**2.)**.5 nullvalue = self.qx.nullvalue elif which==views_2D.UNORM: self._current=(self.qx**2.+self.qy**2.)**.5/self.waterdepth nullvalue = self.qx.nullvalue elif which==views_2D.UX: self._current=self.qx/self.waterdepth nullvalue = self.qx.nullvalue elif which==views_2D.UY: self._current=self.qy/self.waterdepth nullvalue = self.qy.nullvalue elif which==views_2D.WATERLEVEL: self._current=self.waterdepth+self.top nullvalue = self.waterdepth.nullvalue elif which==views_2D.FROUDE: self._current=(self.qx**2.+self.qy**2.)**.5/self.waterdepth/(self.waterdepth*9.81)**.5 nullvalue = self.qx.nullvalue elif which==views_2D.HEAD: self._current=(self.qx**2.+self.qy**2.)**.5/self.waterdepth/(2.*9.81)+self.waterdepth+self.top nullvalue = self.qx.nullvalue elif which==views_2D.KINETIC_ENERGY: self._current=self.k nullvalue = self.k.nullvalue elif which==views_2D.EPSILON: self._current=self.eps nullvalue = self.eps.nullvalue elif which==views_2D.VECTOR_FIELD_Q: self._current=(self.qx**2.+self.qy**2.)**.5 nullvalue = self.qx.nullvalue self._vec_field = VectorField(self.qx.array, self.qy.array, self.qx.get_bounds(), self.qx.dx, self.qy.dy, minsize=self._min_field_size, mapviewer=self.parent.mapviewer) elif which==views_2D.VECTOR_FIELD_U: ux = self.qx/self.waterdepth uy = self.qy/self.waterdepth self._current=(ux**2.+uy**2.)**.5 self._vec_field = VectorField(ux.array, uy.array, ux.get_bounds(), ux.dx, ux.dy, minsize=self._min_field_size, mapviewer=self.parent.mapviewer) nullvalue = self.qx.nullvalue elif which ==views_2D.QNORM_FIELD: self._current = (self.qx**2.+self.qy**2.)**.5 self._view = WolfViews() self._vec_field = VectorField(self.qx.array, self.qy.array, self.qx.get_bounds(), self.qx.dx, self.qy.dy, minsize=self._min_field_size, mapviewer=self.parent.mapviewer) self._view.add_elemts([self._current, self._vec_field]) nullvalue = self.qx.nullvalue elif which ==views_2D.UNORM_FIELD: self._current = (self.qx**2.+self.qy**2.)**.5/self.waterdepth self._view = WolfViews() ux = self.qx/self.waterdepth uy = self.qy/self.waterdepth self._vec_field = VectorField(ux.array, uy.array, self.qx.get_bounds(), self.qx.dx, self.qy.dy, minsize=self._min_field_size, mapviewer=self.parent.mapviewer) self._view.add_elemts([self._current, self._vec_field]) nullvalue = self.qx.nullvalue elif which ==views_2D.WL_U: self._current = self.waterdepth+self.top self._view = WolfViews() ux = self.qx/self.waterdepth uy = self.qy/self.waterdepth self._vec_field = VectorField(ux.array, uy.array, self.qx.get_bounds(), self.qx.dx, self.qy.dy, minsize=self._min_field_size, mapviewer=self.parent.mapviewer) self._view.add_elemts([self._current, self._vec_field]) nullvalue = self.waterdepth.nullvalue elif which ==views_2D.WL_Q: self._current = self.waterdepth+self.top self._view = WolfViews() self._vec_field = VectorField(self.qx.array, self.qy.array, self.qx.get_bounds(), self.qx.dx, self.qy.dy, minsize=self._min_field_size, mapviewer=self.parent.mapviewer) self._view.add_elemts([self._current, self._vec_field]) nullvalue = self.waterdepth.nullvalue elif which ==views_2D.WD_Q: self._current = self.waterdepth self._view = WolfViews() self._vec_field =VectorField(self.qx.array, self.qy.array, self.qx.get_bounds(), self.qx.dx, self.qy.dy, minsize=self._min_field_size, mapviewer=self.parent.mapviewer) self._view.add_elemts([self._current, self._vec_field]) nullvalue = self.waterdepth.nullvalue elif which ==views_2D.WD_U: self._current = self.waterdepth self._view = WolfViews() ux = self.qx/self.waterdepth uy = self.qy/self.waterdepth self._vec_field =VectorField(ux.array, uy.array, self.qx.get_bounds(), self.qx.dx, self.qy.dy, minsize=self._min_field_size, mapviewer=self.parent.mapviewer) self._view.add_elemts([self._current, self._vec_field]) nullvalue = self.waterdepth.nullvalue elif which ==views_2D.T_WL_Q: self._current = self.waterdepth+self.top self._view = WolfViews() self._vec_field =VectorField(self.qx.array, self.qy.array, self.qx.get_bounds(), self.qx.dx, self.qy.dy, minsize=self._min_field_size, mapviewer=self.parent.mapviewer) self._view.add_elemts([self.top, self._current, self._vec_field]) nullvalue = self.waterdepth.nullvalue elif which ==views_2D.T_WD_Q: self._current = self.waterdepth self.waterdepth.mypal.defaultblue_minmax(self.waterdepth.array) self._view = WolfViews() self._vec_field =VectorField(self.qx.array, self.qy.array, self.qx.get_bounds(), self.qx.dx, self.qy.dy, minsize=self._min_field_size, mapviewer=self.parent.mapviewer) self._view.add_elemts([self.top, self._current, self._vec_field]) nullvalue = self.waterdepth.nullvalue elif which ==views_2D.T_WD_U: self._current = self.waterdepth self.waterdepth.mypal.defaultblue_minmax(self.waterdepth.array) self._view = WolfViews() ux = self.qx/self.waterdepth uy = self.qy/self.waterdepth self._vec_field =VectorField(ux.array, uy.array, self.qx.get_bounds(), self.qx.dx, self.qy.dy, minsize=self._min_field_size, mapviewer=self.parent.mapviewer) self._view.add_elemts([self.top, self._current, self._vec_field ]) nullvalue = self.waterdepth.nullvalue elif which==views_2D.U_SHEAR: self.U_Shear = self.get_u_shear() self._current = self.U_Shear self._view = WolfViews() nullvalue = self.waterdepth.nullvalue elif which==views_2D.SHIELDS_NUMBER: if self.ShieldsNumber is None or self._force_update_shields: self.ShieldsNumber = self.get_shieldsnumber() self._force_update_shields = False self._current = self.ShieldsNumber nullvalue = self.waterdepth.nullvalue elif which==views_2D.CRITICAL_DIAMETER_SHIELDS: if self.critShields is None: self.critShields = self.get_critdiam(0) self._current = self.critShields nullvalue = self.waterdepth.nullvalue elif which==views_2D.CRITICAL_DIAMETER_IZBACH: if self.critIzbach is None: self.critIzbach = self.get_critdiam(1) self._current = self.critIzbach nullvalue = self.waterdepth.nullvalue elif which==views_2D.CRITICAL_DIAMETER_SUSPENSION_50: if self.critSusp50 is None: self.critSusp50 = self.get_critsusp(50) self._current = self.critSusp50 nullvalue = self.waterdepth.nullvalue elif which==views_2D.CRITICAL_DIAMETER_SUSPENSION_100: if self.critSusp100 is None: self.critSusp100 = self.get_critsusp(100) self._current = self.critSusp100 nullvalue = self.waterdepth.nullvalue self._current.linkedvec = self.linkedvec self._current.idx = self.idx self._current.nullvalue = nullvalue self.mngselection = SelectionData(self._current)
[docs] def set_opacity(self, alpha:float): """ Set the transparency of the array """ if alpha <0.: alpha = 0. if alpha > 1.: alpha = 1. self.alpha = alpha return self.alpha
@property
[docs] def min_field_size(self): return self._min_field_size
@min_field_size.setter def min_field_size(self, value): self._min_field_size = value
[docs] def get_norm_max(self): if self._which_current == views_2D.VECTOR_FIELD_Q: return (self._vec_field.min_size, self._vec_field.max_norm, np.max(self._current.array)) elif self._which_current == views_2D.VECTOR_FIELD_U: return (self._vec_field.min_size, self._vec_field.max_norm, np.max(self._current.array)) elif self._which_current == views_2D.QNORM_FIELD: return (self._vec_field.min_size, self._vec_field.max_norm, np.max(self._current.array)) elif self._which_current == views_2D.UNORM_FIELD: return (self._vec_field.min_size, self._vec_field.max_norm, np.max(self._current.array)) elif self._which_current == views_2D.WL_U: return (self._vec_field.min_size, self._vec_field.max_norm, np.max(((self.qx**2.+self.qy**2.)**.5/self.waterdepth).array)) elif self._which_current == views_2D.WL_Q: return (self._vec_field.min_size, self._vec_field.max_norm, np.max(((self.qx**2.+self.qy**2.)**.5).array)) elif self._which_current == views_2D.T_WL_Q: return (self._vec_field.min_size, self._vec_field.max_norm, np.max(((self.qx**2.+self.qy**2.)**.5).array)) elif self._which_current == views_2D.T_WD_Q: return (self._vec_field.min_size, self._vec_field.max_norm, np.max(((self.qx**2.+self.qy**2.)**.5).array)) elif self._which_current == views_2D.T_WD_U: return (self._vec_field.min_size, self._vec_field.max_norm, np.max(((self.qx**2.+self.qy**2.)**.5/self.waterdepth).array)) return (0., 1.,1.)
[docs] def update_zoom_2(self, newzoom): if self._vec_field is not None: self._vec_field.update_zoom_2(newzoom)
[docs] def update_zoom_vectorfield(self,factor): if self._vec_field is not None: self._vec_field.update_zoom_factor(factor)
[docs] def update_arrowpixelsize_vectorfield(self,factor): if self._vec_field is not None: self._vec_field.arrow_pixel_size += factor self._vec_field.arrow_pixel_size = max(1,self._vec_field.arrow_pixel_size)
[docs] def update_pal(self, curpal:wolfpalette, graypal=None, bluepal=None): """ Mise à jour de la palette :param curpal: palette courante :param graypal: palette grise :param bluepal: palette bleue """ which = self._which_current self._current.mypal = curpal self._current.mypal.interval_cst = curpal.interval_cst if VERSION_RGB==1 : self._current.rgb = curpal.get_rgba(self._current.array) self._current.rgb[self._current.array.mask] = [1., 1., 1., 1.] if which in [views_2D.WD_Q, views_2D.WD_U]: self._view.pals = [curpal, None] elif which in [views_2D.WL_U, views_2D.WL_Q]: self._view.pals = [curpal, None] elif which == views_2D.T_WL_Q: self._view.pals = [graypal, curpal, None] elif which == views_2D.T_WL_Q: self._view.pals = [graypal, curpal, None] elif which == views_2D.T_WD_Q: self._view.pals = [graypal, bluepal, None] elif which == views_2D.T_WD_U: self._view.pals = [graypal, bluepal, None]
[docs] def get_critdiam(self, which:int) -> WolfArray: """ Calcul du dimètre critique :param which : 0 = Shields ; 1 = Izbach """ def compute(which) -> WolfArray: ij = np.argwhere(self.waterdepth.array>0.) diamcrit = WolfArray(mold=self.waterdepth) qnorm = (self.qx**2.+self.qy**2.)**.5 qnorm.array.mask=self.waterdepth.array.mask if which==0: diam = np.asarray([get_d_cr(qnorm.array[i,j],self.waterdepth.array[i,j],1./self.rough_n.array[i,j])[which] for i,j in ij]) else: diam = np.asarray([izbach_d_cr(qnorm.array[i,j],self.waterdepth.array[i,j]) for i,j in ij]) diamcrit.array[ij[:,0],ij[:,1]] = diam return diamcrit logging.info(_('Computing critical diameters')) diamcrit = compute(which) logging.info(_('End of computing critical diameters')) return diamcrit
[docs] def get_u_shear(self) -> WolfArray: """ Calcul de la vitesse de cisaillement """ def compute() -> WolfArray: ij = np.argwhere(self.waterdepth.array>0.) u_shear = WolfArray(mold=self.waterdepth) qnorm = (self.qx**2.+self.qy**2.)**.5 qnorm.array.mask=self.waterdepth.array.mask _u_shear = np.asarray([get_shear_velocity_2D_Manning(qnorm.array[i,j], self.waterdepth.array[i,j], self.rough_n.array.data[i,j]) for i,j in ij]) u_shear.array[ij[:,0],ij[:,1]] = _u_shear return u_shear logging.info(_('Computing shear velocity')) u_shear = compute() logging.info(_('End of computing shear velocity')) return u_shear
[docs] def get_shieldsnumber(self) -> WolfArray: """ Calcul du nombre de Shields """ def compute() -> WolfArray: ij = np.argwhere(self.waterdepth.array>0.) shields = WolfArray(mold=self.waterdepth) qnorm = (self.qx**2.+self.qy**2.)**.5 qnorm.array.mask=self.waterdepth.array.mask _shields = np.asarray([get_Shields_2D_Manning(self.sediment_density, self.sediment_diameter, qnorm.array[i,j], self.waterdepth.array[i,j], self.rough_n.array.data[i,j]) for i,j in ij]) shields.array[ij[:,0],ij[:,1]] = _shields return shields logging.info(_('Computing critical diameters')) shields = compute() logging.info(_('End of computing critical diameters')) return shields
[docs] def get_critsusp(self, which:int = 50): logging.info(_('Computing critical diameters')) ij = np.argwhere(self.waterdepth.array>0.) diamcrit = WolfArray(mold=self.waterdepth) qnorm = (self.qx**2.+self.qy**2.)**.5 qnorm.array.mask=self.waterdepth.array.mask diam = np.asarray([get_d_cr_susp(qnorm.array[i,j],self.waterdepth.array[i,j],1./self.rough_n.array[i,j],which=which) for i,j in ij]) diamcrit.array[ij[:,0],ij[:,1]] = diam
[docs] def plot(self, sx=None, sy=None, xmin=None, ymin=None, xmax=None, ymax=None): """ Affichage des résultats """ if self._which_current in VIEWS_VECTOR_FIELD: self._vec_field.plot(sx, sy,xmin,ymin,xmax,ymax) elif self._which_current in VIEWS_COMPLEX: self._view.plot(sx, sy,xmin,ymin,xmax,ymax) else: self._current.plot(sx, sy,xmin,ymin,xmax,ymax)
[docs] def get_values_labels(self, i:int, j:int): """ Récupération des valeurs et labels pour affichage """ which = self._which_current mylab = [which.value] myval = [self._current.array[i,j]] if which in VIEWS_VECTOR_FIELD: if which == views_2D.VECTOR_FIELD_Q: mylab = [views_2D.QX.value, views_2D.QY.value, views_2D.QNORM.value] h = self.waterdepth.array[i,j] qx = self.qx.array[i,j] qy = self.qy.array[i,j] qnorm = (qx**2.+qy**2.)**.5 myval = [qx, qy, qnorm] elif which == views_2D.VECTOR_FIELD_U: mylab = [views_2D.UX.value, views_2D.UY.value, views_2D.UNORM.value] h = self.waterdepth.array[i,j] qx = self.qx.array[i,j] qy = self.qy.array[i,j] ux = qx/h uy = qy/h unorm = (ux**2.+uy**2.)**.5 myval = [ux, uy, unorm] if which in VIEWS_COMPLEX: mylab = [views_2D.TOPOGRAPHY.value, views_2D.WATERDEPTH.value, views_2D.WATERLEVEL.value, views_2D.QX.value, views_2D.QY.value, views_2D.QNORM.value, views_2D.UX.value, views_2D.UY.value, views_2D.UNORM.value, views_2D.FROUDE.value] top = self.top.array[i,j] h = self.waterdepth.array[i,j] sl = h+top qx = self.qx.array[i,j] qy = self.qy.array[i,j] qnorm = (qx**2.+qy**2.)**.5 ux = qx/h uy = qy/h unorm = (ux**2.+uy**2.)**.5 fr = unorm/(9.81*h)**.5 myval = [top, h, sl, qx, qy, qnorm, ux, uy, unorm, fr] return myval,mylab
[docs] def delete_lists(self): """ Reset des listes OpenGL de la matrice/vue courante """ if self._which_current in VIEWS_VECTOR_FIELD: self._view.delete_lists() elif self._which_current in VIEWS_COMPLEX: self._view.delete_lists() else: self._current.delete_lists()
[docs] class Wolfresults_2D(Element_To_Draw): """ Manipulation des résultats d'un modèle WOLF2D en multiblocs La classe hérite de 'Element_To_Draw' afin d'être sûr de disposer des informations pour un éventuel affichage dans un viewer 'WolfMapViewer' ATTENTION : - la classe contient un dictionnaire 'myblocks' d'objets 'OneWolfResult' - les clés du dictionnaire sont de type 'block1', 'block2'... 'blockn' --> voir fonction 'getkeyblock' - les entrées ne sont PAS des matrices multiblocks 'WolfArrayMB' mais une classe 'OneWolfResult' contient plusieurs matrices pour chaque type de résultat (water depth, dischargeX, dischargeY, ...) - la classe se comporte donc un peu comme une généralisation d'une matrice 'WolfArrayMB' mais il ne s'agit pas d'une extension par polymorphisme - on retrouve cependant plusieurs routines similaires afin de faciliter l'intégration dans un viewer WX """
[docs] myblocks:dict[str, OneWolfResult]
[docs] head_blocks:dict[str, header_wolf]
[docs] myparam:prev_parameters_simul
[docs] myblocfile:blocks_file
[docs] mymnap:WolfArrayMNAP
def __init__(self, fname:str = None, mold = None, eps=0., idx: str = '', plotted: bool = True, mapviewer=None, need_for_wx: bool = False, loader=None) -> None: """ Initialisation :param fname : nom du fichier de résultats :param mold : objet 'WolfArray' servant de moule pour les résultats :param eps : valeur de l'epsilon pour le masquage des données :param idx : identifiant de l'objet :param plotted : si True alors l'objet est prêt à être affiché :param mapviewer : objet 'WolfMapViewer' pour l'affichage :param need_for_wx : si True alors l'objet abesoin d'une app WX. Si elle n'existe pas, génération d'une erreur ! :param loader : if True then we'll use this function rather than classic one. This param was introduced because one cannot guess if one's loading GPU results by just looking at the filename (GPU results come in directories). """ super().__init__(idx, plotted, mapviewer, need_for_wx) self.filename="" self.filenamegen=self.filename self.linkedvec = None # vecteur d'exclusion de données self.epsilon = eps self._epsilon_default = self.epsilon self.loaded=True self.current_result = -1 self._step_interval = 1 self.mypal = wolfpalette(None,'Colors') self.mypal.default16() self.mypal.automatic = True self.palgray = None self.palblue = None self.nbnotnull=99999 self._which_current_view = views_2D.WATERDEPTH # _('Water depth [m]') self.to_filter_independent = False self.translx=0. self.transly=0. self.head_blocks={} if fname is not None: if loader is not None: if loader(fname) <0: self.loaded = False logging.error(_('Error while loading results - Abort !')) return else: parts=splitext(fname) if len(parts)>1: self.filename = parts[0] else: self.filename = fname self.filenamegen=self.filename if exists(self.filename + '.trl'): with open(self.filename + '.trl') as f: trl=f.read().splitlines() self.translx=float(trl[1]) self.transly=float(trl[2]) self.myblocks={} self.read_param_simul() if exists(self.filename+'.head') or exists(join(dirname(self.filename),'bloc1.head')): wolfpy.r2d_init(self.filename.ljust(255).encode('ansi')) nb_blocks = wolfpy.r2d_nbblocks() for i in range(nb_blocks): curblock = OneWolfResult(i, parent=self) self.myblocks[getkeyblock(i)] = curblock nbx,nby,dx,dy,ox,oy,tx,ty = wolfpy.r2d_hblock(i+1) curhead = self.head_blocks[getkeyblock(i)]=header_wolf() curhead.nbx = nbx curhead.nby = nby curhead.origx = ox curhead.origy = oy curhead.dx = dx curhead.dy = dy curhead.translx = self.translx curhead.transly = self.transly self.myblocks[getkeyblock(i)].waterdepth.dx = dx self.myblocks[getkeyblock(i)].waterdepth.dy = dy self.myblocks[getkeyblock(i)].waterdepth.nbx = nbx self.myblocks[getkeyblock(i)].waterdepth.nby = nby self.myblocks[getkeyblock(i)].waterdepth.origx = ox self.myblocks[getkeyblock(i)].waterdepth.origy = oy self.myblocks[getkeyblock(i)].waterdepth.translx = self.translx self.myblocks[getkeyblock(i)].waterdepth.transly = self.transly self.myblocks[getkeyblock(i)].top.dx = dx self.myblocks[getkeyblock(i)].top.dy = dy self.myblocks[getkeyblock(i)].top.nbx = nbx self.myblocks[getkeyblock(i)].top.nby = nby self.myblocks[getkeyblock(i)].top.origx = ox self.myblocks[getkeyblock(i)].top.origy = oy self.myblocks[getkeyblock(i)].top.translx = self.translx self.myblocks[getkeyblock(i)].top.transly = self.transly self.myblocks[getkeyblock(i)].qx.dx = dx self.myblocks[getkeyblock(i)].qx.dy = dy self.myblocks[getkeyblock(i)].qx.nbx = nbx self.myblocks[getkeyblock(i)].qx.nby = nby self.myblocks[getkeyblock(i)].qx.origx = ox self.myblocks[getkeyblock(i)].qx.origy = oy self.myblocks[getkeyblock(i)].qx.translx = self.translx self.myblocks[getkeyblock(i)].qx.transly = self.transly self.myblocks[getkeyblock(i)].qy.dx = dx self.myblocks[getkeyblock(i)].qy.dy = dy self.myblocks[getkeyblock(i)].qy.nbx = nbx self.myblocks[getkeyblock(i)].qy.nby = nby self.myblocks[getkeyblock(i)].qy.origx = ox self.myblocks[getkeyblock(i)].qy.origy = oy self.myblocks[getkeyblock(i)].qy.translx = self.translx self.myblocks[getkeyblock(i)].qy.transly = self.transly self.myblocks[getkeyblock(i)].rough_n.dx = dx self.myblocks[getkeyblock(i)].rough_n.dy = dy self.myblocks[getkeyblock(i)].rough_n.nbx = nbx self.myblocks[getkeyblock(i)].rough_n.nby = nby self.myblocks[getkeyblock(i)].rough_n.origx = ox self.myblocks[getkeyblock(i)].rough_n.origy = oy self.myblocks[getkeyblock(i)].rough_n.translx = self.translx self.myblocks[getkeyblock(i)].rough_n.transly = self.transly else: nb_blocks = self.myblocfile.nb_blocks for i in range(nb_blocks): #print(f"Reading block {getkeyblock(i)}") curblock = OneWolfResult(i, parent = self) self.myblocks[getkeyblock(i)] = curblock curblock.waterdepth.set_header(self.mymnap.head_blocks[getkeyblock(i)]) curblock.top.set_header(self.mymnap.head_blocks[getkeyblock(i)]) curblock.qx.set_header(self.mymnap.head_blocks[getkeyblock(i)]) curblock.qy.set_header(self.mymnap.head_blocks[getkeyblock(i)]) curblock.rough_n.set_header(self.mymnap.head_blocks[getkeyblock(i)]) self.allocate_ressources() self.read_topography() self.read_ini_mb() self.loaded_rough = False else: self.myblocks={} self.nbx = 1 self.nby = 1 ox=9999999. oy=9999999. ex=-9999999. ey=-9999999. for curblock in self.myblocks.values(): curhead=curblock.waterdepth.get_header(False) ox=min(ox,curhead.origx) oy=min(oy,curhead.origy) ex=max(ex,curhead.origx+float(curhead.nbx)*curhead.dx) ey=max(ey,curhead.origy+float(curhead.nby)*curhead.dy) self.dx = ex-ox self.dy = ey-oy self.origx = ox self.origy = oy self._nb_results = None self.timesteps = [] self.times = [] self.properties:Props_Res_2D = None self.set_properties() self.mngselection = SelectionDataMB(self) self.myops = None self._active_blocks = 0 @property
[docs] def all_dt(self): #FIXME : defined in GPU version --> to be implemented for CPU version pass
@property
[docs] def all_mostly_dry_mesh(self): #FIXME : defined in GPU version --> to be implemented for CPU version pass
@property
[docs] def all_clock_time(self): #FIXME : defined in GPU version --> to be implemented for CPU version pass
@property
[docs] def all_wet_meshes(self): #FIXME : defined in GPU version --> to be implemented for CPU version pass
[docs] def set_opacity(self, alpha:float): """ Set the transparency of the array """ self.alpha = alpha self.reset_plot() return self.alpha
@property
[docs] def SelectionData(self) -> SelectionDataMB: """ Return the data of the selection """ return self.mngselection
@property
[docs] def common_mask_MB(self) -> list[np.ndarray]: """ Common mask for multiblock arrays """ if self.mymnap is not None: return self.mymnap.get_all_masks() else: return None
@property
[docs] def active_blocks(self) -> list["WolfArray"]: """ Return the active blocks """ if self.nb_blocks>0 and self._active_blocks is not None: if isinstance(self._active_blocks, list): return [self.myblocks[cur] for cur in self._active_blocks] elif self._active_blocks == 0: return [k for k in self.myblocks.values()] elif self._active_blocks in self.myblocks: return [self.myblocks[self._active_blocks]] else: return None else: return [self]
@active_blocks.setter def active_blocks(self, value:Union[str, int, list[int]]): """ Set the active blocks :param value: name of the block or index 1-based or list of index 1-based """ if isinstance(value, str): if value in self.myblocks: self._active_blocks = value logging.info(_(f'Block found - {value}')) else: self._active_blocks = None logging.info(_('Block not found')) elif isinstance(value, int): if value == 0: self._active_blocks = 0 logging.info(_('All blocks selected')) else: value = getkeyblock(value, addone=False) if value in self.myblocks: self._active_blocks = value logging.info(_(f'Block found - {value}')) else: self._active_blocks = None logging.info(_('Block not found')) elif isinstance(value, list): if 0 in value: self._active_blocks = 0 logging.info(_('All blocks selected')) else: value = [getkeyblock(cur, addone=False) for cur in value] value = [cur for cur in value if cur in self.myblocks] if len(value)>0: self._active_blocks = value logging.info(_('List of blocks selected')) else: self._active_blocks = None logging.info(_('No block found')) else: logging.error(_('Unknown type for active_blocks'))
[docs] def default16(self): """ Mise à jour de la palette par défaut """ self.mypal.default16() if self.get_mapviewer() is not None: self.reset_plot() self.get_mapviewer().Refresh()
[docs] def defaultblue(self): """ Mise à jour de la palette par défaut """ self.mypal.defaultblue() if self.get_mapviewer() is not None: self.reset_plot() self.get_mapviewer().Refresh()
[docs] def defaultgrey(self): """ Mise à jour de la palette par défaut """ self.mypal.defaultgray() if self.get_mapviewer() is not None: self.reset_plot() self.get_mapviewer().Refresh()
[docs] def get_bounds(self, abs=True): """ Return bounds in coordinates :param abs = if True, add translation to (x, y) (coordinate to global space) :return : tuple of two lists of two floats - ([xmin, xmax],[ymin, ymax]) """ if abs: return ([self.origx + self.translx, self.origx + self.translx + float(self.nbx) * self.dx], [self.origy + self.transly, self.origy + self.transly + float(self.nby) * self.dy]) else: return ([self.origx, self.origx + float(self.nbx) * self.dx], [self.origy, self.origy + float(self.nby) * self.dy])
[docs] def check_bounds_ij(self, i:int, j:int): """Check if i and j are inside the array bounds""" x,y = self.get_xy_from_ij(i,j) return self.check_bounds_xy(x,y)
[docs] def check_bounds_xy(self, x:float, y:float): """Check if i and j are inside the array bounds""" (xmin, xmax), (ymin, ymax) = self.get_bounds() return x>=xmin and x<=xmax and y>=ymin and y<=ymax
@property
[docs] def step_interval_results(self): return self._step_interval
@step_interval_results.setter def step_interval_results(self, value:int): self._step_interval = value @property
[docs] def nullvalue(self): """ Get nullvalue from the first block """ return self[0].nullvalue
@nullvalue.setter def nullvalue(self, value): """ Set nullvalue for all blocks """ for i in range(self.nb_blocks): self[i].nullvalue = value @property
[docs] def alpha(self): return self[0].alpha
@alpha.setter def alpha(self, value:float): if value <0.: value = 0. if value > 1.: value = 1. for i in range(self.nb_blocks): self[i].alpha = value @property
[docs] def shading(self): return self[0].shading
@shading.setter def shading(self, value:bool): for i in range(self.nb_blocks): self[i].shading = value @property
[docs] def altitudehill(self): return self[0].altitudehill
@altitudehill.setter def altitudehill(self, value:bool): for i in range(self.nb_blocks): self[i].altitudehill = value @property
[docs] def azimuthhill(self): return self[0].azimuthhill
@azimuthhill.setter def azimuthhill(self, value:bool): for i in range(self.nb_blocks): self[i].azimuthhill = value @property
[docs] def shaded(self): return self[0].shaded
[docs] def show_properties(self): """ Affichage des propriétés de la matrice """ if self.wx_exists and self.properties is not None: self.properties.SetTitle(_('Properties : ') + self.idx) self.properties.Show()
[docs] def set_properties(self): """ Create : - Props_Res_2D (GUI) if mapviewer is not None """ if self.wx_exists and self.mapviewer is not None: self.properties = Props_Res_2D(self, self.mapviewer) self.properties.Hide() else: self.properties = None
[docs] def get_header(self, abs=True) -> header_wolf: """ Récupération de l'entête de la matrice """ curhead = header_wolf() curhead.origx = self.origx curhead.origy = self.origy curhead.dx = self.dx curhead.dy = self.dy curhead.nbx = self.nbx curhead.nby = self.nby curhead.translx = self.translx curhead.transly = self.transly curhead.head_blocks = self.head_blocks.copy() if abs: curhead.origx += curhead.translx curhead.origy += curhead.transly curhead.origz += curhead.translz curhead.translx = 0. curhead.transly = 0. curhead.translz = 0. return curhead
def __getitem__(self, block_key:Union[int,str]) -> WolfArray: """Access a block's array of this multi-blocks Result""" if isinstance(block_key,int): _key = getkeyblock(block_key) else: _key = block_key if _key in self.myblocks.keys(): return self.myblocks[_key].current else: logging.error(_('Block not found in Wolfresults_2D')) return None """ Iterator """ def __iter__(self): self._iter = 0 return self[self._iter] def __next__(self): self._iter += 1 return self[self._iter] """ End Iterator """
[docs] def as_WolfArray(self, copyarray:bool=True, force_mb = False) -> Union[WolfArray, WolfArrayMB]: """ Récupération d'une matrice MB ou Mono sur base du résultat courant :param copyarray : si True alors on copie les données :param force_mb : if True then return a WolfArrayMB even if there is only one block """ if self.nb_blocks>1 or force_mb: retarray = WolfArrayMB(nullvalue=self[0].nullvalue) retarray.set_header(self.get_header()) for i in range(self.nb_blocks): retarray.add_block(self[i], copyarray=copyarray) else: if copyarray : retarray = WolfArray(mold=self[0], nullvalue=self[0].nullvalue) else: retarray = self[0] return retarray
@property
[docs] def nb_blocks(self): """ Nombre de blocs """ return len(self.myblocks)
@property
[docs] def sediment_diameter(self): try: return self.myblocks[getkeyblock(0)].sediment_diameter except: return None
@sediment_diameter.setter def sediment_diameter(self, value:float): for curblock in self.myblocks.values(): force = curblock.sediment_diameter != value curblock.sediment_diameter = value # forcer la MAJ si nécessaire if self.get_currentview() == views_2D.SHIELDS_NUMBER and force: self.set_currentview() @property
[docs] def sediment_density(self): try: return self.myblocks[getkeyblock(0)].sediment_density except: return None
@sediment_density.setter def sediment_density(self, value:float): for curblock in self.myblocks.values(): force = curblock.sediment_density != value curblock.sediment_density = value # forcer la MAJ si nécessaire if self.get_currentview() == views_2D.SHIELDS_NUMBER and force: self.set_currentview()
[docs] def load_default_colormap(self, which:str): """ Lecture d'une palette disponible dans le répertoire "models" """ dir = join(dirname(__file__), 'models') if exists(join(dir, which + '.pal')): try: self.mypal.readfile(join(dir, which + '.pal')) if which.endswith('_cst'): self.mypal.interval_cst = True else: self.mypal.interval_cst = False except: return else: logging.warning(_('Bad file - {}'.format(which))) return
[docs] def get_times_steps(self, nb:int = None): """ Récupération des temps réels et des pas de calcul de chaque résultat sur disque :param nb : nombre de résultats à lire - si None alors on lit tous les résultats :return : tuple (liste des temps, liste des pas de calcul) """ if nb is None: if self._nb_results is None: nb = self.get_nbresults() else: nb = self._nb_results if nb == 0: self.times, self.timesteps = [],[] else: wolfpy.r2d_init(self.filename.ljust(255).encode('ansi')) self.times, self.timesteps = wolfpy.get_times_steps(nb) return self.times, self.timesteps
[docs] def find_minmax(self,update=False): """Find spatial bounds""" self.xmin = self.origx + self.translx self.xmax = self.origx + self.translx + float(self.nbx) * self.dx self.ymin = self.origy + self.transly self.ymax = self.origy + self.transly + float(self.nby) * self.dy
[docs] def get_norm_max(self): """ Retourne la norme maximale du champ de débit ou de vitesse """ nmax=[] for curblock in self.myblocks.values(): nmax.append(curblock.get_norm_max()) return nmax
[docs] def update_zoom_2(self, newzoom): for curblock in self.myblocks.values(): curblock.update_zoom_2(newzoom)
[docs] def update_zoom_factor(self): if self._which_current_view in VIEWS_VECTOR_FIELD or self._which_current_view in VIEWS_COMPLEX: nmax = self.get_norm_max() maxq = np.max(np.asarray([cur[2] for cur in nmax])) maxq_rel = np.max(np.asarray([cur[1] for cur in nmax])) size_min_rel= np.min(np.asarray([cur[0] for cur in nmax])) dmin = self.get_dxdy_min() for idx, curblock in enumerate(self.myblocks.values()): smin = nmax[idx][0] qmax_r = nmax[idx][1] qmax = nmax[idx][2] curblock.update_zoom_vectorfield(qmax / maxq * (1.-size_min_rel) + size_min_rel * 1.)
[docs] def update_arrowpixelsize_vectorfield(self,factor): for curblock in self.myblocks.values(): curblock.update_arrowpixelsize_vectorfield(factor)
[docs] def get_dxdy_min(self): """Return the minimal size into blocks""" dmin=99999 for curblock in self.myblocks.values(): dmin = min(dmin,curblock.waterdepth.dx) dmin = min(dmin,curblock.waterdepth.dy) return dmin
[docs] def get_dxdy_max(self): """Return the maximal size into blocks""" dmax=-99999 for curblock in self.myblocks.values(): dmax = max(dmax,curblock.waterdepth.dx) dmax = max(dmax,curblock.waterdepth.dy) return dmax
[docs] def read_param_simul(self): """Read simulation parameters from files""" self.myparam = prev_parameters_simul(self) self.myparam.read_file(self.filename) self.myblocfile = blocks_file(self) # self.myblocfile.read_file() self.mymnap = WolfArrayMNAP(self.filename)
[docs] def get_currentview(self): """Return the current view""" return self._which_current_view
[docs] def filter_inundation(self, eps:float=None, linkedvec:vector = None): """ Apply filtering on array :param eps : mask data below eps :param linkedvec : mask data outside linkedvec """ if eps is not None: self.epsilon = eps if linkedvec is not None: self.linkedvec = linkedvec self.mimic_plotdata() for curblock in self.myblocks.values(): curblock.filter_inundation()
[docs] def filter_independent_zones(self, n_largest:int = 1): """ Apply filtering on array """ for curblock in self.myblocks.values(): curblock.filter_independent_zones(n_largest)
[docs] def set_currentview(self, which=None, force_wx=False, force_updatepal:bool=False): """ Set the current view --> see 'views_2D' for supported values :param which : view to set :param force_wx : if True, a wx dialog will be shown to set the minimum size of the vector field :param force_updatepal : if True, the palette will be updated """ if which is None: which = self._which_current_view if self.wx_exists and force_wx: if which in VIEWS_VECTOR_FIELD or which in VIEWS_COMPLEX: dlg = wx.TextEntryDialog(None,_('Minimum size of the vector field (0 --> 1) -- 1 == equal size'),_('Size'), str(self.myblocks[getkeyblock(0)].min_field_size)) ret = dlg.ShowModal() minsize = max(0,min(1,float(dlg.GetValue()))) dlg.Destroy() for curblock in self.myblocks.values(): curblock.min_field_size = minsize if which in views_2D: # self.delete_lists() self._which_current_view = which self.plotting=True self.mimic_plotdata() if which in VIEWS_SEDIMENTARY: if not self.loaded_rough: self.read_roughness_param() for curblock in self.myblocks.values(): curblock.set_current(which) # Epsilon is set to 0. by default # OK for all views except QX, QY, UX, UY if which in [views_2D.QX, views_2D.QY, views_2D.UX, views_2D.UY]: self.epsilon = -999999. else: self.epsilon = self._epsilon_default self.filter_inundation() if self.to_filter_independent: self.filter_independent_zones() if which in VIEWS_VECTOR_FIELD or which in VIEWS_COMPLEX: self.update_zoom_factor() if force_updatepal: self.mypal.automatic = True self.updatepalette() else: self.link_palette() self.plotting=False self.mimic_plotdata() if self.mapviewer is not None: self.reset_plot() self.mapviewer.Refresh()
[docs] def allocate_ressources(self): """Allocation de l'espace mémoire utile pour le stockage des résultats de chaque bloc""" for curblock in self.myblocks.values(): curblock.waterdepth.allocate_ressources() curblock.top.allocate_ressources() curblock.qx.allocate_ressources() curblock.qy.allocate_ressources()
[docs] def read_topography(self): """Lecture de la topographie de modélisation""" if exists(self.filename.strip() + '.topini'): all_masks = self.common_mask_MB with open(self.filename.strip() + '.topini','rb') as f: for i in range(self.nb_blocks): nbx=self.myblocks[getkeyblock(i)].top.nbx nby=self.myblocks[getkeyblock(i)].top.nby nbbytes=nbx*nby*4 self.myblocks[getkeyblock(i)].top.array = ma.masked_equal(np.frombuffer(f.read(nbbytes),dtype=np.float32),0.) self.myblocks[getkeyblock(i)].top.array = self.myblocks[getkeyblock(i)].top.array.reshape(nbx,nby,order='F') self.myblocks[getkeyblock(i)].top.array.mask[:,:] = all_masks[i][:,:]
[docs] def read_ini_mb(self): """Lecture des conditions initiales""" # all_masks = self.common_mask_MB() if exists(self.filename.strip() + '.hbinb'): with open(self.filename.strip() + '.hbinb','rb') as f: for i in range(self.nb_blocks): nbx=self.myblocks[getkeyblock(i)].waterdepth.nbx nby=self.myblocks[getkeyblock(i)].waterdepth.nby nbbytes=nbx*nby*4 self.myblocks[getkeyblock(i)].waterdepth.array = ma.masked_equal(np.frombuffer(f.read(nbbytes),dtype=np.float32),0.) self.myblocks[getkeyblock(i)].waterdepth.array = self.myblocks[getkeyblock(i)].waterdepth.array.reshape(nbx,nby,order='F') # self.myblocks[getkeyblock(i)].waterdepth.array.mask = all_masks[i].copy() if exists(self.filename.strip() + '.qxbinb'): with open(self.filename.strip() + '.qxbinb','rb') as f: for i in range(self.nb_blocks): nbx=self.myblocks[getkeyblock(i)].qx.nbx nby=self.myblocks[getkeyblock(i)].qx.nby nbbytes=nbx*nby*4 self.myblocks[getkeyblock(i)].qx.array = ma.masked_equal(np.frombuffer(f.read(nbbytes),dtype=np.float32),0.) self.myblocks[getkeyblock(i)].qx.array = self.myblocks[getkeyblock(i)].qx.array.reshape(nbx,nby,order='F') # self.myblocks[getkeyblock(i)].qx.array.mask = all_masks[i].copy() if exists(self.filename.strip() + '.qybinb'): with open(self.filename.strip() + '.qybinb','rb') as f: for i in range(self.nb_blocks): nbx=self.myblocks[getkeyblock(i)].qy.nbx nby=self.myblocks[getkeyblock(i)].qy.nby nbbytes=nbx*nby*4 self.myblocks[getkeyblock(i)].qy.array = ma.masked_equal(np.frombuffer(f.read(nbbytes),dtype=np.float32),0.) self.myblocks[getkeyblock(i)].qy.array = self.myblocks[getkeyblock(i)].qy.array.reshape(nbx,nby,order='F')
# self.myblocks[getkeyblock(i)].qy.array.mask = all_masks[i].copy()
[docs] def read_roughness_param(self): """Lecture du frottement de modélisation""" with open(self.filename.strip() + '.frotini','rb') as f: for i in range(self.nb_blocks): nbx=self.myblocks[getkeyblock(i)].rough_n.nbx nby=self.myblocks[getkeyblock(i)].rough_n.nby nbbytes=nbx*nby*4 self.myblocks[getkeyblock(i)].rough_n.array = ma.masked_equal(np.frombuffer(f.read(nbbytes),dtype=np.float32),0.) self.myblocks[getkeyblock(i)].rough_n.array = self.myblocks[getkeyblock(i)].rough_n.array.reshape(nbx,nby,order='F') self.loaded_rough = True
[docs] def get_nbresults(self, force_update_timessteps=True): """Récupération du nombre de pas sauvegardés --> utilisation de la librairie Fortran""" if exists(self.filename+'.head'): wolfpy.r2d_init(self.filename.ljust(255).encode('ansi')) nb = wolfpy.r2d_getnbresults() self._nb_results = nb if force_update_timessteps: self.get_times_steps(nb) return nb else: return 1
[docs] def read_oneblockresult_withoutmask(self, which_step:int=-1,whichblock:int=-1): """ Lecture d'un résultat pour un bloc spécifique --> utilisation de la librairie Fortran :param which_step : timestep ; 0-based; -1 == last one :param whichblock : block index """ which_step = self._sanitize_result_step(which_step) if whichblock!=-1: block = self.myblocks[getkeyblock(whichblock,False)] nbx = block.waterdepth.nbx nby = block.waterdepth.nby # Fortran is 1-based --> which_step+1 block.waterdepth.array, block.qx.array, block.qy.array = wolfpy.r2d_getresults(which_step+1, nbx, nby, whichblock) block.k.array, block.eps.array = wolfpy.r2d_getturbresults(which_step+1, nbx, nby, whichblock) block._force_update_shields = True
[docs] def _read_oneblockresult_withoutmask_only_h(self, which_step:int=-1,whichblock:int=-1): """ Lecture d'un résultat pour un bloc spécifique --> utilisation de la librairie Fortran :param which_step : timestep ; 0-based; -1 == last one :param whichblock : block index """ which_step = self._sanitize_result_step(which_step) if whichblock!=-1: block = self.myblocks[getkeyblock(whichblock,False)] nbx = block.waterdepth.nbx nby = block.waterdepth.nby # Fortran is 1-based --> which_step+1 block.waterdepth.array, block.qx.array, block.qy.array = wolfpy.r2d_getresults(which_step+1, nbx, nby, whichblock) block._force_update_shields = True
[docs] def read_oneblockresult(self, which_step:int=-1, whichblock:int=-1): """ Lecture d'un résultat pour un bloc spécifique et application d'un masque sur base d'nu epsilon de hauteur d'eau which_step: result number to read ; 0-based; -1 == last one whichblock : block index ; 1-based """ which_step = self._sanitize_result_step(which_step) if whichblock!=-1: self.read_oneblockresult_withoutmask(which_step, whichblock) if self.epsilon > 0.: self.myblocks[getkeyblock(whichblock,False)].waterdepth.array=ma.masked_less_equal(self.myblocks[getkeyblock(whichblock,False)].waterdepth.array,self.epsilon) else: self.myblocks[getkeyblock(whichblock,False)].waterdepth.array=ma.masked_equal(self.myblocks[getkeyblock(whichblock,False)].waterdepth.array,0.) self.myblocks[getkeyblock(whichblock,False)].qx.array=ma.masked_where(self.myblocks[getkeyblock(whichblock,False)].waterdepth.array.mask,self.myblocks[getkeyblock(whichblock,False)].qx.array) self.myblocks[getkeyblock(whichblock,False)].qy.array=ma.masked_where(self.myblocks[getkeyblock(whichblock,False)].waterdepth.array.mask,self.myblocks[getkeyblock(whichblock,False)].qy.array) self.myblocks[getkeyblock(whichblock,False)].k.array=ma.masked_where(self.myblocks[getkeyblock(whichblock,False)].waterdepth.array.mask,self.myblocks[getkeyblock(whichblock,False)].k.array) self.myblocks[getkeyblock(whichblock,False)].eps.array=ma.masked_where(self.myblocks[getkeyblock(whichblock,False)].waterdepth.array.mask,self.myblocks[getkeyblock(whichblock,False)].eps.array) self.myblocks[getkeyblock(whichblock,False)].waterdepth.count() self.myblocks[getkeyblock(whichblock,False)].qx.count() self.myblocks[getkeyblock(whichblock,False)].qy.count() self.myblocks[getkeyblock(whichblock,False)].k.count() self.myblocks[getkeyblock(whichblock,False)].eps.count() self.myblocks[getkeyblock(whichblock,False)].waterdepth.set_nullvalue_in_mask() self.myblocks[getkeyblock(whichblock,False)].qx.set_nullvalue_in_mask() self.myblocks[getkeyblock(whichblock,False)].qy.set_nullvalue_in_mask() self.myblocks[getkeyblock(whichblock,False)].k.set_nullvalue_in_mask() self.myblocks[getkeyblock(whichblock,False)].eps.set_nullvalue_in_mask()
[docs] def _read_oneblockresult_only_h(self, which_step:int=-1, whichblock:int=-1): """ Lecture d'un résultat pour un bloc spécifique et application d'un masque sur base d'nu epsilon de hauteur d'eau which_step: result number to read ; 0-based; -1 == last one whichblock : block index ; 1-based """ which_step = self._sanitize_result_step(which_step) if whichblock!=-1: self._read_oneblockresult_withoutmask_only_h(which_step, whichblock) if self.epsilon > 0.: self.myblocks[getkeyblock(whichblock,False)].waterdepth.array=ma.masked_less_equal(self.myblocks[getkeyblock(whichblock,False)].waterdepth.array,self.epsilon) else: self.myblocks[getkeyblock(whichblock,False)].waterdepth.array=ma.masked_equal(self.myblocks[getkeyblock(whichblock,False)].waterdepth.array,0.) self.myblocks[getkeyblock(whichblock,False)].waterdepth.count() self.myblocks[getkeyblock(whichblock,False)].waterdepth.set_nullvalue_in_mask()
[docs] def read_oneresult(self, which:int=-1): """ Lecture d'un pas de sauvegarde :param which: result number to read; 0-based; -1 == last one """ which = self._sanitize_result_step(which) if exists(self.filename+'.head'): logging.info(_('Reading from results - step :{}'.format(which+1))) wolfpy.r2d_init(self.filename.ljust(255).encode('ansi')) for i in range(self.nb_blocks): self.read_oneblockresult(which, i+1) else: logging.info(_('No ".head" file --> Initial Conditions')) if self.to_filter_independent: self.filter_independent_zones() self.current_result = which self.loaded=True
[docs] def _read_oneresult_only_h(self, which:int=-1): """ Lecture d'un pas de sauvegarde :param which: result number to read; 0-based; -1 == last one """ which = self._sanitize_result_step(which) if exists(self.filename+'.head'): logging.info(_('Reading from results - step :{}'.format(which+1))) wolfpy.r2d_init(self.filename.ljust(255).encode('ansi')) for i in range(self.nb_blocks): self._read_oneblockresult_only_h(which, i+1) else: logging.info(_('No ".head" file --> Initial Conditions')) if self.to_filter_independent: self.filter_independent_zones() self.current_result = which self.loaded=True
[docs] def read_next(self): """ Lecture du pas suivant """ self.current_result += self._step_interval self._update_result_view()
[docs] def _sanitize_result_step(self, which_step:int=-1): """ Sanitize result step index -- 0-based """ nb = self.get_nbresults() while which_step<0: which_step += nb which_step = min(nb-1, which_step) which_step = max(0, which_step) return which_step
[docs] def _update_result_view(self): self.current_result = self._sanitize_result_step(self.current_result) if exists(self.filename+'.head'): logging.info(_('Reading result step :{}'.format(self.current_result))) wolfpy.r2d_init(self.filename.ljust(255).encode('ansi')) for i in range(self.nb_blocks): self.read_oneblockresult(self.current_result, i+1) else: logging.info(_('No ".head" file --> Initial Conditions')) self.loaded=True
[docs] def set_hqxqy_as_initial_conditions(self, idx:int = None, as_multiblocks:bool = True): """ Set the result as IC :param idx : 0-based index """ if idx is not None: if idx>=0 and idx<self.get_nbresults(): self.read_oneresult(idx) elif idx ==-1: self.read_oneresult(-1) # last one else: logging.error(_('Bad index for initial conditions')) return self.set_currentview(views_2D.WATERDEPTH) hini = self.as_WolfArray() self.set_currentview(views_2D.QX) qxini = self.as_WolfArray() self.set_currentview(views_2D.QY) qyini = self.as_WolfArray() if (hini is not None) and (qxini is not None) and (qyini is not None): if as_multiblocks: hini.write_all(self.filename.strip() + '.hbinb') qxini.write_all(self.filename.strip() + '.qxbinb') qyini.write_all(self.filename.strip() + '.qybinb') logging.info(_('Initial conditions saved as MB binary files')) else: hini = hini.as_WolfArray() qxini = qxini.as_WolfArray() qyini = qyini.as_WolfArray() hini.write_all(self.filename.strip() + '.hbin') qxini.write_all(self.filename.strip() + '.qxbin') qyini.write_all(self.filename.strip() + '.qybin') logging.info(_('Initial conditions saved as binary files')) else: logging.error(_('No initial conditions saved'))
[docs] def set_keps_as_initial_conditions(self, idx:int = None, as_multiblocks:bool = True): """ Set the turbulent result as IC :param idx : 0-based index """ if idx is not None: if idx>=0 and idx<self.get_nbresults(): self.read_oneresult(idx) elif idx ==-1: self.read_oneresult(-1) # last one else: logging.error(_('Bad index for initial conditions')) return self.set_currentview(views_2D.KINETIC_ENERGY) kini = self.as_WolfArray() self.set_currentview(views_2D.EPSILON) epsini = self.as_WolfArray() if (kini is not None) and (epsini is not None): if as_multiblocks: kini.write_all(self.filename.strip() + '.kbinb') epsini.write_all(self.filename.strip() + '.epsbinb') logging.info(_('Initial conditions saved as MB binary files')) else: kini = kini.as_WolfArray() epsini = epsini.as_WolfArray() kini.write_all(self.filename.strip() + '.kbin') epsini.write_all(self.filename.strip() + '.epsbin') logging.info(_('Initial conditions saved as binary files')) else: logging.error(_('No initial conditions saved'))
[docs] def read_previous(self): """ Lecture du pas suivant """ # if self.current_result > 0: self.current_result -= self._step_interval self._update_result_view()
[docs] def get_h_for_block(self, block: Union[int, str]) -> WolfArray: """ Retourne la matrice de hauteur d'eau pour un bloc spécifique block : numéro du bloc; 1-based; """ if isinstance(block,str): return self.myblocks[block].waterdepth else: return self.myblocks[getkeyblock(block,False)].waterdepth
[docs] def get_top_for_block(self, block: Union[int, str]) -> WolfArray: """ Retourne la matrice de topographie pour un bloc spécifique block : numéro du bloc; 1-based; """ if isinstance(block,str): return self.myblocks[block].top else: return self.myblocks[getkeyblock(block,False)].top
[docs] def get_qx_for_block(self, block: Union[int, str]) -> WolfArray: """ Retourne la matrice de débit selon X pour un bloc spécifique block : numéro du bloc; 1-based; """ if isinstance(block,str): return self.myblocks[block].qx else: return self.myblocks[getkeyblock(block,False)].qx
[docs] def get_qy_for_block(self, block: Union[int, str]) -> WolfArray: """ Retourne la matrice de débit selon Y pour un bloc spécifique block : numéro du bloc; 1-based; """ if isinstance(block,str): return self.myblocks[block].qy else: return self.myblocks[getkeyblock(block,False)].qy
[docs] def get_values_as_wolf(self, i:int, j:int, which_block:int=1): """ Retourne les valeurs associées à des indices (i,j) et un numéro de block which_block : numéro du bloc; 1-based; *** ATTENTION : Les indices sont passés comme WOLF --> en numérotation Fortran (démarrage à 1 et non à 0) *** """ h=-1 qx=-1 qy=-1 vx=-1 vy=-1 vabs=-1 fr=-1 nbx = self.myblocks[getkeyblock(which_block,False)].waterdepth.nbx nby = self.myblocks[getkeyblock(which_block,False)].waterdepth.nby if(i>0 and i<=nbx and j>0 and j<=nby): h = self.myblocks[getkeyblock(which_block,False)].waterdepth.array[i-1,j-1] top = self.myblocks[getkeyblock(which_block,False)].top.array[i-1,j-1] qx = self.myblocks[getkeyblock(which_block,False)].qx.array[i-1,j-1] qy = self.myblocks[getkeyblock(which_block,False)].qy.array[i-1,j-1] if(h>0.): vx = qx/h vy = qy/h vabs=(vx**2.+vy**2.)**.5 fr = vabs/(9.81*h)**.5 return h,qx,qy,vx,vy,vabs,fr,h+top,top
[docs] def get_values_turb_as_wolf(self, i:int, j:int, which_block:int=1): """ Retourne les valeurs de turbulence associées à des indices (i,j) et un numéro de block which_block : numéro du bloc; 1-based; ATTENTION : Les indices sont passés comme WOLF --> en numérottaion Fortran (démarrage à 1 et non à 0) """ k=-1 e=-1 nut=-1 nbx = self.myblocks[getkeyblock(which_block,False)].waterdepth.nbx nby = self.myblocks[getkeyblock(which_block,False)].waterdepth.nby if(i>0 and i<=nbx and j>0 and j<=nby): k = self.myblocks[getkeyblock(which_block,False)].k.array[i-1,j-1] e = self.myblocks[getkeyblock(which_block,False)].eps.array[i-1,j-1] if e>0.: nut = 0.09*k**2./e return k,e,nut
[docs] def get_header_block(self, which_block=1) -> header_wolf: """ Obtention du header_wolf d'un block which_block : numéro du bloc; 1-based; """ return self.head_blocks[getkeyblock(which_block,False)]
[docs] def get_xy_infootprint_vect(self, myvect: vector, which_block=1) -> tuple[np.ndarray,np.ndarray]: """ :return numpy array content les coordonnées (x,y) des mailles dans l'empreinte du vecteur """ myptsij = self.get_ij_infootprint_vect(myvect, which_block) mypts=np.asarray(myptsij.copy(),dtype=np.float64) lochead = self.get_header_block(which_block) mypts[:,0] = (mypts[:,0]+.5)*lochead.dx +lochead.origx +lochead.translx mypts[:,1] = (mypts[:,1]+.5)*lochead.dy +lochead.origy +lochead.transly return mypts,myptsij
[docs] def get_ij_infootprint_vect(self, myvect: vector, which_block=1) -> np.ndarray: """ Returns: numpy array content les indices ij des mailles dans l'empreinte du vecteur """ lochead = self.get_header_block(which_block) nbx = lochead.nbx nby = lochead.nby i1, j1 = self.get_ij_from_xy(myvect.xmin, myvect.ymin, which_block) i2, j2 = self.get_ij_from_xy(myvect.xmax, myvect.ymax, which_block) i1 = max(i1,0) j1 = max(j1,0) i2 = min(i2,nbx-1) j2 = min(j2,nby-1) xv,yv = np.meshgrid(np.arange(i1,i2+1),np.arange(j1,j2+1)) mypts = np.hstack((xv.flatten()[:,np.newaxis],yv.flatten()[:,np.newaxis])) return mypts
[docs] def get_xy_inside_polygon(self, myvect: vector, usemask=True): """ Obtention des coordonnées contenues dans un polygone usemask = restreint les éléments aux éléments non masqués de la matrice """ myvect.find_minmax() mypointsxy={} myvert = myvect.asnparray() path = mpltPath.Path(myvert) for curblock in range(self.nb_blocks): locpointsxy,locpointsij = self.get_xy_infootprint_vect(myvect,curblock+1) inside = path.contains_points(locpointsxy) locpointsxy = locpointsxy[np.where(inside)] if usemask and len(locpointsxy)>0: locpointsij = locpointsij[np.where(inside)] mymask = np.logical_not(self.myblocks[getkeyblock(curblock)].current.array.mask[locpointsij[:, 0], locpointsij[:, 1]]) locpointsxy = locpointsxy[np.where(mymask)] mypointsxy[getkeyblock(curblock)]=locpointsxy return mypointsxy
[docs] def get_xy_under_polyline(self, myvect: vector) -> dict[str, (int,int)]: """ Obtention des coordonnées (x,y) sous une polyligne avec séparation des points par bloc usemask = restreint les éléments aux éléments non masqués de la matrice """ ds = self.get_dxdy_min() # récupération de la taille la plus fine pts = myvect._refine2D(ds)# récupération des (x,y) selon le vecteur au pas le plus fin mypoints={} for idx in range(self.nb_blocks): mypoints[getkeyblock(idx)]=[] for curpt in pts: i,j,curblock = self.get_blockij_from_xy(curpt.x, curpt.y, aswolf=False) if curblock>-1: mypoints[getkeyblock(curblock)].append([curpt.x, curpt.y]) return mypoints
[docs] def get_values_insidepoly(self, myvect:vector, usemask:bool=True, agglo:bool=True, getxy:bool=False): """ Retourne les valeurs des mailles contenues dans un polygone Traite la matrice courante et l'altitude de fond si on est en vue 'views_2D.WATERLEVEL' :param myvect : polygone :param usemask (optional) restreint les éléments aux éléments non masqués de la matrice :param agglo (optional) agglomère le résultat en une seule liste plutôt que d'avoir autant de liste que de blocs :param getxy (optional) retourne en plus les coordonnées des points :return myvalues : valeurs des mailles """ myvalues={} myvaluesel={} mypoints = self.get_xy_inside_polygon(myvect, usemask) for curblock in range(self.nb_blocks): curkey = getkeyblock(curblock) if len(mypoints[curkey])>0: locval = np.asarray([self.get_value(cur[0], cur[1]) for cur in mypoints[curkey]]) locel = np.asarray([self.get_value_elevation(cur[0],cur[1]) for cur in mypoints[curkey]]) locval=locval[np.where(locval!=-1)] locel =locel[np.where(locel!=-1)] myvalues[curkey]=locval myvaluesel[curkey]=locel else: myvalues[curkey] =np.asarray([]) myvaluesel[curkey]=np.asarray([]) if agglo: myvalues = np.concatenate([cur for cur in myvalues.values()]) myvaluesel = np.concatenate([cur for cur in myvaluesel.values()]) mypoints = np.concatenate([cur for cur in mypoints.values()]) if self._which_current_view == views_2D.WATERLEVEL: if getxy: return myvalues,myvaluesel,mypoints else: return myvalues,myvaluesel else: if getxy: return myvalues,None,mypoints else: return myvalues,None
[docs] def get_all_values_insidepoly(self,myvect:vector, usemask=True, agglo=True, getxy=False): """ Récupération de toutes les valeurs dans un polygone usemask (optional) restreint les éléments aux éléments non masqués de la matrice getxy (optional) retourne en plus les coordonnées des points """ myvalues={} mypoints = self.get_xy_inside_polygon(myvect, usemask) for curblock in range(self.nb_blocks): if len(mypoints[getkeyblock(curblock)])>0: locval = np.asarray([self.get_values_from_xy(cur[0], cur[1]) for cur in mypoints[getkeyblock(curblock)]], dtype=object) locval=np.asarray([tuple(valloc) for valloc in locval if tuple(valloc)!=((-1,-1,-1,-1,-1,-1,-1),('-','-','-'))], dtype=object) myvalues[getkeyblock(curblock)]=locval else: myvalues[getkeyblock(curblock)]=np.empty([0,2]) if agglo: myvalues = np.concatenate([cur for cur in myvalues.values()]) mypoints = np.concatenate([cur for cur in mypoints.values()]) if getxy: return myvalues,mypoints else: return myvalues
[docs] def get_all_values_underpoly(self,myvect:vector, usemask=True, agglo=True, getxy=False, integrate_q = False): """ Récupération de toutes les valeurs sous la polyligne Les valeurs retrounées sont identiques à la fonction "get_values_from_xy" soit (h,qx,qy,vx,vy,vabs,fr,h+top,top),(i+1,j+1,curblock.idx+1) usemask (optional) restreint les éléments aux éléments non masqués de la matrice getxy (optional) retourne en plus les coordonnées des points agglo (optional) agglomère le résultat en une seule liste plutôt que d'avoir autant de liste que de blocs """ myvalues={} mypoints = self.get_xy_under_polyline(myvect) for curkey, ptsblock in mypoints.items(): if len(ptsblock)>0: myvalues[curkey]=np.asarray([self.get_values_from_xy(cur[0], cur[1], integrate_q=integrate_q) for cur in ptsblock], dtype=object) else: myvalues[curkey]=np.empty([0,2]) if agglo: loclist = [cur for cur in myvalues.values()] if len(loclist)>0: myvalues = np.concatenate([cur for cur in myvalues.values()]) else: myvalues = [] if getxy: loclist = [cur for cur in mypoints.values() if len(cur)>0] if len(loclist)>0: mypoints = np.concatenate([cur for cur in mypoints.values() if len(cur)>0]) else: mypoints = np.asarray([]) if getxy: return myvalues,mypoints else: return myvalues
[docs] def get_q_alongpoly(self, myvect:vector, x_or_y:str = 'x', to_sum=True): """alias""" self.get_q_underpoly(myvect, x_or_y, to_sum)
[docs] def get_q_underpoly(self, myvect:vector, x_or_y:str = 'x', to_sum=True): """ Récupération du débit sous un vecteur to_sum pour sommer les valeurs et multiplier par la taille de maille """ vals = self.get_all_values_underpoly(myvect, integrate_q=to_sum) if x_or_y.lower()=='x': idxq=1 else: idxq=2 q = np.asarray([cur[0][idxq] for cur in vals]) if to_sum: return q.sum() else: return q
[docs] def get_q_alongpoly_with_neighbor(self, myvect:vector, x_or_y:str = 'x', to_sum=True): """ Récupération du débit sous un vecteur mais uniquement si un voisin existe dans la direction d'intégration to_sum pour sommer les valeurs et multiplier par la taille de maille """ vals, xy = self.get_all_values_underpoly(myvect, integrate_q=to_sum, getxy=True) # ij = [self.get_ij_from_xy(x,y) for x,y in xy] if x_or_y.lower()=='x': # sommation de qx # test du voisinage selon x -> i idxq=1 dx = self[0].dx dy = 0. else: idxq=2 dx = 0. dy = self[0].dy q = np.asarray([cur[0][idxq] for cur in vals]) for idx, ((x,y), val) in enumerate(zip(xy, q)): if val > 0.: unk = self.get_values_from_xy(x+dx,y+dy) if unk[0][0] is np.ma.masked or unk[1][0] =='-': q[idx] = 0. elif val < 0.: unk = self.get_values_from_xy(x-dx,y-dy) if unk[0][0] is np.ma.masked or unk[1][0] =='-': q[idx] = 0. if to_sum: return q.sum() else: return q
[docs] def get_q_alongpoly_raster_splitting(self, myvect:vector, to_sum=True, to_rasterize = True): """ Récupération du débit sous un vecteur après avoir rasterisé selon les bords et appliqué le splitting WOLF aux flux Forcage préalable du vecteur slon la grille de calcul :param myvect : wolf polyline :param to_sum : pour sommer les valeurs et multiplier par la taille de maille :param to_rasterize : pour rasteriser le vecteur selon la grille de calcul """ myhead = self.get_header_block(1) if to_rasterize: myvect = myhead.rasterize_vector(myvect) mynormals = myvect.get_normal_segments() xy_center = myvect.get_center_segments() norm_neigh = mynormals.copy() norm_neigh[:,0] = norm_neigh[:,0] * myhead.dx / 2. norm_neigh[:,1] = norm_neigh[:,1] * myhead.dy / 2. xy_left = xy_center + norm_neigh xy_right = xy_center - norm_neigh values_left = [self.get_values_from_xy(x, y, integrate_q=True) for x,y in xy_left] values_right = [self.get_values_from_xy(x, y, integrate_q=True) for x,y in xy_right] q=[] for curnorm, val_left, val_right in zip(mynormals, values_left, values_right): if not(outside_domain(val_left) or outside_domain(val_right)): if curnorm[0] != 0.: q.append(q_splitting(val_left[0][1], val_left[0][1]) * curnorm[0]) else: q.append(q_splitting(val_left[0][2], val_left[0][2]) * curnorm[1]) q = np.asarray(q) if to_sum: return q.sum() else: return q
[docs] def _plot_one_q(self, vect:vector, x_or_y:str = 'x', absolute=True) -> float: qloc = self.get_q_alongpoly_with_neighbor(vect,x_or_y,True) if absolute: qloc = abs(qloc) return qloc
[docs] def _plot_one_q_raster_splitting(self, vect:vector, absolute=True, to_rasterize = True) -> float: qloc = self.get_q_alongpoly_raster_splitting(vect, to_sum = True, to_rasterize = to_rasterize) if absolute: qloc = abs(qloc) return qloc
[docs] def setup_cache(self, start_idx:int, end_idx:int = -1, only_h:bool = False): """ Setup cache for results #FIXME : not implemented for général 2D results -- just for GPU Defined here for compatibility """ self._cache = None
[docs] def clear_cache(self): """ Clear cache #FIXME : not implemented for général 2D results -- just for GPU Defined here for compatibility """ self._cache = None
[docs] def get_hydrographs(self, vect: Union[vector, list[vector], zone], progress_callback=None): """ Get hydrograph across a vector :param vect: wolf polyline or list of wolf polylines or zone :param progress_callback: optional callback to update progress """ assert isinstance(vect, vector | list | zone), 'Expected a vector' if isinstance(vect, zone): vect = vect.myvectors nb = self.get_nbresults() # Total number of iterations times, steps = self.times, self.timesteps if isinstance(vect, vector): q = [] for i in tqdm(range(nb)): if i == 0: myhead = self.get_header_block(1) vect_raster = myhead.rasterize_vector(vect) self.read_oneresult(i) q.append(self._plot_one_q_raster_splitting(vect_raster, True, to_rasterize=False)) # Update progress if progress_callback: progress_callback(int((i + 1) / nb * 100)) # Percentage elif isinstance(vect, list): q = {str: list} vect_raster = [] myhead = self.get_header_block(1) for i in range(len(vect)): q[vect[i].myname] = [] vect_raster.append(myhead.rasterize_vector(vect[i])) for i in tqdm(range(nb)): self.read_oneresult(i) for curvec, cur_vect_raster in zip(vect, vect_raster): q[curvec.myname].append(self._plot_one_q_raster_splitting(cur_vect_raster, True, to_rasterize=False)) # Update progress if progress_callback: progress_callback(int((i + 1) / nb * 100)) # Percentage return times, q
[docs] def export_hydrographs(self, vect: Union[vector, list[vector], zone], filename: str | Path, progress_callback=None): """ Export hydrograph across a vector as CSV file :param vect: wolf polyline or list of wolf polylines or zone :param filename: output filename :param progress_callback: optional callback to update progress """ assert isinstance(vect, vector | list | zone), 'Expected a vector' filename = Path(filename) filename = filename.with_suffix('.csv') if isinstance(vect, zone): vect = vect.myvectors # Get hydrographs with progress updates times, q = self.get_hydrographs(vect, progress_callback=progress_callback) # Write to file with open(filename, 'w') as f: f.write('Time [s],') if isinstance(vect, vector): f.write(vect.myname) elif isinstance(vect, list): f.write(','.join([cur.myname for cur in vect])) f.write('\n') for i in range(len(times)): f.write(f'{times[i]},') if isinstance(vect, vector): f.write(f'{q[i]}') elif isinstance(vect, list): f.write(','.join([str(q[cur.myname][i]) for cur in vect])) f.write('\n') return True
#FIXME : rename 'x_or_y' to be more explicit
[docs] def plot_q(self, vect:Union[vector, list[vector]], x_or_y:Union[str, list[str]] = 'border', toshow=False, absolute=True, figax = None): """ Plot discharge along vector :param vector : wolf polyline -- will be splitted according to spatial step size :param x_or_y : 'x' for qx, 'y' for qy - integration axis or 'border' for q normal at border :param toshow : show the plot :param absolute : plot absolute value of discharge :param figax : tuple of matplotlib figure and axis """ nb = self.get_nbresults() times, steps = self.times, self.timesteps if figax is None: fig, ax = plt.subplots() else: fig,ax = figax if isinstance(vect, vector): assert x_or_y in ['x', 'y', 'border'], 'x_or_y must be "x", "y" or "border"' q=[] for i in tqdm(range(nb)): self.read_oneresult(i) if x_or_y in ['x', 'y']: q.append(self._plot_one_q(vect, x_or_y, absolute)) elif x_or_y == 'border': if i==0: myhead = self.get_header_block(1) vect_raster = myhead.rasterize_vector(vect) q.append(self._plot_one_q_raster_splitting(vect_raster, absolute, to_rasterize = False)) ax.plot(times,q, c='blue', label=vect.myname) else: assert len(vect) == len(x_or_y), 'Exepected same length for vect and x_or_y' for cur in x_or_y: assert cur in ['x', 'y', 'border'], 'x_or_y must be "x", "y" or "border"' q={int:list} vect_raster = [] myhead = self.get_header_block(1) for i in range(len(vect)): q[i]= [] if x_or_y[i] == 'border': vect_raster.append(myhead.rasterize_vector(vect[i])) for i in tqdm(range(nb)): self.read_oneresult(i) for idx, (curvec, orient) in enumerate(zip(vect, x_or_y)): if orient in ['x', 'y']: q[idx].append(self._plot_one_q(curvec, orient, absolute)) elif orient == 'border': cur_vect_raster = vect_raster[idx] q[idx].append(self._plot_one_q_raster_splitting(cur_vect_raster, absolute, to_rasterize = False)) for i in range(len(vect)): ax.plot(times,q[i], label=vect[i].myname) ax.set_xlabel(_('Time [s]')) ax.set_ylabel(_('Discharge/Flow rate [$m^3s^{-1}$]')) ax.legend(loc=2) fig.tight_layout() if toshow: fig.show() return fig,ax
[docs] def plot_h(self, xy:Union[list[float], vector], h_or_z:Literal['h', 'z', 'head'], toshow=False, figax = None): """ Plot water depth at one or multiple coordinates :param xy : list of coordinates :param h_or_z : 'h' for water depth, 'z' for bed elevation, 'head' for water head :param toshow : show the plot :param figax : tuple of matplotlib figure and axis """ nb = self.get_nbresults(force_update_timessteps=True) times, steps = self.times, self.timesteps vals=[] if isinstance(xy, vector): label = xy.myname oldview = self.get_currentview() if h_or_z == 'h': curview = views_2D.WATERDEPTH elif h_or_z == 'z': curview = views_2D.WATERLEVEL elif h_or_z == 'head': curview = views_2D.HEAD label_y = curview.value for i in tqdm(range(nb)): self.read_oneresult(i) self.set_current(curview) vals.append(self.get_values_insidepoly(xy,)[0]) self.set_currentview(oldview) else: label = _('Selected nodes') for i in tqdm(range(nb)): self.read_oneresult(i) values = [self.get_values_from_xy(x,y) for x,y in xy] which = 0 if h_or_z =='h': which = 0 values = np.asarray([valsloc[0][which] for valsloc in values]) label_y = _('Water depth [$m$]') elif h_or_z =='z': which = 7 values = np.asarray([valsloc[0][which] for valsloc in values]) label_y = _('Water level [$m$]') elif h_or_z =='head': values = np.asarray([valsloc[0][7]+valsloc[0][5]**2./2/9.81 for valsloc in values]) label_y = _('Water head [$m$]') vals.append(values) if figax is None: fig, ax = plt.subplots() else: fig, ax = figax means= [] min = [] max = [] for curvals in vals: means.append(curvals.mean()) min.append(curvals.min()) max.append(curvals.max()) ax.plot(times, means, label=_('Mean value - {}'.format(label))) ax.plot(times, min, label=_('Min value - {}'.format(label))) ax.plot(times, max, label=_('Max value - {}'.format(label))) ax.set_xlabel(_('Time [s]')) ax.set_ylabel(label_y) ax.legend(loc=2) fig.tight_layout() if toshow: fig.show() return fig,ax
[docs] def get_values_from_xy(self, x:float, y:float, aswolf=True, integrate_q = False): """ Retrouve les valeurs sur base de la coordonnée (x,y) aswolf : (optional) si True alors ajoute 1 à i et j pour se retrouver en numérotation VB6/Fortran """ h=-1 qx=-1 qy=-1 vx=-1 vy=-1 vabs=-1 fr=-1 exists=False for i,j,curblock in self.enum_block_xy(x,y): h = curblock.waterdepth.array[i,j] top = curblock.top.array[i,j] qx = curblock.qx.array[i,j] qy = curblock.qy.array[i,j] exists = top>0. if(h>0.): vx = qx/h vy = qy/h vabs=(vx**2.+vy**2.)**.5 fr = vabs/(9.81*h)**.5 exists=True if exists: if integrate_q: qx *= curblock.qx.dy qy *= curblock.qy.dx break if exists: if aswolf: return (h,qx,qy,vx,vy,vabs,fr,h+top,top),(i+1,j+1,curblock.blockindex+1) else: return (h,qx,qy,vx,vy,vabs,fr,h+top,top),(i,j,curblock.blockindex) else: return (-1,-1,-1,-1,-1,-1,-1),('-','-','-')
[docs] def get_values_turb_from_xy(self, x:float, y:float, aswolf=True): """ Retrouve les valeurs de turbulence sur base de la coordonnée (x,y) aswolf : (optional) si True alors ajoute 1 à i et j pour se retrouver en numérotation VB6/Fortran """ h=-1 k=-1 e=-1 nut=-1 exists=False for i,j,curblock in self.enum_block_xy(x,y): h = curblock.waterdepth.array[i,j] k = curblock.k.array[i,j] e = curblock.eps.array[i,j] if(h>0. and e>0.): nut = 0.09*k**2./e exists=True if exists: break if exists: if aswolf: return (k,e,nut),(i+1,j+1,curblock.blockindex+1) else: return (k,e,nut),(i,j,curblock.blockindex) else: return (-1,-1,-1),('-','-','-')
[docs] def get_value(self, x:float, y:float, nullvalue=-1): """ Return the value of the current array at (X,Y) position """ h=-1 exists=False for i,j,curblock in self.enum_block_xy(x,y): h = curblock.waterdepth.array[i,j] val = curblock.current.array[i,j] if h is not np.nan: exists=np.abs(h)>0. if exists: break if exists: return val else: return nullvalue
[docs] def get_values_labels(self,x:float, y:float): """ Return the values and labels of the current view at (X,Y) position """ h=-1 exists=False for i,j, curblock in self.enum_block_xy(x,y): h = curblock.waterdepth.array[i,j] if h is not np.nan: exists=np.abs(h)>0. if exists: break if exists: vals,labs = curblock.get_values_labels(i,j) vals = [self.current_result+1] + vals labs = ["Stored step (1-based)"] + labs vals = [self.times[self.current_result]] + vals labs = ["Real Time [s]"] + labs if self.times[self.current_result]>3600: time_in_sec = self.times[self.current_result] vals = [str(timedelta(seconds=int(time_in_sec), milliseconds=int(time_in_sec-int(time_in_sec))*1000))] + vals labs = ["Real Time [date]"] + labs vals = [self.timesteps[self.current_result]] + vals labs = ["Step Time [iter]"] + labs return vals,labs else: return -1
[docs] def get_value_elevation(self, x:float, y:float, nullvalue=-1): """Return the value of the bed elevation at (X,Y) position""" exists=False for i,j,curblock in self.enum_block_xy(x,y): h = curblock.waterdepth.array[i,j] val = curblock.top.array[i,j] if h is not np.nan: exists=np.abs(h)>0. if exists: break if exists: return val else: return nullvalue
[docs] def get_xy_from_ij(self, i:int, j:int, which_block:int=1, aswolf:bool=False, abs:bool=True): """ Retourne les coordonnées (x,y) depuis les indices (i,j) et le numéro de block :param i: i index :param j: j index :param which_block: block number; 1-based; :param aswolf: True to add 1 to i and j to match the VB6/Fortran numbering format :param abs: True to return absolute coordinates """ x,y = self.myblocks[getkeyblock(which_block,False)].waterdepth.get_xy_from_ij(i, j, aswolf=aswolf, abs=abs) return x,y
[docs] def get_ij_from_xy(self, x:float, y:float, which_block:int = 1, aswolf=False, abs=True): """ Retrouve les indices d'un point (x,y) dans un bloc spécifique Utilise la routine du même nom dans la martrice 'waterdepth' :param x: x coordinate :param y: y coordinate :param which_block: block number; 1-based; :param aswolf: True to add 1 to i and j to match the VB6/Fortran numbering format :param abs: True to return absolute coordinates """ i,j = self.myblocks[getkeyblock(which_block, False)].waterdepth.get_ij_from_xy(x,y, aswolf=aswolf, abs=abs) return i,j # Par défaut en indices Python et non WOLF (VB6/Fortran)
[docs] def _test_bounds_block(self, x:float, y:float, curblock:OneWolfResult): """ Teste les bornes d'un bloc versus les coordonnées (x,y) d'un point """ nbx = curblock.waterdepth.nbx nby = curblock.waterdepth.nby i,j = curblock.waterdepth.get_ij_from_xy(x, y, aswolf=False) if(i>=0 and i<nbx and j>=0 and j<nby): return True else: return False
[docs] def enum_block_xy(self, x:float, y:float, aswolf=False, abs=True): """ Enumération des blocs contenant la coordonnée (x,y) aswolf : True ajoute 1 à i et j pour corresppondre au format de numérotation VB6/Fortran """ for curblock in self.myblocks.values(): if self._test_bounds_block(x, y, curblock): i,j=curblock.waterdepth.get_ij_from_xy(x, y, aswolf=aswolf, abs=abs) yield i,j,curblock
[docs] def get_blockij_from_xy(self, x:float, y:float, abs=True, aswolf=True): """ Retourne les indices i,j et le numéro du block depuis les coordonnées (x,y) aswolf : True ajoute 1 à i et j pour corresppondre au format de numérotation VB6/Fortran """ exists = False for i, j, curblock in self.enum_block_xy(x,y): if not curblock.waterdepth.array.mask[i, j]: exists = True break if exists: if aswolf: return i+1, j+1, curblock.blockindex+1 else: return i, j, curblock.blockindex else: return -1, -1, -1
[docs] def check_plot(self): """ L'objet est coché/à traiter dans une fenêtre graphique 'WolfMapViewer' """ self.plotted = True self.mimic_plotdata() if not self.loaded and self.filename!='': self.read_oneresult(self.current_result) self.reset_plot()
[docs] def uncheck_plot(self, unload=False): """ L'objet est décoché/pas à traiter dans une fenêtre graphique 'WolfMapViewer' """ self.plotted = False self.mimic_plotdata()
[docs] def get_min_max(self, which:Literal[views_2D.TOPOGRAPHY, views_2D.WATERDEPTH, 'current']): """ Retourne la valeur min et max de la topo, de la hauteur d'eau ou de la matrice courante """ def sanitize(val:list, op:Literal['min', 'max']): if len(val) == 0: return 0 if np.all(np.ma.asarray(val) is np.ma.masked): return 0 else: if op == 'min': return np.min(val) else: return np.max(val) if which == views_2D.TOPOGRAPHY: tmp = [np.ma.min(curblock.top.array) for curblock in self.myblocks.values() if curblock.top.nbnotnull>0] min = sanitize(tmp, 'min') tmp = [np.ma.max(curblock.top.array) for curblock in self.myblocks.values() if curblock.top.nbnotnull>0] max = sanitize(tmp, 'max') elif which == views_2D.WATERDEPTH: tmp = [np.ma.min(curblock.waterdepth.array) for curblock in self.myblocks.values() if curblock.waterdepth.nbnotnull>0] min = sanitize(tmp, 'min') tmp = [np.ma.max(curblock.waterdepth.array) for curblock in self.myblocks.values() if curblock.waterdepth.nbnotnull>0] max = sanitize(tmp, 'max') elif which == 'current': tmp = [np.ma.min(curblock.current.array) for curblock in self.myblocks.values() if curblock.current.nbnotnull>0] min = sanitize(tmp, 'min') tmp = [np.ma.max(curblock.current.array) for curblock in self.myblocks.values() if curblock.current.nbnotnull>0] max = sanitize(tmp, 'max') return min,max
[docs] def get_working_array(self,onzoom=[]): """ Délimitation d'une portion de matrice sur base de bornes onzoom : Liste Python de type [xmin, xmax, ymin, ymax] """ if onzoom!=[]: allarrays=[] for curblock in self.myblocks.values(): istart,jstart = curblock._current.get_ij_from_xy(onzoom[0],onzoom[2]) iend,jend = curblock._current.get_ij_from_xy(onzoom[1],onzoom[3]) istart= 0 if istart < 0 else istart jstart= 0 if jstart < 0 else jstart iend= curblock._current.nbx if iend > curblock._current.nbx else iend jend= curblock._current.nby if jend > curblock._current.nby else jend partarray=curblock._current.array[istart:iend,jstart:jend] partarray=partarray[partarray.mask==False] if len(partarray)>0: allarrays.append(partarray.flatten()) allarrays=np.concatenate(allarrays) else: allarrays = np.concatenate([curblock.current.array[curblock.current.array.mask==False].flatten() for curblock in self.myblocks.values()]) self.nbnotnull = allarrays.count() return allarrays
[docs] def updatepalette(self, which=0, onzoom=[]): """ Mise à jour des palettes de couleur/colormaps palgray : niveaux de gris palblue : niveaux de bleu mypal : coloration paramétrique :param which: 0 :param onzoom: Liste Python de type [xmin, xmax, ymin, ymax] """ self.palgray = wolfpalette() self.palblue = wolfpalette() self.palgray.defaultgray() self.palblue.defaultblue() locmin, locmax = self.get_min_max(views_2D.TOPOGRAPHY) self.palgray.distribute_values(locmin, locmax) locmin, locmax = self.get_min_max(views_2D.WATERDEPTH) self.palblue.distribute_values(locmin, locmax) if self.mypal.automatic: # self.mypal.default16() self.mypal.isopop(self.get_working_array(onzoom=onzoom),self.nbnotnull) self.link_palette() if self.properties is not None: # update GUI properties self.properties.update_palette() if self.mypal.automatic: self.properties._cm_auto.SetValue(1) else: self.properties._cm_auto.SetValue(0)
[docs] def delete_lists(self): """ Reset des listes OpenGL de la matrice courante """ for curblock in self.myblocks.values(): curblock.delete_lists()
[docs] def mimic_plotdata(self, plotting=False): """ Force la mise à jour de paramètres entre tous les blocs """ self.plotting=plotting for curblock in self.myblocks.values(): curblock._current.plotted = self.plotted curblock._current.plotting = self.plotting curblock.set_linkedvec(self.linkedvec) curblock.set_epsilon(self.epsilon)
[docs] def plot(self, sx=None, sy=None,xmin=None,ymin=None,xmax=None,ymax=None, size=None): """Dessin OpenGL""" self.mimic_plotdata(True) for curblock in self.myblocks.values(): curblock.plot(sx, sy,xmin,ymin,xmax,ymax) if self.myparam is not None: #conditions limites faibles self.myparam.weak_bc_x.myzones.plot() self.myparam.weak_bc_y.myzones.plot() # Plot selected nodes if self.mngselection is not None: self.mngselection.plot_selection() self.mimic_plotdata(False)
[docs] def fillonecellgrid(self,curscale,loci,locj,force=False): """Dessin d'une fraction de la matrice pour tous les blocs""" for curblock in self.myblocks.values(): curblock._current.fillonecellgrid(curscale,loci,locj,force)
[docs] def set_current(self, which): """ Change le type de résultat à présenter/traiter :param which: mode de visualisation --> see 'views_2D' for supported values""" if not which in views_2D: logging.error('Unknown view -- Check in views_2D') return for curblock in self.myblocks.values(): curblock.set_current(which) self._which_current_view = which
[docs] def next_result(self): """Lecture du pas suivant""" nb = self.get_nbresults() if self.current_result==-1: self.read_oneresult(-1) else: self.current_result+= self._step_interval self.current_result = min(nb,self.current_result) self.read_oneresult(self.current_result) self.reset_plot()
[docs] def reset_plot(self,whichpal=0): """Reset du dessin""" self.delete_lists() self.get_working_array() self.updatepalette(whichpal)
[docs] def danger_map(self, start:int=0, end:int=-1, every:int=1) -> Union[tuple[WolfArray, WolfArray, WolfArray, WolfArray], tuple[WolfArrayMB, WolfArrayMB, WolfArrayMB, WolfArrayMB]]: """ Create Danger Maps :param start: start time step - 0-based :param end: end time step - 0-based :param every: step interval :return : tuple of WolfArray or WolfArrayMB - H, U_norm, Q_norm, Z """ # Number of time steps number_of_time_steps = self.get_nbresults() if end ==-1: end = number_of_time_steps # Init Danger Maps basde on results type # If only one block --> WolfArray # If only multiple blocks --> WolfArrayMB danger_map_matrix_h = self.as_WolfArray(copyarray=True) danger_map_matrix_v = self.as_WolfArray(copyarray=True) danger_map_matrix_mom = self.as_WolfArray(copyarray=True) danger_map_matrix_z = self.as_WolfArray(copyarray=True) danger = [danger_map_matrix_h, danger_map_matrix_v, danger_map_matrix_mom, danger_map_matrix_z] for curdanger in danger: curdanger.nullvalue = 0. curdanger.reset() curdanger.mask_reset() for time_step in tqdm(range(start, end, every)): self.read_oneresult(time_step+1) if self.nb_blocks>1: for curblock in self.myblocks.keys(): # Get WolfArray wd = self.get_h_for_block(curblock) qx = self.get_qx_for_block(curblock) qy = self.get_qy_for_block(curblock) top = self.get_top_for_block(curblock) ij = np.where(~wd.array.mask) # mom = np.zeros_like(wd.array) v = np.zeros_like(wd.array) z = np.zeros_like(wd.array) mom[ij] = (qx.array[ij]**2.+qy.array[ij]**2.)**.5 v[ij] = mom[ij]/wd.array[ij] z[ij] = wd.array[ij] + top.array[ij] # Comparison for curdanger, curcomp in zip(danger, [wd.array, v, mom, z]): ij = np.where((curdanger.array < curcomp) & (~wd.array.mask)) curdanger.array.data[ij] = curcomp[ij] curdanger.array.mask[ij] = False else: curblock = getkeyblock(0) wd = self.get_h_for_block(curblock) qx = self.get_qx_for_block(curblock) qy = self.get_qy_for_block(curblock) top = self.get_top_for_block(curblock) ij = np.where(~wd.array.mask) # mom = np.zeros_like(wd.array) v = np.zeros_like(wd.array) z = np.zeros_like(wd.array) mom[ij] = (qx.array[ij]**2.+qy.array[ij]**2.)**.5 v[ij] = mom[ij]/wd.array[ij] z[ij] = wd.array[ij] + top.array[ij] # Comparison for curdanger, curcomp in zip(danger, [wd.array, v, mom, z]): ij = np.where((curdanger.array < curcomp) & (~wd.array.mask)) curdanger.array.data[ij] = curcomp[ij] curdanger.array.mask[ij] = False danger_map_matrix_h.mask_lower(self.epsilon) if self.nb_blocks>1: for i in range(self.nb_blocks): danger_map_matrix_v[i].array.mask[:,:] = danger_map_matrix_h[i].array.mask[:,:] danger_map_matrix_mom[i].array.mask[:,:] = danger_map_matrix_h[i].array.mask[:,:] danger_map_matrix_z[i].array.mask[:,:] = danger_map_matrix_h[i].array.mask[:,:] else: danger_map_matrix_v.array.mask[:,:] = danger_map_matrix_h.array.mask[:,:] danger_map_matrix_mom.array.mask[:,:] = danger_map_matrix_h.array.mask[:,:] danger_map_matrix_z.array.mask[:,:] = danger_map_matrix_h.array.mask[:,:] return (danger_map_matrix_h, danger_map_matrix_v, danger_map_matrix_mom, danger_map_matrix_z)
[docs] def danger_map_only_h(self, start:int=0, end:int=-1, every:int=1) -> WolfArray: """ Create Danger Maps :param start: start time step - 0-based :param end: end time step - 0-based :param every: step interval :return : tuple of WolfArray or WolfArrayMB - H, U_norm, Q_norm """ # Number of time steps number_of_time_steps = self.get_nbresults() if end ==-1: end = number_of_time_steps # Init Danger Maps basde on results type # If only one block --> WolfArray # If only multiple blocks --> WolfArrayMB danger_map_matrix_h = self.as_WolfArray(copyarray=True) danger_map_matrix_h.nullvalue = 0. danger_map_matrix_h.reset() danger_map_matrix_h.mask_reset() for time_step in tqdm(range(start, end, every)): self._read_oneresult_only_h(time_step+1) if self.nb_blocks>1: for curblock in self.myblocks.keys(): # Get WolfArray wd = self.get_h_for_block(curblock) # Comparison ij = np.where((danger_map_matrix_h.array < wd.array.data) & (~wd.array.mask)) danger_map_matrix_h.array.data[ij] = wd.array[ij] danger_map_matrix_h.array.mask[ij] = False else: curblock = getkeyblock(0) # Get WolfArray wd = self.get_h_for_block(curblock) # Comparison ij = np.where((danger_map_matrix_h.array < wd.array.data) & (~wd.array.mask)) danger_map_matrix_h.array.data[ij] = wd.array[ij] danger_map_matrix_h.array.mask[ij] = False danger_map_matrix_h.mask_lower(self.epsilon) return danger_map_matrix_h
[docs] def export_as(self, outdir:str, fields:list[views_2D], which:Literal['geotiff', 'shape', 'numpy'], multiband:bool=True): """ Export as geotiff or shapefile or numpy """ oldview = self.get_currentview() old_plotted = self.plotted arrays:dict[views_2D, WolfArray] = {} for curfield in tqdm(fields, _('Concatenating fields')): self.set_currentview(curfield) arrays[curfield] = self.as_WolfArray() arrays[curfield].nullvalue = 0. arrays[curfield].set_nullvalue_in_mask() if which =='geotiff': self.export_as_geotif(outdir, self.idx, [arr for arr in arrays.values()], [key.value for key,arr in arrays.items()], multiband= multiband) elif which=='shape': if views_2D.WATERDEPTH in arrays: shape_mask = arrays[views_2D.WATERDEPTH] else: logging.warning(_('No waterdepth found for shape export')) logging.warning(_('The first field will be used as mask for shape export')) shape_mask = arrays[fields[0]] self.export_as_shape(outdir, self.idx, [arr for arr in arrays.values()], [key.value for key,arr in arrays.items()], shape_mask) elif which=='numpy': for curfield, curarray in arrays.items(): if curfield == views_2D.TOPOGRAPHY: curarray.write_all(Path(outdir) / 'bathymetry.npy') elif curfield == views_2D.WATERDEPTH: curarray.write_all(Path(outdir) / 'h.npy') elif curfield == views_2D.QX: curarray.write_all(Path(outdir) / 'qx.npy') elif curfield == views_2D.QY: curarray.write_all(Path(outdir) / 'qy.npy') else: curarray.write_all(Path(outdir) / (curfield.value + '.npy')) self.plotted = old_plotted self.set_currentview(oldview)
[docs] def export_as_shape(self, outdir:str= '', fn:str = '', myarrays:list[WolfArray]= [], descr:list[str]= [], mask:WolfArray=None): """ Export multiple arrays to shapefile :param outdir: output directory :param fn: filename -- .shp will be added if not present :param myarrays: list of Wolfarrays to export :param descr: list of descriptions :param mask: mask array -- export only where mask > 0 """ if len(myarrays)==0: logging.warning(_('No arrays provided for shapefile export')) return if mask is None: logging.warning(_('No mask provided for shapefile export')) return from osgeo import gdal, osr, gdalconst,ogr # create the spatial reference system, Lambert72 srs = osr.SpatialReference() srs.ImportFromEPSG(31370) # create the data source driver: ogr.Driver driver = ogr.GetDriverByName("ESRI Shapefile") # create the data source filename = join(outdir,fn) if not filename.endswith('.shp'): filename+='.shp' ds = driver.CreateDataSource(filename) # create one layer layer = ds.CreateLayer("results", srs, ogr.wkbPolygon) # Convert Fields new_descr = [] for curfield in descr: if curfield == views_2D.TOPOGRAPHY.value: new_descr.append('TOP[m]') elif curfield == views_2D.WATERDEPTH.value: new_descr.append('WD[m]') elif curfield == views_2D.QX.value: new_descr.append('QX[m2/s]') elif curfield == views_2D.QY.value: new_descr.append('QY[m2/s]') elif curfield == views_2D.UNORM.value: new_descr.append('UN[m/s]') elif curfield == views_2D.FROUDE.value: new_descr.append('FR[-]') elif curfield == views_2D.HEAD.value: new_descr.append('HEAD[m]') elif curfield == views_2D.CRITICAL_DIAMETER_SHIELDS.value: new_descr.append('DSh[m]') elif curfield == views_2D.CRITICAL_DIAMETER_IZBACH.value: new_descr.append('DIz[m]') elif curfield == views_2D.QNORM.value: new_descr.append('QN[m2/s]') elif curfield == views_2D.WATERLEVEL.value: new_descr.append('WL[m]') elif curfield == views_2D.CRITICAL_DIAMETER_SUSPENSION_50.value: new_descr.append('DS50[m]') elif curfield == views_2D.CRITICAL_DIAMETER_SUSPENSION_100.value: new_descr.append('DS100[m]') descr = new_descr # Add ID fields idFields=[] for curlab in descr: idFields.append(ogr.FieldDefn(curlab, ogr.OFTReal)) layer.CreateField(idFields[-1]) # Create the feature and set values featureDefn = layer.GetLayerDefn() feature = ogr.Feature(featureDefn) usednodes = np.argwhere(mask.array>0.) for i,j in tqdm(usednodes): x,y = mask.get_xy_from_ij(i,j) # Creating a line geometry ring = ogr.Geometry(ogr.wkbLinearRing) ring.AddPoint(x-mask.dx/2,y-mask.dy/2) ring.AddPoint(x+mask.dx/2,y-mask.dy/2) ring.AddPoint(x+mask.dx/2,y+mask.dy/2) ring.AddPoint(x-mask.dx/2,y+mask.dy/2) ring.AddPoint(x-mask.dx/2,y-mask.dy/2) # Create polygon poly = ogr.Geometry(ogr.wkbPolygon) poly.AddGeometry(ring) feature.SetGeometry(poly) for arr, id in zip(myarrays,descr): feature.SetField(id, float(arr.array[i,j])) layer.CreateFeature(feature) feature = None # Save and close DataSource ds = None
[docs] def export_as_geotif(self, outdir:str= '', fn:str = '', myarrays:list[WolfArray]= [], descr:list[str]= [], multiband:bool= True): """ Export results as geotiff :param outdir: output directory :param fn: filename -- .tif will be added if not present :param myarrays: list of Wolfarrays to export :param descr: list of descriptions -- Bands names """ if len(myarrays)==0: logging.warning(_('No arrays provided for geotiff export')) return from osgeo import gdal, osr, gdalconst srs = osr.SpatialReference() srs.ImportFromEPSG(31370) driver: gdal.Driver out_ds: gdal.Dataset band: gdal.Band driver = gdal.GetDriverByName("GTiff") if multiband: filename = join(outdir,fn) if not filename.endswith('.tif'): filename+='.tif' arr = myarrays[0] # Check if estimated file size exceeds 4GB estimated_file_size = arr.memory_usage * len(myarrays) if (estimated_file_size > 4 * 1024**3): # 4GB in bytes options = ['COMPRESS=LZW', 'BIGTIFF=YES'] logging.info('BigTIFF format will be used!') else: options = ['COMPRESS=LZW'] out_ds = driver.Create(filename, arr.shape[0], arr.shape[1], len(myarrays), arr.dtype_gdal, options= options) out_ds.SetProjection(srs.ExportToWkt()) out_ds.SetGeoTransform([myarrays[0].origx+myarrays[0].translx, myarrays[0].dx, 0., myarrays[0].origy+myarrays[0].transly, 0., myarrays[0].dy]) k=1 for arr, name in tqdm(zip(myarrays,descr), 'Writing geotiff - bands'): band = out_ds.GetRasterBand(k) band.SetNoDataValue(arr.nullvalue) band.SetDescription(name) band.WriteArray(arr.array.transpose()) band.FlushCache() band.ComputeStatistics(True) k+=1 out_ds = None else: for arr, name in tqdm(zip(myarrays,descr), 'Writing geotiff'): filename = join(outdir,fn) if filename.endswith('.tif'): filename = filename[:-4] filename = filename+'_'+name+'.tif' estimated_file_size = arr.memory_usage if (estimated_file_size > 4 * 1024**3): # 4GB in bytes options = ['COMPRESS=LZW', 'BIGTIFF=YES'] logging.info('BigTIFF format will be used!') else: options = ['COMPRESS=LZW'] out_ds = driver.Create(filename, arr.shape[0], arr.shape[1], 1, arr.dtype_gdal, options= options) out_ds.SetProjection(srs.ExportToWkt()) out_ds.SetGeoTransform([myarrays[0].origx+myarrays[0].translx, myarrays[0].dx, 0., myarrays[0].origy+myarrays[0].transly, 0., myarrays[0].dy]) band = out_ds.GetRasterBand(1) band.SetNoDataValue(arr.nullvalue) band.SetDescription(name) band.WriteArray(arr.array.transpose()) band.FlushCache() band.ComputeStatistics(True) out_ds = None