# Source code for wolfhece._analyze_manager

"""
Companion manager for the Analyze menu and all analysis actions.
Extracted from PyDraw.WolfMapViewer.

Routing architecture:
  - Every item of the Analyze menu and of its submenus is bound directly to
    its own handler via menu.Bind(wx.EVT_MENU, self._on_<action>, item) at
    the end of menu_build().
  - No label-string dispatch through OnMenubar is needed.

GPU-specific items:
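  - "Compare checked simulations..."            : filters to wolfres2DGPU only         # --- GPU ONLY ---
  - "Compare all simulations in a directory...": uses the GPU comparison report sheet  # --- GPU ONLY ---
  The "Report..." items also rely on the SimpleSimGPU report classes; the
  remaining items are shared (CPU + GPU results).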
"""
from __future__ import annotations

import logging
from pathlib import Path
from typing import TYPE_CHECKING

import wx

from .PyTranslate import _

if TYPE_CHECKING:
    from .PyDraw import WolfMapViewer


class AnalyzeManager:
    """Manages the Analyze menu and all analysis actions (CPU + GPU).

    The manager holds a reference to the viewer and never owns data itself:
    every handler reads the active objects (vector, zone, array, 2D result,
    cloud) from the viewer at the moment the menu item is triggered.
    """

    def __init__(self, viewer: 'WolfMapViewer') -> None:
        """Store the viewer; the menu itself is built lazily by menu_build()."""
        self._viewer = viewer
        # None until menu_build() has run; also used as the "already built" flag.
        self.analyzemenu: wx.Menu | None = None

    # ------------------------------------------------------------------
    # Menu build
    # ------------------------------------------------------------------

    def menu_build(self) -> None:
        """Build the Analyze menu and append it to the menubar.

        Calling it again once the menu exists is a no-op. The order of the
        Append calls defines the layout of the menus, so it must be kept.
        """
        if self.analyzemenu is not None:
            return

        v = self._viewer

        self.analyzemenu = wx.Menu()
        self.analyzeplot = wx.Menu()
        self.analyzeexport = wx.Menu()
        self.analyzeinpaint = wx.Menu()
        self.analyzesimsheet = wx.Menu()

        self.analyzeplot_plot_vector = self.analyzeplot.Append(
            wx.ID_ANY, _("Plot active vector..."),
            _("Plot the active vector and linked arrays"))
        self.analyzeplot_plot_vector_dynamic = self.analyzeplot.Append(
            wx.ID_ANY, _("Plot active vector (dynamic)..."),
            _("Plot the active vector and linked arrays - Track the position's changes of the active vector and update the plot accordingly"))
        self.analyzeplot_plot_polygons = self.analyzeplot.Append(
            wx.ID_ANY, _("Plot active polygons..."),
            _("Plot the active polygons and linked arrays"))
        self.analyzeplot.AppendSeparator()

        self.analyzemenu.Append(wx.ID_ANY, _('Plot...'), self.analyzeplot)
        self.analyzemenu.Append(wx.ID_ANY, _('Export...'), self.analyzeexport)
        self.analyzemenu.Append(wx.ID_ANY, _('Inpaint...'), self.analyzeinpaint)
        self.analyzemenu.AppendSeparator()
        self.analyzemenu.Append(wx.ID_ANY, _('Report...'), self.analyzesimsheet)

        self.analyzesimsheet_active = self.analyzesimsheet.Append(
            wx.ID_ANY, _("Active simulation..."),
            _("Generate a summary PDF report for the active simulation"))
        self.analyzesimsheet_checked = self.analyzesimsheet.Append(
            wx.ID_ANY, _("All checked simulations..."),
            _("Generate a summary PDF report for all checked simulations"))
        self.analyzesimsheet.AppendSeparator()
        self.analyzesimsheet_disk = self.analyzesimsheet.Append(
            wx.ID_ANY, _("One simulation from disk..."),
            _("Generate a summary PDF report for one simulation"))
        self.analyzesimsheet_directory = self.analyzesimsheet.Append(
            wx.ID_ANY, _("All simulations in directory..."),
            _("Generate a summary PDF report for all simulations in the current directory"))
        self.analyzesimsheet.AppendSeparator()
        self.analyzesimsheet_compare_checked = self.analyzesimsheet.Append(
            wx.ID_ANY, _("Compare checked simulations..."),
            _("Generate a summary PDF report for all the loaded simulations"))
        self.analyzesimsheet_compare_all_dir = self.analyzesimsheet.Append(
            wx.ID_ANY, _("Compare all simulations in a directory..."),
            _("Generate a summary PDF report for all the simulations in a directory"))
        self.analyzesimsheet.AppendSeparator()
        self.analyzesimsheet_compare_arrays = self.analyzesimsheet.Append(
            wx.ID_ANY, _("Compare arrays..."),
            _("Generate a summary PDF report for two loaded arrays"))
        self.analyzesimsheet_compare_arrays_files = self.analyzesimsheet.Append(
            wx.ID_ANY, _("Compare arrays from files..."),
            _("Generate a summary PDF report for two arrays from files"))

        self.analyzeinpaint_inpaint_array = self.analyzeinpaint.Append(
            wx.ID_ANY, _("Inpaint active array..."),
            _("Inpaint active array"))
        self.analyzeinpaint_inpaint_wl = self.analyzeinpaint.Append(
            wx.ID_ANY, _("Inpaint waterlevel..."),
            _("Inpaint a waterlevel result array based on sepcified dem and dtm data"))
        self.analyzeinpaint_inpaint_mask = self.analyzeinpaint.Append(
            wx.ID_ANY, _("Inpaint array with mask..."),
            _("Inpaint an array based on sepcified mask and test data"))

        self.analyzemenu.AppendSeparator()
        self.analyzemenu_load_mask = self.analyzemenu.Append(
            wx.ID_ANY, _("Load and apply mask (nap)..."),
            _("Apply mask from sim2D"))
        self.analyzemenu_filter_inundation = self.analyzemenu.Append(
            wx.ID_ANY, _("Filter inundation arrays..."),
            _("Filter arrays"))

        # Plot hydrographs
        self.analyzeplot_q_vector = self.analyzeplot.Append(
            wx.ID_ANY, _("Plot integrated Q along active vector..."),
            _("Integrate Q along the active vector and plot"))
        self.analyzeplot_q_zone = self.analyzeplot.Append(
            wx.ID_ANY, _("Plot integrated Q along active zone..."),
            _("Integrate Q along the active zone and plot"))
        self.analyzeplot.AppendSeparator()

        self.analyzeexport_q_vector = self.analyzeexport.Append(
            wx.ID_ANY, _("Export integrated Q along active vector..."),
            _("Integrate Q along the active vector and export"))
        self.analyzeexport_q_zone = self.analyzeexport.Append(
            wx.ID_ANY, _("Export integrated Q along all vectors in active zone..."),
            _("Integrate Q along ALL VECTORS of the active zone and export"))
        self.analyzeexport.AppendSeparator()

        self.analyzeplot_stats_selected = self.analyzeplot.Append(
            wx.ID_ANY, _("Plot stats unknown (selected nodes)..."),
            _("Compute stats and plot on the selected nodes"))
        self.analyzeplot_stats_vector = self.analyzeplot.Append(
            wx.ID_ANY, _("Plot stats unknown (inside active vector)..."),
            _("Compute stats and plot on nodes inside the active vector"))
        self.analyzeplot_stats_zone = self.analyzeplot.Append(
            wx.ID_ANY, _("Plot stats unknown (inside active zone)..."),
            _("Compute stats and plot on nodes inside the active zone"))

        self.analyzeexport_stats_selected = self.analyzeexport.Append(
            wx.ID_ANY, _("Export stats unknown (selected nodes)..."),
            _("Compute stats and export on the selected nodes"))
        self.analyzeexport_stats_vector = self.analyzeexport.Append(
            wx.ID_ANY, _("Export stats unknown (inside active vector)..."),
            _("Compute stats and export on nodes inside the active vector"))
        self.analyzeexport_stats_zone = self.analyzeexport.Append(
            wx.ID_ANY, _("Export stats unknown (inside active zone)..."),
            _("Compute stats and export on nodes inside the active zone"))

        v.menubar.Append(self.analyzemenu, _('&Analyze'))

        # Per-item direct bindings: (menu receiving the event, handler, item).
        bindings = (
            (self.analyzeplot, self._on_plot_vector, self.analyzeplot_plot_vector),
            (self.analyzeplot, self._on_plot_vector_dynamic, self.analyzeplot_plot_vector_dynamic),
            (self.analyzeplot, self._on_plot_polygons, self.analyzeplot_plot_polygons),
            (self.analyzeplot, self._on_plot_q_vector, self.analyzeplot_q_vector),
            (self.analyzeplot, self._on_plot_q_zone, self.analyzeplot_q_zone),
            (self.analyzeplot, self._on_plot_stats_selected, self.analyzeplot_stats_selected),
            (self.analyzeplot, self._on_plot_stats_vector, self.analyzeplot_stats_vector),
            (self.analyzeplot, self._on_plot_stats_zone, self.analyzeplot_stats_zone),
            (self.analyzeexport, self._on_export_q_vector, self.analyzeexport_q_vector),
            (self.analyzeexport, self._on_export_q_zone, self.analyzeexport_q_zone),
            (self.analyzeexport, self._on_export_stats_selected, self.analyzeexport_stats_selected),
            (self.analyzeexport, self._on_export_stats_vector, self.analyzeexport_stats_vector),
            (self.analyzeexport, self._on_export_stats_zone, self.analyzeexport_stats_zone),
            (self.analyzeinpaint, self._on_inpaint_array, self.analyzeinpaint_inpaint_array),
            (self.analyzeinpaint, self._on_inpaint_wl, self.analyzeinpaint_inpaint_wl),
            (self.analyzeinpaint, self._on_inpaint_mask, self.analyzeinpaint_inpaint_mask),
            (self.analyzemenu, self._on_load_mask, self.analyzemenu_load_mask),
            (self.analyzemenu, self._on_filter_inundation, self.analyzemenu_filter_inundation),
            (self.analyzesimsheet, self._on_report_active, self.analyzesimsheet_active),
            (self.analyzesimsheet, self._on_report_checked, self.analyzesimsheet_checked),
            (self.analyzesimsheet, self._on_report_disk, self.analyzesimsheet_disk),
            (self.analyzesimsheet, self._on_report_directory, self.analyzesimsheet_directory),
            (self.analyzesimsheet, self._on_compare_checked, self.analyzesimsheet_compare_checked),
            (self.analyzesimsheet, self._on_compare_all_dir, self.analyzesimsheet_compare_all_dir),
            (self.analyzesimsheet, self._on_compare_arrays, self.analyzesimsheet_compare_arrays),
            (self.analyzesimsheet, self._on_compare_arrays_files, self.analyzesimsheet_compare_arrays_files),
        )
        for menu, handler, item in bindings:
            menu.Bind(wx.EVT_MENU, handler, item)

    # ------------------------------------------------------------------
    # Menu event handlers (one per item; the event itself is not used)
    # ------------------------------------------------------------------

    def _on_plot_q_vector(self, event: wx.MenuEvent) -> None:
        self._handle_plot_q_vector()

    def _on_plot_q_zone(self, event: wx.MenuEvent) -> None:
        self._handle_plot_q_zone()

    def _on_export_q_vector(self, event: wx.MenuEvent) -> None:
        self._handle_export_q_vector()

    def _on_export_q_zone(self, event: wx.MenuEvent) -> None:
        self._handle_export_q_zone()

    def _on_plot_stats_selected(self, event: wx.MenuEvent) -> None:
        self._handle_plot_stats('selected')

    def _on_export_stats_selected(self, event: wx.MenuEvent) -> None:
        self._handle_export_stats('selected')

    def _on_plot_stats_vector(self, event: wx.MenuEvent) -> None:
        self._handle_plot_stats('vector')

    def _on_export_stats_vector(self, event: wx.MenuEvent) -> None:
        self._handle_export_stats('vector')

    def _on_plot_stats_zone(self, event: wx.MenuEvent) -> None:
        self._handle_plot_stats('zone')

    def _on_export_stats_zone(self, event: wx.MenuEvent) -> None:
        self._handle_export_stats('zone')

    def _on_plot_vector(self, event: wx.MenuEvent) -> None:
        self._handle_plot_vector(dynamic=False)

    def _on_plot_vector_dynamic(self, event: wx.MenuEvent) -> None:
        self._handle_plot_vector(dynamic=True)

    def _on_plot_polygons(self, event: wx.MenuEvent) -> None:
        self._handle_plot_polygons()

    def _on_load_mask(self, event: wx.MenuEvent) -> None:
        self._viewer.loadnap_and_apply()

    def _on_filter_inundation(self, event: wx.MenuEvent) -> None:
        self._viewer.filter_inundation()

    def _on_report_active(self, event: wx.MenuEvent) -> None:
        self._handle_report_active()

    def _on_report_checked(self, event: wx.MenuEvent) -> None:
        self._handle_report_checked()

    def _on_report_disk(self, event: wx.MenuEvent) -> None:
        self._handle_report_disk()

    def _on_report_directory(self, event: wx.MenuEvent) -> None:
        self._handle_report_directory()

    def _on_compare_arrays(self, event: wx.MenuEvent) -> None:
        self._viewer._compare_arrays()

    def _on_compare_arrays_files(self, event: wx.MenuEvent) -> None:
        self._handle_compare_arrays_files()

    def _on_compare_checked(self, event: wx.MenuEvent) -> None:
        self._handle_compare_checked()

    def _on_compare_all_dir(self, event: wx.MenuEvent) -> None:
        self._handle_compare_all_dir()

    def _on_inpaint_array(self, event: wx.MenuEvent) -> None:
        self._handle_inpaint_array()

    def _on_inpaint_wl(self, event: wx.MenuEvent) -> None:
        self._handle_inpaint_waterlevel()

    def _on_inpaint_mask(self, event: wx.MenuEvent) -> None:
        self._handle_inpaint_with_mask()

    # ------------------------------------------------------------------
    # Q-along-vector / zone handlers
    # ------------------------------------------------------------------

    def _handle_plot_q_vector(self) -> None:
        """Plot the discharge integrated along the active (open) vector."""
        v = self._viewer
        if v.active_vector is None:
            logging.warning(_('No active vector !'))
            return
        if v.active_vector.closed:
            logging.error(_('The active vector is closed ! - You can only plot Q along a cross section not a polygon !'))
            return
        if v.active_res2d is None:
            logging.warning(_('No active 2D result !'))
            return

        fig = v.new_fig(_('Q along active vector'), 'Q_along_active_vector',
                        show=False, size=(800, 600))
        v.active_res2d.plot_q_wx(v.active_vector, 'border', toshow=True, fig=fig)

    def _handle_plot_q_zone(self) -> None:
        """Plot the discharge integrated along every vector of the active zone."""
        v = self._viewer
        if v.active_zone is None:
            logging.warning(_('No active zone !'))
            return
        if v.active_res2d is None:
            logging.warning(_('No active 2D result !'))
            return

        fig = v.new_fig(_('Q along active zone'), 'Q_along_active_zone',
                        show=False, size=(800, 600))
        v.active_res2d.plot_q_wx(v.active_zone.myvectors,
                                 ['border'] * v.active_zone.nbvectors,
                                 toshow=True, fig=fig)

    def _handle_export_q_vector(self) -> None:
        """Export to CSV the hydrograph integrated along the active vector."""
        v = self._viewer
        if v.active_vector is None:
            logging.warning(_('No active vector !'))
            return
        if v.active_res2d is None:
            logging.warning(_('No active 2D result !'))
            return
        self._export_hydrographs(v.active_vector)

    def _handle_export_q_zone(self) -> None:
        """Export to CSV the hydrographs integrated along the active zone."""
        v = self._viewer
        if v.active_zone is None:
            logging.warning(_('No active zone !'))
            return
        if v.active_res2d is None:
            logging.warning(_('No active 2D result !'))
            return
        self._export_hydrographs(v.active_zone)

    def _export_hydrographs(self, vect) -> None:
        """Ask for a CSV file and export the hydrographs of ``vect`` into it.

        :param vect: object forwarded as ``vect`` to
            ``active_res2d.export_hydrographs`` (the callers pass the active
            vector or the active zone). The caller must have checked that an
            active 2D result exists.
        """
        v = self._viewer

        wildcard = ".csv (*.csv)|*.csv|all (*.*)|*.*"
        with wx.FileDialog(v, "Choose file name : ", wildcard=wildcard,
                           style=wx.FD_SAVE) as fdlg:
            if fdlg.ShowModal() != wx.ID_OK:
                return
            csv_path = fdlg.GetPath()

        progress_dialog = wx.ProgressDialog(
            _("Export Progress"), _("Exporting hydrographs..."),
            maximum=100, parent=v,
            style=wx.PD_AUTO_HIDE | wx.PD_APP_MODAL | wx.PD_ELAPSED_TIME)

        def update_progress(progress):
            progress_dialog.Update(progress)

        # The progress dialog is application-modal: it must be destroyed even
        # when the export raises, otherwise the GUI stays blocked.
        try:
            status = v.active_res2d.export_hydrographs(
                vect=vect, filename=csv_path,
                progress_callback=update_progress)
        finally:
            progress_dialog.Destroy()

        if status:
            wx.MessageBox(_("Hydrographs exported successfully"),
                          _("Export Hydrographs"), wx.OK | wx.ICON_INFORMATION)
            logging.info(_('Hydrographs exported successfully'))
        else:
            wx.MessageBox(_("Error exporting hydrographs"),
                          _("Export Hydrographs"), wx.OK | wx.ICON_ERROR)
            logging.error(_('Error exporting hydrographs'))

    # ------------------------------------------------------------------
    # Stats plot/export handlers
    # ------------------------------------------------------------------

    def _resolve_stats_scope(self, scope: str):
        """Return the object on which stats are computed for ``scope``.

        ``'selected'`` gives the list of nodes selected in all blocks of the
        active 2D result, ``'vector'`` the active vector and any other value
        the active zone. Returns None, after logging a warning, when the
        requested object is missing or the selection is empty.
        """
        v = self._viewer

        if scope == 'selected':
            all_selected = []
            for curblock in v.active_res2d.myblocks.values():
                if curblock.SelectionData.nb > 0:
                    all_selected += curblock.SelectionData.myselection
            if len(all_selected) == 0:
                logging.warning(_('No selected nodes - Nothing to do !'))
                return None
            return all_selected

        if scope == 'vector':
            if v.active_vector is None:
                logging.warning(_('No active vector !'))
                return None
            return v.active_vector

        # zone
        if v.active_zone is None:
            logging.warning(_('No active zone !'))
            return None
        return v.active_zone

    def _choose_unknown(self):
        """Let the user pick an extractable unknown; None when cancelled."""
        from .wolfresults_2D import Extractable_results

        keys = Extractable_results.get_list()
        with wx.SingleChoiceDialog(None, _('Choose the unknown/variable to plot'),
                                   _('Unknown'), keys) as dlg:
            if dlg.ShowModal() == wx.ID_CANCEL:
                logging.info(_('No unknown chosen - Aborting !'))
                return None
            return Extractable_results.get_from_key(dlg.GetStringSelection())

    def _ask_interval(self, flag_name: str):
        """Ask for the begin/end/step of the time steps to process.

        :param flag_name: name of the optional attribute read on the dialog
            ('check_violin' or 'check_all'); False when the dialog lacks it.
        :return: ``(begin, end, step, flag)`` or None when the dialog reports
            ``begin == -1`` (no interval chosen).
        """
        from ._sim_panels import Select_Begin_end_interval_step

        v = self._viewer
        # Built outside the try block: if the constructor fails there is
        # nothing to destroy and the real exception must not be masked.
        choice_bes = Select_Begin_end_interval_step(
            v, _('Choose the interval and step'), v.active_res2d, checkbox=True)
        try:
            choice_bes.ShowModal()
            begin = choice_bes.begin
            end = choice_bes.end
            interval = choice_bes.step
            flag = getattr(choice_bes, flag_name, False)
        finally:
            choice_bes.Destroy()

        if begin == -1:
            logging.info(_('No interval chosen - Aborting !'))
            return None
        return begin, end, interval, flag

    @staticmethod
    def _unique_names(names):
        """Return ``names`` with duplicates made unique by a ``_<n>`` suffix.

        The first occurrence keeps its name; later ones get ``_1``, ``_2``...
        A generated name is never equal to a name already returned, so each
        result can safely be used as a distinct file name.
        """
        used = set()
        counters = {}
        unique = []
        for name in names:
            candidate = name
            while candidate in used:
                counters[name] = counters.get(name, 0) + 1
                candidate = name + '_' + str(counters[name])
            used.add(candidate)
            unique.append(candidate)
        return unique

    def _handle_plot_stats(self, scope: str) -> None:
        """Plot stats for scope in {'selected', 'vector', 'zone'}."""
        v = self._viewer
        if v.active_res2d is None:
            logging.warning(_('No active 2D result !'))
            return

        scope_obj = self._resolve_stats_scope(scope)
        if scope_obj is None:
            return

        if scope == 'selected':
            fig_label = _('Series of {} - {} (nodes)')
        elif scope == 'vector':
            fig_label = _('Series of {} - {} (polygon)')
        else:  # zone
            fig_label = _('Series of {} - {} (zone)')

        which = self._choose_unknown()
        if which is None:
            return

        answer = self._ask_interval('check_violin')
        if answer is None:
            return
        begin, end, interval, violin = answer
        # The dialog values are shifted by one before being used as steps.
        steps = (begin - 1, end - 1, interval)

        newfig = v.new_fig(fig_label.format(which.value[0], v.active_res2d.idx),
                           'series_' + v.active_res2d.idx,
                           show=False, size=(800, 600))

        if scope == 'vector' and violin:
            v.active_res2d.plot_violin_values(scope_obj, which, toshow=False,
                                              figax=newfig, for_steps=steps)
        else:
            v.active_res2d.plot_some_values(scope_obj, which, toshow=False,
                                            figax=newfig, for_steps=steps)
        newfig.Show()

    def _handle_export_stats(self, scope: str) -> None:
        """Export stats for scope in {'selected', 'vector', 'zone'}.

        For 'zone' one file per vector is written in a chosen directory,
        named after the vector; otherwise a single CSV file is written.
        """
        v = self._viewer
        if v.active_res2d is None:
            logging.warning(_('No active 2D result !'))
            return

        scope_obj = self._resolve_stats_scope(scope)
        if scope_obj is None:
            return

        which = self._choose_unknown()
        if which is None:
            return

        if scope == 'zone':
            with wx.DirDialog(None, _('Choose the directory where to export'),
                              style=wx.DD_DEFAULT_STYLE) as dlg:
                if dlg.ShowModal() == wx.ID_CANCEL:
                    logging.info(_('No file chosen - Aborting !'))
                    return
                target = Path(dlg.GetPath())
        else:
            with wx.FileDialog(None, _('Choose the file to export'),
                               wildcard='csv (*.csv)|*.csv',
                               style=wx.FD_SAVE | wx.FD_OVERWRITE_PROMPT) as dlg:
                if dlg.ShowModal() == wx.ID_CANCEL:
                    logging.info(_('No file chosen - Aborting !'))
                    return
                target = Path(dlg.GetPath())

        answer = self._ask_interval('check_all')
        if answer is None:
            return
        begin, end, interval, all_values = answer
        steps = (begin - 1, end - 1, interval)

        if scope == 'zone':
            allnames = [curvect.myname for curvect in v.active_zone.myvectors]
            if len(set(allnames)) != len(allnames):
                logging.warning(_('Some vectors have the same name !'))
                # Names are used as file names: they must all differ or one
                # export would overwrite another.
                unique_name = self._unique_names(allnames)
            else:
                unique_name = allnames

            for curvect, name in zip(v.active_zone.myvectors, unique_name):
                ret = v.active_res2d.export_some_values_to_csv(
                    curvect, which, filename=target / name,
                    for_steps=steps, all_values=all_values)
                if not ret:
                    logging.error(_('Error in exporting values !'))
        else:
            ret = v.active_res2d.export_some_values_to_csv(
                scope_obj, which, target,
                for_steps=steps, all_values=all_values)
            if not ret:
                logging.error(_('Error in exporting values !'))

    # ------------------------------------------------------------------
    # Vector / polygon plot handlers
    # ------------------------------------------------------------------

    def _ask_float(self, message: str, default: str):
        """Ask for a float in a text dialog.

        :return: the value, or None when the user cancels or types something
            that is not a number (a warning is logged in the latter case).
        """
        with wx.TextEntryDialog(None, message, value=default) as dlg:
            if dlg.ShowModal() == wx.ID_CANCEL:
                return None
            try:
                return float(dlg.GetValue())
            except ValueError:
                logging.warning(_('Bad value -- Rety'))
                return None

    def _handle_plot_vector(self, dynamic: bool) -> None:
        """Plot the active vector with the linked arrays chosen by the user.

        When a cloud is active, the user may add its points projected on the
        vector, drawn with a +/- tolerance bar.

        :param dynamic: True when called from the "(dynamic)" menu item.
        """
        # NOTE(review): ``dynamic`` is not read below - the figure and its
        # options are registered on the viewer in both cases. Confirm whether
        # the non-dynamic item should skip that registration.
        v = self._viewer
        if v.active_vector is None:
            logging.warning(_('No active vector !'))
            return

        add_cloud = False
        proxval = 5.0
        tolval = 0.5

        if v.active_cloud is not None:
            # The context manager destroys the dialog on every exit path.
            with wx.MessageDialog(v, _('Do you want to plot the cloud ?'),
                                  style=wx.YES_NO) as dlg:
                add_cloud = dlg.ShowModal() == wx.ID_YES

            if add_cloud:
                proxval = self._ask_float(_('Proximity [m] ?'), '5.0')
                if proxval is None:
                    return
                tolval = self._ask_float(_('Tolerance [m] ?'), '0.5')
                if tolval is None:
                    return

        lab = _('Plot of active vector') + ' - ' + v.active_vector.myname
        figmpl = v.new_fig(lab, lab, show=False, size=(800, 600))
        v.active_fig = figmpl

        linkedarrays = v.get_linked_arrays()
        keys = list(linkedarrays.keys())

        with wx.MultiChoiceDialog(None, _('Choose the arrays to plot'),
                                  _('Arrays'), keys) as dlg:
            # Everything is pre-selected.
            dlg.SetSelections(range(len(linkedarrays)))
            if dlg.ShowModal() == wx.ID_CANCEL:
                return
            selected = {keys[cur] for cur in dlg.GetSelections()}

        linkedarrays = {curkey: curval
                        for curkey, curval in linkedarrays.items()
                        if curkey in selected}

        v.active_vector.plot_linked_wx(figmpl, linkedarrays)

        if add_cloud:
            s, z = v.active_cloud.projectontrace(v.active_vector,
                                                 return_cloud=False,
                                                 proximity=proxval)
            figmpl.plot(s, z, c='black', s=1.0, marker='x')

            # Tolerance bar: dashed vertical segment closed by two small caps.
            for curs, curz in zip(s, z):
                figmpl.plot([curs, curs], [curz - tolval, curz + tolval],
                            'k--', linewidth=0.5)
                figmpl.plot([curs - .1, curs + .1],
                            [curz + tolval, curz + tolval],
                            c='black', linewidth=0.5)
                figmpl.plot([curs - .1, curs + .1],
                            [curz - tolval, curz - tolval],
                            c='black', linewidth=0.5)

        v.active_fig_options = {
            'linkedarrays': linkedarrays,
            'add_cloud': add_cloud,
            'cloud': v.active_cloud,
            'vector': v.active_vector,
        }

        figmpl.Show()

    def _handle_plot_polygons(self) -> None:
        """Plot the polygons of the active zone with all arrays and results.

        When the zone name contains '_left_' or '_right_', the zones sharing
        the same name once these markers are removed can be drawn together.
        """
        from .PyDraw import draw_type
        from .wolfresults_2D import Wolfresults_2D
        from .wolf_array import WolfArray

        v = self._viewer
        if v.active_zone is None:
            logging.warning(_('No active zone ! -- please select a zone containing polygons !'))
            return

        try:
            plotzone = []
            zonename = v.active_zone.myname

            if '_left_' in zonename or '_right_' in zonename:
                logging.info(_('Left and Right polygons are detected'))

                testname = zonename.replace('_left_', '')
                testname = testname.replace('_right_', '')

                for curzone in v.active_zones.myzones:
                    if testname == curzone.myname.replace('_left_', '').replace('_right_', ''):
                        plotzone.append(curzone)

                with wx.MessageDialog(
                        v,
                        _('Left and Right polygons are detected \nDo you want like to plot left and right polygons on the same plot ?'),
                        style=wx.YES_NO | wx.YES_DEFAULT) as msg:
                    ret = msg.ShowModal()

                if ret == wx.ID_NO:
                    plotzone = [v.active_zone]
            else:
                logging.info(_('Sole polygon detected'))
                plotzone = [v.active_zone]

            figmpl = v.new_fig(_('Plot of active polygons'), 'plot_active_polygons',
                               show=False, size=(800, 600))

            linkedarrays = {}
            for curarray in v.iterator_over_objects(draw_type.ARRAYS):
                curarray: WolfArray
                logging.info(_('Plotting array {}').format(curarray.idx))
                linkedarrays[curarray.idx] = curarray

            for curarray in v.iterator_over_objects(draw_type.RES2D):
                curarray: Wolfresults_2D
                logging.info(_('Plotting results {}').format(curarray.idx))
                linkedarrays[curarray.idx] = curarray

            linkedvecs = {}
            from .PyVertex_Manager import Zones
            for curvect in v.iterator_over_objects(draw_type.VECTORS):
                curvect: Zones
                logging.info(_('Plotting vector {}').format(curvect.idx))
                linkedvecs[curvect.idx] = curvect

            if len(plotzone) > 1:
                # Keys get a side suffix and each side its own line style.
                for curzone in plotzone:
                    if '_left_' in curzone.myname:
                        locarrays = {curkey + '_left': curarray
                                     for curkey, curarray in linkedarrays.items()}
                        curzone.plot_linked_polygons_wx(figmpl, locarrays,
                                                        linked_vec=linkedvecs,
                                                        linestyle='--')
                    elif '_right_' in curzone.myname:
                        locarrays = {curkey + '_right': curarray
                                     for curkey, curarray in linkedarrays.items()}
                        curzone.plot_linked_polygons_wx(figmpl, locarrays,
                                                        linked_vec=linkedvecs,
                                                        linestyle='-.')
            else:
                plotzone[0].plot_linked_polygons_wx(figmpl, linkedarrays,
                                                    linked_vec=linkedvecs)

            figmpl.Show()

        except Exception as e:
            # Translate the template first, then insert the error text.
            logging.error(_('Error in plotting active polygons\n{}').format(e))
            logging.warning(_('Are you sure the active zone contains polygons ?'))

    # ------------------------------------------------------------------
    # Report handlers
    # ------------------------------------------------------------------

    def _ask_directory(self, message: str) -> Path | None:
        """Ask for an existing directory.

        :return: the chosen path, or None when the user cancels or when the
            path is missing / not a directory (logged and shown in a box).
        """
        with wx.DirDialog(None, message, style=wx.DD_DEFAULT_STYLE) as dlg:
            if dlg.ShowModal() == wx.ID_CANCEL:
                return None
            directory = Path(dlg.GetPath())

        if not directory.exists():
            logging.error(_('Directory {} does not exist !').format(directory))
            wx.MessageBox(_('Directory {} does not exist !').format(directory),
                          _('Error'), wx.OK | wx.ICON_ERROR)
            return None

        if not directory.is_dir():
            logging.error(_('Path {} is not a directory !').format(directory))
            wx.MessageBox(_('Path {} is not a directory !').format(directory),
                          _('Error'), wx.OK | wx.ICON_ERROR)
            return None

        return directory

    def _handle_report_active(self) -> None:
        """Open the report sheet of the active simulation (GPU results only)."""
        from .Results2DGPU import wolfres2DGPU

        v = self._viewer
        if v.active_res2d is None:
            logging.warning(_('No active simulation !'))
            return

        from .report.simplesimgpu import SimpleSimGPU_Report_wx

        if isinstance(v.active_res2d, wolfres2DGPU):
            newsheet = SimpleSimGPU_Report_wx(Path(v.active_res2d.filename).parent,
                                              size=(800, 600))
            newsheet.Show()
        else:
            logging.warning(_('Active simulation is not a GPU simulation - Not yet implemented for CPU simulations !'))

    def _handle_report_checked(self) -> None:
        """Open one report sheet per checked GPU simulation."""
        from .Results2DGPU import wolfres2DGPU
        from .PyDraw import draw_type

        v = self._viewer
        sims = v.get_list_keys(draw_type.RES2D, checked_state=True)
        if len(sims) == 0:
            logging.warning(_('No checked simulation !'))
            return

        from .report.simplesimgpu import SimpleSimGPU_Report_wx

        for curkey in sims:
            curmodel = v.get_obj_from_id(curkey, draw_type.RES2D)
            if isinstance(curmodel, wolfres2DGPU):
                newsheet = SimpleSimGPU_Report_wx(Path(curmodel.filename).parent,
                                                  size=(800, 600))
                newsheet.Show()
            else:
                logging.warning(_('Simulation {} is not a GPU simulation - Not yet implemented for CPU simulations !').format(curmodel.idx))

    def _handle_report_disk(self) -> None:
        """Open the report sheet of a simulation directory chosen on disk."""
        directory = self._ask_directory(_('Choose the directory containing the simulation'))
        if directory is None:
            return

        from .report.simplesimgpu import SimpleSimGPU_Report_wx

        SimpleSimGPU_Report_wx(directory, size=(800, 600), show=True)

    def _handle_report_directory(self) -> None:
        """Build the reports of all simulations found in a chosen directory."""
        directory = self._ask_directory(_('Choose the directory containing the simulations'))
        if directory is None:
            return

        from .report.simplesimgpu import SimpleSimGPU_Reports_wx

        with wx.MessageDialog(None, _('Do you want to show all reports ?'),
                              _('Show all reports'),
                              style=wx.YES_NO | wx.YES_DEFAULT) as dlg:
            ret = dlg.ShowModal()

        SimpleSimGPU_Reports_wx(directory, show=(ret == wx.ID_YES), size=(800, 600))

    # ------------------------------------------------------------------
    # Array comparison handlers
    # ------------------------------------------------------------------

    def _handle_compare_arrays_files(self) -> None:
        """Load two arrays from files and open their comparison sheet."""
        from .wolf_array import WolfArray
        from .report.compare_arrays import CompareArrays_wx

        wildcard = '*.tif, *.bin, *.npy|*.tif;*.bin;*.npy|all (*.*)|*.*'

        with wx.FileDialog(None, _('Choose the reference file'),
                           wildcard=wildcard,
                           style=wx.FD_OPEN | wx.FD_FILE_MUST_EXIST) as dlg:
            if dlg.ShowModal() == wx.ID_CANCEL:
                return
            ref_filename = dlg.GetPath()

        with wx.FileDialog(None, _('Choose the comparison file'),
                           wildcard=wildcard,
                           style=wx.FD_OPEN | wx.FD_FILE_MUST_EXIST) as dlg:
            if dlg.ShowModal() == wx.ID_CANCEL:
                return
            comp_filename = dlg.GetPath()

        try:
            wa_ref = WolfArray(ref_filename)
            wa_comp = WolfArray(comp_filename)

            if not (wa_ref.loaded and wa_comp.loaded):
                logging.error(_('Error in loading arrays from files'))
                wx.MessageBox(_('Error in loading arrays from files'),
                              _('Error'), wx.OK | wx.ICON_ERROR)
                return

            if wa_ref.is_like(wa_comp):
                newsheet = CompareArrays_wx(wa_ref, wa_comp, size=(800, 600))
                newsheet.Show()
                # Success is reported only when the comparison really ran.
                logging.info(_('Arrays {} and {} compared successfully').format(ref_filename, comp_filename))
            else:
                logging.error(_('The two arrays are not compatible - Cannot compare !'))
                wx.MessageBox(_('The two arrays are not compatible - Cannot compare !'),
                              _('Error'), wx.OK | wx.ICON_ERROR)

        except Exception as e:
            logging.error(_('Error in comparing arrays from files\n{}').format(e))

    def _handle_compare_checked(self) -> None:
        """Open a comparison report for the checked GPU simulations."""
        # --- GPU ONLY ---
        from .Results2DGPU import wolfres2DGPU
        from .PyDraw import draw_type

        v = self._viewer
        sims = v.get_list_keys(draw_type.RES2D, checked_state=True)

        if len(sims) == 0:
            logging.warning(_('No checked simulation !'))
            return
        if len(sims) == 1:
            logging.warning(_('Only one checked simulation - Nothing to compare !'))
            return

        from .report.simplesimgpu import SimpleSimGPU_Report_Compare_wx

        models = [v.get_obj_from_id(curkey, draw_type.RES2D) for curkey in sims]
        result_paths = [Path(curmodel.filename) for curmodel in models
                        if isinstance(curmodel, wolfres2DGPU)]
        # Keep the parent directory of paths containing 'simul_gpu_results'.
        sim_dirs = [path.parent for path in result_paths
                    if 'simul_gpu_results' in str(path)]

        if len(sim_dirs) == 0:
            logging.warning(_('No GPU simulation to compare !'))
            return
        if len(sim_dirs) == 1:
            logging.warning(_('Only one GPU simulation - Nothing to compare !'))
            return

        try:
            newsheet = SimpleSimGPU_Report_Compare_wx(sim_dirs, size=(800, 600))
            newsheet.Show()
        except Exception as e:
            logging.error(_('Error in comparing simulations\n{}').format(e))

    def _handle_compare_all_dir(self) -> None:
        """Open a comparison report for the simulations of a chosen directory."""
        # --- GPU ONLY ---
        directory = self._ask_directory(_('Choose the directory containing the simulations'))
        if directory is None:
            return

        from .report.simplesimgpu import SimpleSimGPU_Report_Compare_wx

        try:
            newsheet = SimpleSimGPU_Report_Compare_wx(directory, size=(800, 600))
            newsheet.Show()
        except Exception as e:
            logging.error(_('Error in comparing simulations in directory\n{}').format(e))

    # ------------------------------------------------------------------
    # Inpaint handlers
    # ------------------------------------------------------------------

    def _handle_inpaint_array(self) -> None:
        """Inpaint the holes of the active array.

        The user picks a number between 0 and 9 that is forwarded as
        ``ignore_last`` to the array's ``inpaint`` method.
        """
        v = self._viewer
        if v.active_array is None:
            logging.warning(_('No active array !'))
            return

        if v.active_array.count_holes() == 0:
            logging.warning(_('No hole in the array !'))
            return

        with wx.SingleChoiceDialog(None, _('Ignore the last ones ?'), _('Holes'),
                                   [str(i) for i in range(10)],
                                   style=wx.CHOICEDLG_STYLE) as dlg:
            if dlg.ShowModal() == wx.ID_CANCEL:
                return
            # The choices are '0'..'9', so the index is the chosen number.
            nb = dlg.GetSelection()

        v.active_array.inpaint(ignore_last=nb)

    def _handle_inpaint_waterlevel(self) -> None:
        """Open the water level inpainting window."""
        from .PyDraw import InPaint_waterlevel

        v = self._viewer
        InPaint_waterlevel(None, _("Choose the array to inpaint"), (400, 400), v)

    def _handle_inpaint_with_mask(self) -> None:
        """Open the mask-based array inpainting window."""
        from .PyDraw import InPaint_array

        v = self._viewer
        InPaint_array(None, _("Choose the array to inpaint"), (400, 400), v)