Source code for wolfhece._wolf2dresults_manager

"""
Companion manager for Results 2D menu and actions (CPU and GPU).
Extracted from PyDraw.WolfMapViewer.

Menu architecture:
  - menu_build()          : base "Results 2D" menu, shared between CPU and GPU results.
  - menu_build_gpu_ext()  : extends the same menu with GPU-only items (cache, tiles,
                            bathymetry); must be called after menu_build().
  - on_menu(event)        : single dispatcher for all items; GPU-specific branches are
                            clearly marked with  # --- GPU ONLY ---  comments.
"""
from __future__ import annotations

import logging
from pathlib import Path
from typing import TYPE_CHECKING

import wx

from .PyTranslate import _

if TYPE_CHECKING:
    from .PyDraw import WolfMapViewer


[docs] class Wolf2DResultsManager: """Manages the Results 2D menu and all 2D result actions (CPU + GPU).""" def __init__(self, viewer: 'WolfMapViewer') -> None:
[docs] self._viewer = viewer
[docs] self._menu: wx.Menu | None = None
[docs] self.menu2d_cache_setup: wx.MenuItem | None = None
# ------------------------------------------------------------------ # Menu build — shared base (CPU + GPU compatible) # ------------------------------------------------------------------
[docs] def menu_build(self) -> None: """Create the base 'Results 2D' menu, shared between CPU and GPU.""" v = self._viewer if self._menu is not None: return self._menu = wx.Menu() self.menu2d_explore_results = self._menu.Append(wx.ID_ANY, _("Explore time/index results"), _("Open a dialog to explore time/index results")) self._menu.AppendSeparator() self.menu2d_curentview = self._menu.Append(wx.ID_ANY, _("Change current view"), _("Current view")) self.menu2d_lastres = self._menu.Append(wx.ID_ANY, _("Read last result"), _("Current view")) self.menu2d_epsilon = self._menu.Append(wx.ID_ANY, _("Set epsilon water depth"), _("Set the epsilon used in the mask")) self.menu_filter_independent = self._menu.Append(wx.ID_ANY, _("Filter independent"), _("Filter independent"), kind=wx.ITEM_CHECK) self.menu2d_video = self._menu.Append(wx.ID_ANY, _("Create video..."), _("Video/Movie")) self._menu.AppendSeparator() self.menu2d_dangermap = self._menu.Append(wx.ID_ANY, _("Danger map"), _("Compute the danger map")) # --- GPU ONLY (tiled mode) — item always visible, guarded in on_menu --- self.menu2d_dangermap_tiled = self._menu.Append(wx.ID_ANY, _("Danger map tiled"), _("Compute the danger map in tiled mode -- Need wolfgpu >= 1.4.0")) self.menu2d_dangermap_mp = self._menu.Append(wx.ID_ANY, _("Danger map (multiprocess)"), _("Compute the danger map using multiprocessing -- Need to duplicate the model, " "the memory usage can be very high for large model")) self.menu2d_dangermaph = self._menu.Append(wx.ID_ANY, _("Danger map - only h"), _("Compute the danger map - only waterdepth")) self._menu.AppendSeparator() self.menu2d_export_as = self._menu.Append(wx.ID_ANY, _("Export results as..."), _("Export results as Geotif, Shapefile or Numpy arrays")) self._menu.AppendSeparator() # Possible cache entries (GPU) will be added after this separator v.menubar.Append(self._menu, _('Results 2D')) self._menu.Bind(wx.EVT_MENU, self._on_explore, self.menu2d_explore_results) self._menu.Bind(wx.EVT_MENU, self._on_change_view, self.menu2d_curentview) self._menu.Bind(wx.EVT_MENU, self._on_read_last, self.menu2d_lastres) self._menu.Bind(wx.EVT_MENU, self._on_set_epsilon, self.menu2d_epsilon) self._menu.Bind(wx.EVT_MENU, self._on_filter_indep, self.menu_filter_independent) self._menu.Bind(wx.EVT_MENU, self._on_video, self.menu2d_video) self._menu.Bind(wx.EVT_MENU, self._on_danger_map, self.menu2d_dangermap) self._menu.Bind(wx.EVT_MENU, self._on_danger_map_tiled, self.menu2d_dangermap_tiled) self._menu.Bind(wx.EVT_MENU, self._on_danger_map_mp, self.menu2d_dangermap_mp) self._menu.Bind(wx.EVT_MENU, self._on_danger_map_h, self.menu2d_dangermaph) self._menu.Bind(wx.EVT_MENU, self._on_export_as, self.menu2d_export_as)
# ------------------------------------------------------------------ # Menu build — GPU extension (adds items to the existing menu) # ------------------------------------------------------------------
[docs] def menu_build_gpu_ext(self) -> None: """Append GPU-only items to the existing Results 2D menu.""" if self._menu is None: return if self.menu2d_cache_setup is not None: return # already added # --- GPU ONLY --- self.menu2d_cache_setup = self._menu.Append(wx.ID_ANY, _("Setup cache..."), _("Set up cache for 2D GPU model")) self.menu2d_cache_reset = self._menu.Append(wx.ID_ANY, _("Clear cache..."), _("Clear cache for 2D GPU model")) self.menu2d_show_tiles = self._menu.Append(wx.ID_ANY, _("Show tiles..."), _("Show a grid of tiles for 2D GPU model")) self._menu.AppendSeparator() self.menu2d_add_bathy = self._menu.Append(wx.ID_ANY, _("Add bathymetry to the viwer..."), _("Add bathymetry data to current viewer")) self._menu.Bind(wx.EVT_MENU, self._on_setup_cache, self.menu2d_cache_setup) self._menu.Bind(wx.EVT_MENU, self._on_clear_cache, self.menu2d_cache_reset) self._menu.Bind(wx.EVT_MENU, self._on_show_tiles, self.menu2d_show_tiles) self._menu.Bind(wx.EVT_MENU, self._on_add_bathy, self.menu2d_add_bathy)
# ------------------------------------------------------------------ # Action dispatcher # ------------------------------------------------------------------
[docs] def _on_read_last(self, event: wx.MenuEvent) -> None: self._viewer.read_last_result()
[docs] def _on_export_as(self, event: wx.MenuEvent) -> None: self._viewer.export_results_as()
[docs] def _on_explore(self, event: wx.MenuEvent) -> None: v = self._viewer if v.active_res2d is None: logging.warning(_('No active 2D result ! -- Please activate a 2D result first')) return v._add_sim_explorer(v.active_res2d)
[docs] def _on_change_view(self, event: wx.MenuEvent) -> None: from .PyDraw import draw_type from .wolfresults_2D import views_2D self._handle_change_current_view(views_2D, draw_type)
[docs] def _on_set_epsilon(self, event: wx.MenuEvent) -> None: from .PyDraw import draw_type from .wolfresults_2D import Wolfresults_2D self._handle_set_epsilon(draw_type, Wolfresults_2D)
[docs] def _on_filter_indep(self, event: wx.MenuEvent) -> None: from .PyDraw import draw_type from .wolfresults_2D import Wolfresults_2D self._handle_filter_independent(draw_type, Wolfresults_2D)
[docs] def _on_video(self, event: wx.MenuEvent) -> None: v = self._viewer if v.active_res2d is None: logging.warning(_('No active 2D result ! - Please load and activate a 2D result first')) return v.create_video()
[docs] def _on_danger_map_h(self, event: wx.MenuEvent) -> None: self._handle_danger_map_only_h()
[docs] def _on_danger_map(self, event: wx.MenuEvent) -> None: self._handle_danger_map(_('Danger map'))
[docs] def _on_danger_map_tiled(self, event: wx.MenuEvent) -> None: self._handle_danger_map(_('Danger map tiled'))
[docs] def _on_danger_map_mp(self, event: wx.MenuEvent) -> None: self._handle_danger_map(_('Danger map (multiprocess)'))
[docs] def _on_setup_cache(self, event: wx.MenuEvent) -> None: self._handle_setup_cache()
[docs] def _on_clear_cache(self, event: wx.MenuEvent) -> None: self._handle_clear_cache()
[docs] def _on_show_tiles(self, event: wx.MenuEvent) -> None: self._handle_show_tiles()
[docs] def _on_add_bathy(self, event: wx.MenuEvent) -> None: self._handle_add_bathy()
# ------------------------------------------------------------------ # Shared handlers # ------------------------------------------------------------------
[docs] def _handle_change_current_view(self, views_2D, draw_type) -> None: from .wolfresults_2D import Wolfresults_2D v = self._viewer choices = [cur.value for cur in views_2D] dlg = wx.SingleChoiceDialog(None, _("Pick a view"), "Choices", choices) ret = dlg.ShowModal() if ret == wx.ID_CANCEL: dlg.Destroy() return method = dlg.GetStringSelection() method = list(views_2D)[choices.index(method)] dlg.Destroy() diamsize = None density = None if method == views_2D.SHIELDS_NUMBER: if v.active_res2d is not None: sediment_diam = v.active_res2d.sediment_diameter sediment_density = v.active_res2d.sediment_density elif v.compare_results is not None: sediment_diam = 0.001 sediment_density = 2.650 else: logging.warning(_('No active 2D result or comparison !')) return dlg = wx.TextEntryDialog(None, _("Diameter grain size [m] ?"), value=str(sediment_diam)) ret = dlg.ShowModal() if ret == wx.ID_CANCEL: dlg.Destroy() return try: diamsize = float(dlg.GetValue()) except Exception: dlg.Destroy() logging.warning(_("Bad value -- Rety")) return dlg = wx.TextEntryDialog(None, _("Density grain [-] ?"), value=str(sediment_density)) ret = dlg.ShowModal() if ret == wx.ID_CANCEL: dlg.Destroy() return try: density = float(dlg.GetValue()) except Exception: dlg.Destroy() logging.warning(_("Bad value -- Rety")) return if len(v.myres2D) > 1: dlg = wx.MessageDialog(None, _('Apply to all results?'), style=wx.YES_NO) ret = dlg.ShowModal() if ret == wx.ID_NO: if diamsize is not None: v.active_res2d.sediment_diameter = diamsize v.active_res2d.sediment_density = density v.active_res2d.load_default_colormap('shields_cst') v.active_res2d.set_currentview(method, force_wx=True, force_updatepal=True) else: for curarray in v.iterator_over_objects(draw_type.RES2D): curarray: Wolfresults_2D if diamsize is not None: curarray.sediment_diameter = diamsize curarray.sediment_density = density curarray.load_default_colormap('shields_cst') curarray.set_currentview(method, force_wx=True, force_updatepal=True) else: if v.active_res2d is not None: if diamsize is not None: v.active_res2d.sediment_diameter = diamsize v.active_res2d.sediment_density = density v.active_res2d.load_default_colormap('shields_cst') v.active_res2d.set_currentview(method, force_wx=True, force_updatepal=True) if v.compare_results is not None: if diamsize is not None: v.compare_results.set_shields_param(diamsize, density) v.compare_results.update_type_result(method)
[docs] def _handle_set_epsilon(self, draw_type, Wolfresults_2D) -> None: v = self._viewer dlg = wx.TextEntryDialog(v, _('Enter an epsilon [m]'), value='0.0') ret = dlg.ShowModal() if ret == wx.ID_CANCEL: dlg.Destroy() return try: neweps = float(dlg.GetValue()) dlg.Destroy() except Exception: logging.error(_('Bad value -- retry !')) dlg.Destroy() return pgbar = wx.ProgressDialog(_('Setting epsilon'), _('Setting epsilon'), maximum=len(v.myres2D), parent=v, style=wx.PD_APP_MODAL | wx.PD_AUTO_HIDE) for idx, curmodel in enumerate(v.iterator_over_objects(draw_type.RES2D)): curmodel: Wolfresults_2D curmodel.epsilon = neweps curmodel._epsilon_default = neweps curmodel.read_oneresult(curmodel.current_result) curmodel.set_currentview() pgbar.Update(idx, _('Setting epsilon for result {}'.format(curmodel.idx))) pgbar.Destroy()
[docs] def _handle_filter_independent(self, draw_type, Wolfresults_2D) -> None: v = self._viewer v.menu_filter_independent.IsChecked = not v.menu_filter_independent.IsChecked pgbar = wx.ProgressDialog(_('Filtering independent zones'), _('Filtering independent zones'), maximum=len(v.myres2D), parent=v, style=wx.PD_APP_MODAL | wx.PD_AUTO_HIDE) for idx, curmodel in enumerate(v.iterator_over_objects(draw_type.RES2D)): curmodel: Wolfresults_2D curmodel.to_filter_independent = not v.menu_filter_independent.IsChecked pgbar.Update(idx, _('Filtering independent zones for result {}'.format(curmodel.idx))) pgbar.Destroy()
[docs] def _handle_danger_map_only_h(self) -> None: v = self._viewer if v.active_res2d is None: logging.warning(_('No active 2D result !')) return with wx.NumberEntryDialog(None, _('Danger map'), _('From step'), _('Danger map'), 1, 1, v.active_res2d.get_nbresults()) as dlg: if dlg.ShowModal() == wx.ID_CANCEL: return start_step = dlg.GetValue() with wx.NumberEntryDialog(None, _('Danger map'), _('To step'), _('Danger map'), v.active_res2d.get_nbresults(), start_step, v.active_res2d.get_nbresults()) as dlg: if dlg.ShowModal() == wx.ID_CANCEL: return end_step = dlg.GetValue() with wx.NumberEntryDialog(None, _('Danger map'), _('Every'), _('Danger map'), 1, 1, 60) as dlg: if dlg.ShowModal() == wx.ID_CANCEL: return every = dlg.GetValue() with wx.NumberEntryDialog(None, _('Danger map'), _('Minimum water depth [mm]'), _('Danger map'), int(v.active_res2d.epsilon * 1000), 1, 1000) as dlg: if dlg.ShowModal() == wx.ID_CANCEL: return hmin = float(dlg.GetValue()) / 1000. danger_map = v.active_res2d.danger_map_only_h(start_step - 1, end_step - 1, every, hmin) with wx.DirDialog(None, _('Choose a directory'), style=wx.DD_DEFAULT_STYLE) as dlg: if dlg.ShowModal() == wx.ID_CANCEL: return outdir = dlg.GetPath() danger_map.write_all(Path(outdir) / 'danger_h.tif')
[docs] def _handle_danger_map(self, itemlabel: str) -> None: from .wolf_array import WolfArray, WolfArrayMB v = self._viewer if v.active_res2d is None: logging.warning(_('No active 2D result !')) return # --- GPU ONLY check for tiled mode --- if itemlabel == _("Danger map tiled"): from .Results2DGPU import wolfres2DGPU if not isinstance(v.active_res2d, wolfres2DGPU): logging.error(_('Tiled danger map is only available for wolfres2DGPU results !')) return with wx.NumberEntryDialog(None, _('Danger map'), _('From step'), _('Danger map'), 1, 1, v.active_res2d.get_nbresults()) as dlg: if dlg.ShowModal() == wx.ID_CANCEL: return start_step = dlg.GetValue() with wx.NumberEntryDialog(None, _('Danger map'), _('To step'), _('Danger map'), v.active_res2d.get_nbresults(), start_step, v.active_res2d.get_nbresults()) as dlg: if dlg.ShowModal() == wx.ID_CANCEL: return end_step = dlg.GetValue() with wx.NumberEntryDialog(None, _('Danger map'), _('Every'), _('Danger map'), 1, 1, 60) as dlg: if dlg.ShowModal() == wx.ID_CANCEL: return every = dlg.GetValue() with wx.NumberEntryDialog(None, _('Danger map'), _('Minimum water depth [mm]'), _('Danger map'), int(v.active_res2d.epsilon * 1000), 1, 1000) as dlg: if dlg.ShowModal() == wx.ID_CANCEL: return hmin = float(dlg.GetValue()) / 1000. if itemlabel == _("Danger map"): logging.info(_('Danger map -- Be patient !')) pgbar = wx.ProgressDialog(_('Danger map'), _('Danger map'), maximum=end_step - 1, parent=v, style=wx.PD_APP_MODAL | wx.PD_AUTO_HIDE) def _callback(idx, msg): pgbar.Update(idx + 1 - start_step, msg) danger_maps = v.active_res2d.danger_map(start_step - 1, end_step - 1, every, _callback, hmin) pgbar.Hide() pgbar.Destroy() logging.info(_('Danger map done !')) elif itemlabel == _("Danger map (multiprocess)"): logging.info(_('Multiprocess danger map -- Be patient !')) danger_maps = v.active_res2d.danger_map_multiprocess(start_step - 1, end_step - 1, every, hmin=hmin) logging.info(_('Multiprocess danger map done !')) elif itemlabel == _("Danger map tiled"): # --- GPU ONLY --- from .Results2DGPU import wolfres2DGPU logging.info(_('Tiled danger map -- Be patient !')) assert isinstance(v.active_res2d, wolfres2DGPU) pgbar = wx.ProgressDialog(_('Danger map'), _('Danger map'), maximum=end_step - 1, parent=v, style=wx.PD_APP_MODAL | wx.PD_AUTO_HIDE) def _callback(idx, msg): pgbar.Update(idx + 1 - start_step, msg) danger_maps = v.active_res2d.danger_map_gpu_tiled(start_step - 1, end_step - 1, every, _callback, hmin=hmin) logging.info(_('Tiled danger map done !')) with wx.DirDialog(None, _('Choose a directory to store results'), style=wx.DD_DEFAULT_STYLE) as dlg: if dlg.ShowModal() == wx.ID_CANCEL: return outdir = dlg.GetPath() names = ['danger_h', 'danger_u', 'danger_q', 'danger_Z', 'danger_head', 'danger_toa', 'danger_tom', 'danger_doi', 'danger_toe'] for name, danger_map in zip(names, danger_maps): if isinstance(danger_map, WolfArrayMB): name = name + '.bin' logging.info(_('Saving danger map {}').format(name)) danger_map.write_all(Path(outdir) / name) elif isinstance(danger_map, WolfArray): name = name + '.tif' logging.info(_('Saving danger map {}').format(name)) danger_map.write_all(Path(outdir) / name) else: logging.error(_('Bad type for danger map {} -- not saved !').format(name))
# ------------------------------------------------------------------ # GPU-only handlers # ------------------------------------------------------------------
[docs] def _handle_setup_cache(self) -> None: # --- GPU ONLY --- v = self._viewer if v.active_res2d is None: logging.warning(_('No active 2D result !')) return dlg = wx.MessageDialog(None, _('Cache only water depth results ?'), style=wx.YES_NO) ret = dlg.ShowModal() only_h = ret != wx.ID_NO dlg.Destroy() dlg = wx.MessageDialog(None, _('Cache all results ?'), style=wx.YES_NO) ret = dlg.ShowModal() if ret == wx.ID_NO: dlg_start = wx.SingleChoiceDialog( None, _('Choosing the start index'), _('Choices'), [str(cur) for cur in range(1, v.active_res2d.get_nbresults() + 1)]) ret = dlg_start.ShowModal() if ret == wx.ID_CANCEL: dlg_start.Destroy() dlg.Destroy() return start_idx = int(dlg_start.GetStringSelection()) dlg_start.Destroy() dlg_end = wx.SingleChoiceDialog( None, _('Choosing the end index'), _('Choices'), [str(cur) for cur in range(start_idx + 1, v.active_res2d.get_nbresults() + 1)]) ret = dlg_end.ShowModal() if ret == wx.ID_CANCEL: dlg_end.Destroy() dlg.Destroy() return end_idx = int(dlg_end.GetStringSelection()) dlg_end.Destroy() logging.info(_('Caching from {} to {} - Be patient !').format(start_idx, end_idx)) v.active_res2d.setup_cache(start_idx=start_idx - 1, end_idx=end_idx - 1, only_h=only_h) logging.info(_('Caching done !')) else: logging.info(_('Caching all results - Be patient !')) v.active_res2d.setup_cache(only_h=only_h) logging.info(_('Caching done !')) dlg.Destroy()
[docs] def _handle_clear_cache(self) -> None: # --- GPU ONLY --- v = self._viewer if v.active_res2d is None: logging.warning(_('No active 2D result !')) return v.active_res2d.clear_cache() logging.info(_('Cache cleared !'))
[docs] def _handle_show_tiles(self) -> None: # --- GPU ONLY --- v = self._viewer if v.active_res2d is None: logging.warning(_('No active 2D result !')) return v.active_res2d.show_tiles()
[docs] def _handle_add_bathy(self) -> None: # --- GPU ONLY --- from .wolf_array import WolfArray v = self._viewer if v.active_res2d is None: logging.warning(_('No active 2D result !')) return bathy = WolfArray(mold=v.active_res2d.get_top_for_block(1)) v.add_object('array', newobj=bathy, id='bathy_2d_gpu at {:.2f} s'.format(v.active_res2d._reference_bathymetry_time))