"""
Companion manager for Results 2D menu and actions (CPU and GPU).
Extracted from PyDraw.WolfMapViewer.
Menu architecture:
- menu_build() : base "Results 2D" menu, shared between CPU and GPU results.
- menu_build_gpu_ext() : extends the same menu with GPU-only items (cache, tiles,
bathymetry); must be called after menu_build().
- on_menu(event) : single dispatcher for all items; GPU-specific branches are
clearly marked with # --- GPU ONLY --- comments.
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import TYPE_CHECKING
import wx
from .PyTranslate import _
if TYPE_CHECKING:
from .PyDraw import WolfMapViewer
[docs]
class Wolf2DResultsManager:
"""Manages the Results 2D menu and all 2D result actions (CPU + GPU)."""
def __init__(self, viewer: 'WolfMapViewer') -> None:
# ------------------------------------------------------------------
# Menu build — shared base (CPU + GPU compatible)
# ------------------------------------------------------------------
# ------------------------------------------------------------------
# Menu build — GPU extension (adds items to the existing menu)
# ------------------------------------------------------------------
# ------------------------------------------------------------------
# Action dispatcher
# ------------------------------------------------------------------
[docs]
def _on_read_last(self, event: wx.MenuEvent) -> None:
    """Menu callback: delegate to the viewer's ``read_last_result()``.

    :param event: menu event that triggered the callback (not used here)
    """
    viewer = self._viewer
    viewer.read_last_result()
[docs]
def _on_export_as(self, event: wx.MenuEvent) -> None:
    """Menu callback: delegate to the viewer's ``export_results_as()``.

    :param event: menu event that triggered the callback (not used here)
    """
    viewer = self._viewer
    viewer.export_results_as()
[docs]
def _on_explore(self, event: wx.MenuEvent) -> None:
    """Menu callback: open a simulation explorer on the active 2D result.

    A warning is logged and nothing else happens when the viewer has no
    active 2D result.

    :param event: menu event that triggered the callback (not used here)
    """
    viewer = self._viewer
    if viewer.active_res2d is not None:
        viewer._add_sim_explorer(viewer.active_res2d)
        return

    logging.warning(_('No active 2D result ! -- Please activate a 2D result first'))
[docs]
def _on_change_view(self, event: wx.MenuEvent) -> None:
    """Menu callback: start the "change current view" workflow.

    The project symbols are imported inside the function and handed to
    ``_handle_change_current_view``.

    :param event: menu event that triggered the callback (not used here)
    """
    from .PyDraw import draw_type
    from .wolfresults_2D import views_2D

    self._handle_change_current_view(views_2D=views_2D, draw_type=draw_type)
[docs]
def _on_set_epsilon(self, event: wx.MenuEvent) -> None:
    """Menu callback: start the "set epsilon" workflow.

    The project symbols are imported inside the function and handed to
    ``_handle_set_epsilon``.

    :param event: menu event that triggered the callback (not used here)
    """
    from .PyDraw import draw_type
    from .wolfresults_2D import Wolfresults_2D

    self._handle_set_epsilon(draw_type=draw_type, Wolfresults_2D=Wolfresults_2D)
[docs]
def _on_filter_indep(self, event: wx.MenuEvent) -> None:
    """Menu callback: start the "filter independent zones" workflow.

    The project symbols are imported inside the function and handed to
    ``_handle_filter_independent``.

    :param event: menu event that triggered the callback (not used here)
    """
    from .PyDraw import draw_type
    from .wolfresults_2D import Wolfresults_2D

    self._handle_filter_independent(draw_type=draw_type, Wolfresults_2D=Wolfresults_2D)
[docs]
def _on_video(self, event: wx.MenuEvent) -> None:
    """Menu callback: delegate to the viewer's ``create_video()``.

    A warning is logged and nothing else happens when the viewer has no
    active 2D result.

    :param event: menu event that triggered the callback (not used here)
    """
    viewer = self._viewer
    if viewer.active_res2d is not None:
        viewer.create_video()
        return

    logging.warning(_('No active 2D result ! - Please load and activate a 2D result first'))
[docs]
def _on_danger_map_h(self, event: wx.MenuEvent) -> None:
    """Menu callback: forward to ``_handle_danger_map_only_h``.

    :param event: menu event that triggered the callback (not used here)
    """
    handler = self._handle_danger_map_only_h
    handler()
[docs]
def _on_danger_map(self, event: wx.MenuEvent) -> None:
    """Menu callback: run ``_handle_danger_map`` with the 'Danger map' label.

    The label is passed already translated.

    :param event: menu event that triggered the callback (not used here)
    """
    mode_label = _('Danger map')
    self._handle_danger_map(itemlabel=mode_label)
[docs]
def _on_danger_map_tiled(self, event: wx.MenuEvent) -> None:
    """Menu callback: run ``_handle_danger_map`` with the 'Danger map tiled' label.

    The label is passed already translated.

    :param event: menu event that triggered the callback (not used here)
    """
    mode_label = _('Danger map tiled')
    self._handle_danger_map(itemlabel=mode_label)
[docs]
def _on_danger_map_mp(self, event: wx.MenuEvent) -> None:
    """Menu callback: run ``_handle_danger_map`` with the multiprocess label.

    The label is passed already translated.

    :param event: menu event that triggered the callback (not used here)
    """
    mode_label = _('Danger map (multiprocess)')
    self._handle_danger_map(itemlabel=mode_label)
[docs]
def _on_setup_cache(self, event: wx.MenuEvent) -> None:
    """Menu callback: forward to ``_handle_setup_cache``.

    :param event: menu event that triggered the callback (not used here)
    """
    handler = self._handle_setup_cache
    handler()
[docs]
def _on_clear_cache(self, event: wx.MenuEvent) -> None:
    """Menu callback: forward to ``_handle_clear_cache``.

    :param event: menu event that triggered the callback (not used here)
    """
    handler = self._handle_clear_cache
    handler()
[docs]
def _on_show_tiles(self, event: wx.MenuEvent) -> None:
    """Menu callback: forward to ``_handle_show_tiles``.

    :param event: menu event that triggered the callback (not used here)
    """
    handler = self._handle_show_tiles
    handler()
[docs]
def _on_add_bathy(self, event: wx.MenuEvent) -> None:
    """Menu callback: forward to ``_handle_add_bathy``.

    :param event: menu event that triggered the callback (not used here)
    """
    handler = self._handle_add_bathy
    handler()
# ------------------------------------------------------------------
# Shared handlers
# ------------------------------------------------------------------
[docs]
def _handle_change_current_view(self, views_2D, draw_type) -> None:
    """Let the user pick a 2D view and apply it to the loaded results.

    Workflow:

    1. A single-choice dialog lists the values of ``views_2D``.
    2. If the Shields number view is chosen, the grain diameter and density
       are requested (defaults come from the active result, or fixed values
       when only a comparison is available).
    3. With more than one result loaded, the user chooses between applying
       the view to every ``draw_type.RES2D`` object or to the active result
       only; otherwise the active result (if any) is updated.
    4. The comparison object, when present, is updated as well.

    Every dialog is used as a context manager so that it is destroyed on all
    exit paths, including successful ones.

    :param views_2D: enumeration of available views; iterated for its
        ``value`` attributes and expected to expose ``SHIELDS_NUMBER``
    :param draw_type: enumeration exposing ``RES2D``, used to iterate the
        viewer's 2D results
    """
    from .wolfresults_2D import Wolfresults_2D

    v = self._viewer

    # --- 1. choose the view -------------------------------------------
    choices = [cur.value for cur in views_2D]
    with wx.SingleChoiceDialog(None, _("Pick a view"), "Choices", choices) as dlg:
        if dlg.ShowModal() == wx.ID_CANCEL:
            return
        selected = dlg.GetStringSelection()
    method = list(views_2D)[choices.index(selected)]

    # --- 2. optional Shields parameters -------------------------------
    diamsize = None
    density = None
    if method == views_2D.SHIELDS_NUMBER:
        if v.active_res2d is not None:
            sediment_diam = v.active_res2d.sediment_diameter
            sediment_density = v.active_res2d.sediment_density
        elif v.compare_results is not None:
            sediment_diam = 0.001
            sediment_density = 2.650
        else:
            logging.warning(_('No active 2D result or comparison !'))
            return

        with wx.TextEntryDialog(None, _("Diameter grain size [m] ?"),
                                value=str(sediment_diam)) as dlg:
            if dlg.ShowModal() == wx.ID_CANCEL:
                return
            raw_value = dlg.GetValue()
        try:
            diamsize = float(raw_value)
        except (TypeError, ValueError):
            logging.warning(_("Bad value -- Rety"))
            return

        with wx.TextEntryDialog(None, _("Density grain [-] ?"),
                                value=str(sediment_density)) as dlg:
            if dlg.ShowModal() == wx.ID_CANCEL:
                return
            raw_value = dlg.GetValue()
        try:
            density = float(raw_value)
        except (TypeError, ValueError):
            logging.warning(_("Bad value -- Rety"))
            return

    def _apply_view(res) -> None:
        # Shields parameters are only pushed when they were actually asked for.
        if diamsize is not None:
            res.sediment_diameter = diamsize
            res.sediment_density = density
            res.load_default_colormap('shields_cst')
        res.set_currentview(method, force_wx=True, force_updatepal=True)

    # --- 3. apply to the results --------------------------------------
    if len(v.myres2D) > 1:
        with wx.MessageDialog(None, _('Apply to all results?'), style=wx.YES_NO) as dlg:
            only_active = dlg.ShowModal() == wx.ID_NO

        if only_active:
            # Several results may be loaded without any of them being active.
            if v.active_res2d is not None:
                _apply_view(v.active_res2d)
            else:
                logging.warning(_('No active 2D result !'))
        else:
            for curarray in v.iterator_over_objects(draw_type.RES2D):
                curarray: Wolfresults_2D
                _apply_view(curarray)
    elif v.active_res2d is not None:
        _apply_view(v.active_res2d)

    # --- 4. keep the comparison in sync -------------------------------
    if v.compare_results is not None:
        if diamsize is not None:
            v.compare_results.set_shields_param(diamsize, density)
        v.compare_results.update_type_result(method)
[docs]
def _handle_set_epsilon(self, draw_type, Wolfresults_2D) -> None:
    """Ask for an epsilon value and apply it to every loaded 2D result.

    For each ``draw_type.RES2D`` object of the viewer, ``epsilon`` and
    ``_epsilon_default`` are set to the entered value, the current result is
    re-read and the current view is refreshed. A modal progress dialog
    tracks the loop and is destroyed even if one of the models raises.

    :param draw_type: enumeration exposing ``RES2D``
    :param Wolfresults_2D: result class, used only as a local annotation
    """
    v = self._viewer

    with wx.TextEntryDialog(v, _('Enter an epsilon [m]'), value='0.0') as dlg:
        if dlg.ShowModal() == wx.ID_CANCEL:
            return
        raw_value = dlg.GetValue()

    try:
        neweps = float(raw_value)
    except (TypeError, ValueError):
        logging.error(_('Bad value -- retry !'))
        return

    pgbar = wx.ProgressDialog(_('Setting epsilon'), _('Setting epsilon'),
                              maximum=len(v.myres2D), parent=v,
                              style=wx.PD_APP_MODAL | wx.PD_AUTO_HIDE)
    try:
        for idx, curmodel in enumerate(v.iterator_over_objects(draw_type.RES2D)):
            curmodel: Wolfresults_2D
            curmodel.epsilon = neweps
            curmodel._epsilon_default = neweps
            curmodel.read_oneresult(curmodel.current_result)
            curmodel.set_currentview()
            # Translate the template first, then substitute: formatting before
            # the lookup yields a string that no catalog can contain.
            pgbar.Update(idx, _('Setting epsilon for result {}').format(curmodel.idx))
    finally:
        pgbar.Destroy()
[docs]
def _handle_filter_independent(self, draw_type, Wolfresults_2D) -> None:
    """Toggle the "filter independent zones" flag on every loaded 2D result.

    The state is flipped on the viewer's ``menu_filter_independent`` object
    and then propagated to the ``to_filter_independent`` attribute of each
    ``draw_type.RES2D`` object. A modal progress dialog tracks the loop and
    is destroyed even if one of the models raises.

    :param draw_type: enumeration exposing ``RES2D``
    :param Wolfresults_2D: result class, used only as a local annotation
    """
    v = self._viewer

    # NOTE(review): ``IsChecked`` is read and assigned as a plain attribute.
    # If ``menu_filter_independent`` is a wx.MenuItem, ``IsChecked`` is
    # normally a method and ``Check()`` the setter -- confirm against the
    # menu construction before changing this; behavior is kept as-is.
    v.menu_filter_independent.IsChecked = not v.menu_filter_independent.IsChecked

    pgbar = wx.ProgressDialog(_('Filtering independent zones'),
                              _('Filtering independent zones'),
                              maximum=len(v.myres2D), parent=v,
                              style=wx.PD_APP_MODAL | wx.PD_AUTO_HIDE)
    try:
        for idx, curmodel in enumerate(v.iterator_over_objects(draw_type.RES2D)):
            curmodel: Wolfresults_2D
            curmodel.to_filter_independent = not v.menu_filter_independent.IsChecked
            # Translate the template first, then substitute the identifier.
            pgbar.Update(idx, _('Filtering independent zones for result {}').format(curmodel.idx))
    finally:
        pgbar.Destroy()
[docs]
def _handle_danger_map_only_h(self) -> None:
    """Compute a danger map via ``danger_map_only_h`` and save it to disk.

    The user is asked, in order, for the first step, the last step, the
    sampling interval and the minimum water depth (entered in mm, converted
    to m). The map is then computed on the active 2D result and, once an
    output directory has been chosen, written as ``danger_h.tif``.
    Cancelling any dialog aborts the workflow.
    """
    viewer = self._viewer
    if viewer.active_res2d is None:
        logging.warning(_('No active 2D result !'))
        return

    # First step to consider (1-based in the dialog).
    with wx.NumberEntryDialog(
        None, _('Danger map'), _('From step'), _('Danger map'),
        1, 1, viewer.active_res2d.get_nbresults(),
    ) as ask_first:
        if ask_first.ShowModal() == wx.ID_CANCEL:
            return
        start_step = ask_first.GetValue()

    # Last step to consider, bounded below by the first one.
    with wx.NumberEntryDialog(
        None, _('Danger map'), _('To step'), _('Danger map'),
        viewer.active_res2d.get_nbresults(), start_step,
        viewer.active_res2d.get_nbresults(),
    ) as ask_last:
        if ask_last.ShowModal() == wx.ID_CANCEL:
            return
        end_step = ask_last.GetValue()

    # Sampling interval between two steps.
    with wx.NumberEntryDialog(
        None, _('Danger map'), _('Every'), _('Danger map'),
        1, 1, 60,
    ) as ask_stride:
        if ask_stride.ShowModal() == wx.ID_CANCEL:
            return
        every = ask_stride.GetValue()

    # Minimum water depth, entered in millimetres.
    with wx.NumberEntryDialog(
        None, _('Danger map'),
        _('Minimum water depth [mm]'), _('Danger map'),
        int(viewer.active_res2d.epsilon * 1000), 1, 1000,
    ) as ask_depth:
        if ask_depth.ShowModal() == wx.ID_CANCEL:
            return
        hmin = float(ask_depth.GetValue()) / 1000.

    # Dialog steps are 1-based, the computation receives 0-based bounds.
    danger_map = viewer.active_res2d.danger_map_only_h(
        start_step - 1, end_step - 1, every, hmin)

    with wx.DirDialog(None, _('Choose a directory'), style=wx.DD_DEFAULT_STYLE) as ask_dir:
        if ask_dir.ShowModal() == wx.ID_CANCEL:
            return
        outdir = ask_dir.GetPath()

    target = Path(outdir) / 'danger_h.tif'
    danger_map.write_all(target)
[docs]
def _handle_danger_map(self, itemlabel: str) -> None:
    """Compute the danger maps of the active 2D result and save them.

    The computation mode is selected by comparing ``itemlabel`` with the
    translated labels 'Danger map', 'Danger map (multiprocess)' and
    'Danger map tiled'. An unrecognised label is rejected up front instead
    of failing later on an unbound result. The tiled mode requires the
    active result to be a ``wolfres2DGPU`` instance.

    The user is asked for the first step, the last step, the sampling
    interval and the minimum water depth (mm, converted to m), then for an
    output directory once the computation is done. Each returned map is
    written as ``.bin`` (``WolfArrayMB``) or ``.tif`` (``WolfArray``).

    The progress dialog of the single-process and tiled modes is destroyed
    on every path, including when the computation raises.

    :param itemlabel: translated menu label selecting the computation mode
    """
    from .wolf_array import WolfArray, WolfArrayMB

    v = self._viewer
    if v.active_res2d is None:
        logging.warning(_('No active 2D result !'))
        return

    label_single = _("Danger map")
    label_multiprocess = _("Danger map (multiprocess)")
    label_tiled = _("Danger map tiled")

    if itemlabel not in (label_single, label_multiprocess, label_tiled):
        logging.error(_('Unknown danger map mode : {}').format(itemlabel))
        return

    # --- GPU ONLY check for tiled mode ---
    if itemlabel == label_tiled:
        from .Results2DGPU import wolfres2DGPU
        if not isinstance(v.active_res2d, wolfres2DGPU):
            logging.error(_('Tiled danger map is only available for wolfres2DGPU results !'))
            return

    with wx.NumberEntryDialog(None, _('Danger map'), _('From step'), _('Danger map'),
                              1, 1, v.active_res2d.get_nbresults()) as dlg:
        if dlg.ShowModal() == wx.ID_CANCEL:
            return
        start_step = dlg.GetValue()

    with wx.NumberEntryDialog(None, _('Danger map'), _('To step'), _('Danger map'),
                              v.active_res2d.get_nbresults(), start_step,
                              v.active_res2d.get_nbresults()) as dlg:
        if dlg.ShowModal() == wx.ID_CANCEL:
            return
        end_step = dlg.GetValue()

    with wx.NumberEntryDialog(None, _('Danger map'), _('Every'), _('Danger map'),
                              1, 1, 60) as dlg:
        if dlg.ShowModal() == wx.ID_CANCEL:
            return
        every = dlg.GetValue()

    with wx.NumberEntryDialog(None, _('Danger map'),
                              _('Minimum water depth [mm]'), _('Danger map'),
                              int(v.active_res2d.epsilon * 1000), 1, 1000) as dlg:
        if dlg.ShowModal() == wx.ID_CANCEL:
            return
        hmin = float(dlg.GetValue()) / 1000.

    def _run_with_progress(compute):
        """Run ``compute(callback)`` behind a progress dialog that is always released."""
        # NOTE(review): the maximum is ``end_step - 1`` while the reported
        # value is offset by ``start_step``; kept as in the original since
        # the index passed to the callback is not visible here.
        pgbar = wx.ProgressDialog(_('Danger map'), _('Danger map'),
                                  maximum=end_step - 1, parent=v,
                                  style=wx.PD_APP_MODAL | wx.PD_AUTO_HIDE)

        def _callback(idx, msg):
            pgbar.Update(idx + 1 - start_step, msg)

        try:
            return compute(_callback)
        finally:
            pgbar.Hide()
            pgbar.Destroy()

    # Dialog steps are 1-based, the computations receive 0-based bounds.
    if itemlabel == label_single:
        logging.info(_('Danger map -- Be patient !'))
        danger_maps = _run_with_progress(
            lambda cb: v.active_res2d.danger_map(start_step - 1, end_step - 1, every, cb, hmin))
        logging.info(_('Danger map done !'))

    elif itemlabel == label_multiprocess:
        logging.info(_('Multiprocess danger map -- Be patient !'))
        danger_maps = v.active_res2d.danger_map_multiprocess(
            start_step - 1, end_step - 1, every, hmin=hmin)
        logging.info(_('Multiprocess danger map done !'))

    else:  # --- GPU ONLY --- tiled mode, type already validated above
        logging.info(_('Tiled danger map -- Be patient !'))
        danger_maps = _run_with_progress(
            lambda cb: v.active_res2d.danger_map_gpu_tiled(
                start_step - 1, end_step - 1, every, cb, hmin=hmin))
        logging.info(_('Tiled danger map done !'))

    with wx.DirDialog(None, _('Choose a directory to store results'),
                      style=wx.DD_DEFAULT_STYLE) as dlg:
        if dlg.ShowModal() == wx.ID_CANCEL:
            return
        outdir = dlg.GetPath()

    names = ['danger_h', 'danger_u',
             'danger_q', 'danger_Z',
             'danger_head',
             'danger_toa', 'danger_tom',
             'danger_doi', 'danger_toe']

    for name, danger_map in zip(names, danger_maps):
        # WolfArrayMB is tested first, as in the original ordering
        # (presumably it derives from WolfArray -- confirm).
        if isinstance(danger_map, WolfArrayMB):
            filename = name + '.bin'
        elif isinstance(danger_map, WolfArray):
            filename = name + '.tif'
        else:
            logging.error(_('Bad type for danger map {} -- not saved !').format(name))
            continue

        logging.info(_('Saving danger map {}').format(filename))
        danger_map.write_all(Path(outdir) / filename)
# ------------------------------------------------------------------
# GPU-only handlers
# ------------------------------------------------------------------
[docs]
def _handle_setup_cache(self) -> None:  # --- GPU ONLY ---
    """Ask which results must be cached and call ``setup_cache`` accordingly.

    Two yes/no questions are asked: whether only water depth must be cached,
    and whether all results must be cached. When the second answer is "No",
    a start index and an end index (strictly greater than the start) are
    requested; the 1-based choices are converted to 0-based indices before
    being passed to ``setup_cache``.

    Dialogs are used as context managers so they are released on every path.
    An empty list of candidates (no result at all, or the start index being
    the last result) is reported instead of failing on the conversion of an
    empty selection.
    """
    v = self._viewer
    if v.active_res2d is None:
        logging.warning(_('No active 2D result !'))
        return

    with wx.MessageDialog(None, _('Cache only water depth results ?'), style=wx.YES_NO) as dlg:
        only_h = dlg.ShowModal() != wx.ID_NO

    with wx.MessageDialog(None, _('Cache all results ?'), style=wx.YES_NO) as dlg:
        partial_cache = dlg.ShowModal() == wx.ID_NO

    if not partial_cache:
        logging.info(_('Caching all results - Be patient !'))
        v.active_res2d.setup_cache(only_h=only_h)
        logging.info(_('Caching done !'))
        return

    start_choices = [str(cur) for cur in range(1, v.active_res2d.get_nbresults() + 1)]
    if not start_choices:
        logging.warning(_('No result available -- Nothing to cache !'))
        return

    with wx.SingleChoiceDialog(None, _('Choosing the start index'), _('Choices'),
                               start_choices) as dlg_start:
        if dlg_start.ShowModal() == wx.ID_CANCEL:
            return
        start_idx = int(dlg_start.GetStringSelection())

    # The end index must be strictly greater than the start index.
    end_choices = [str(cur) for cur in range(start_idx + 1, v.active_res2d.get_nbresults() + 1)]
    if not end_choices:
        logging.warning(_('No result available after index {} -- Nothing to cache !').format(start_idx))
        return

    with wx.SingleChoiceDialog(None, _('Choosing the end index'), _('Choices'),
                               end_choices) as dlg_end:
        if dlg_end.ShowModal() == wx.ID_CANCEL:
            return
        end_idx = int(dlg_end.GetStringSelection())

    logging.info(_('Caching from {} to {} - Be patient !').format(start_idx, end_idx))
    v.active_res2d.setup_cache(start_idx=start_idx - 1, end_idx=end_idx - 1, only_h=only_h)
    logging.info(_('Caching done !'))
[docs]
def _handle_clear_cache(self) -> None:  # --- GPU ONLY ---
    """Call ``clear_cache()`` on the active 2D result and log the outcome.

    A warning is logged and nothing else happens when the viewer has no
    active 2D result.
    """
    viewer = self._viewer
    if viewer.active_res2d is not None:
        viewer.active_res2d.clear_cache()
        logging.info(_('Cache cleared !'))
        return

    logging.warning(_('No active 2D result !'))
[docs]
def _handle_show_tiles(self) -> None:  # --- GPU ONLY ---
    """Call ``show_tiles()`` on the active 2D result.

    A warning is logged and nothing else happens when the viewer has no
    active 2D result.
    """
    viewer = self._viewer
    if viewer.active_res2d is not None:
        viewer.active_res2d.show_tiles()
        return

    logging.warning(_('No active 2D result !'))
[docs]
def _handle_add_bathy(self) -> None:  # --- GPU ONLY ---
    """Add to the viewer an array built from the active result's block 1.

    A ``WolfArray`` is created with ``get_top_for_block(1)`` as its mold and
    registered through ``add_object`` under an identifier that embeds the
    result's ``_reference_bathymetry_time`` (formatted with two decimals).
    A warning is logged and nothing else happens when the viewer has no
    active 2D result.
    """
    from .wolf_array import WolfArray

    viewer = self._viewer
    if viewer.active_res2d is None:
        logging.warning(_('No active 2D result !'))
        return

    mold = viewer.active_res2d.get_top_for_block(1)
    bathy = WolfArray(mold=mold)
    object_id = 'bathy_2d_gpu at {:.2f} s'.format(viewer.active_res2d._reference_bathymetry_time)
    viewer.add_object('array', newobj=bathy, id=object_id)