Source code for wolfhece._particlesystem_manager

"""
Companion manager for particle system menu and actions.
Extracted from PyDraw.WolfMapViewer.
"""
from __future__ import annotations

import logging
from typing import TYPE_CHECKING

import numpy as np
import wx

from .PyTranslate import _

if TYPE_CHECKING:
    from .PyDraw import WolfMapViewer


[docs] class ParticleSystemManager: """Manages the Particle system menu and associated actions.""" def __init__(self, viewer: "WolfMapViewer") -> None:
[docs] self._viewer = viewer
[docs] self._menu: wx.Menu | None = None
[docs] self._menu_load: wx.Menu | None = None
[docs] def menu_build(self) -> None: v = self._viewer if self._menu is not None: return self._menu = wx.Menu() self._menu_load = wx.Menu() item_set = self._menu.Append(wx.ID_ANY, _("Set..."), _("Set arrays as the domain/uv of the particle system -- Must be a 2D array - Mask will be used to separate water and land")) item_setemit = self._menu.Append(wx.ID_ANY, _("Set emitter from selected nodes"), _("Set the selected nodes as emitters of the particle system")) self._menu.AppendSubMenu(self._menu_load, _("Load..."), _('Load data for the particle system in the UI')) item_domain = self._menu_load.Append(wx.ID_ANY, _("Load domain..."), _("Loading the domain in the UI")) item_negdomain = self._menu_load.Append(wx.ID_ANY, _("Load ~domain..."), _("Loading the negative of the domain in the UI")) item_emitters = self._menu_load.Append(wx.ID_ANY, _("Load emitters..."), _("Loading the emitters in the UI")) item_uv = self._menu_load.Append(wx.ID_ANY, _("Load uv..."), _("Loading the UV velocity field in the UI")) item_uvnorm = self._menu_load.Append(wx.ID_ANY, _("Load uv norm..."), _("Loading the norm of the velocity field in the UI")) self._menu.AppendSeparator() item_check = self._menu.Append(wx.ID_ANY, _("Check"), _("Check if the particle system is ready to be computed")) item_bake = self._menu.Append(wx.ID_ANY, _("Bake"), _("Compute the particle system")) item_reset = self._menu.Append(wx.ID_ANY, _("Reset"), _("Clear all results but keep the particle system settings")) self._menu.AppendSeparator() item_start = self._menu.Append(wx.ID_ANY, _("Start"), _("Run all steps")) item_stop = self._menu.Append(wx.ID_ANY, _("Stop"), _("Stop the current animation")) item_resume = self._menu.Append(wx.ID_ANY, _("Resume"), _("Resume animation")) v.timer_ps = wx.Timer(v) self._menu.Bind(wx.EVT_MENU, self._on_set, item_set) self._menu.Bind(wx.EVT_MENU, self._on_set_emitters, item_setemit) self._menu_load.Bind(wx.EVT_MENU, self._on_load_domain, item_domain) self._menu_load.Bind(wx.EVT_MENU, self._on_load_neg_domain,item_negdomain) self._menu_load.Bind(wx.EVT_MENU, self._on_load_emitters, item_emitters) self._menu_load.Bind(wx.EVT_MENU, self._on_load_uv, item_uv) self._menu_load.Bind(wx.EVT_MENU, self._on_load_uv_norm, item_uvnorm) self._menu.Bind(wx.EVT_MENU, self._on_check, item_check) self._menu.Bind(wx.EVT_MENU, self._on_bake, item_bake) self._menu.Bind(wx.EVT_MENU, self._on_reset, item_reset) self._menu.Bind(wx.EVT_MENU, self._on_start, item_start) self._menu.Bind(wx.EVT_MENU, self._on_stop, item_stop) self._menu.Bind(wx.EVT_MENU, self._on_resume, item_resume) v.Bind(wx.EVT_TIMER, self.on_timer, v.timer_ps) v.menubar.Append(self._menu, _('Particle system'))
[docs] def _on_start(self, event: wx.MenuEvent) -> None: v = self._viewer if v.active_particle_system is None: return v.active_particle_system.current_step = 0 v.active_particle_system.current_step_idx = 0 v.timer_ps.Start(1000.0 / v.active_particle_system.fps)
[docs] def _on_stop(self, event: wx.MenuEvent) -> None: v = self._viewer if v.active_particle_system is None: return v.timer_ps.Stop()
[docs] def _on_resume(self, event: wx.MenuEvent) -> None: v = self._viewer if v.active_particle_system is None: return v.timer_ps.Start(1000.0 / v.active_particle_system.fps)
[docs] def _on_load_domain(self, event: wx.MenuEvent) -> None: v = self._viewer if v.active_particle_system is None: return domain = v.active_particle_system.get_domain(output_type='wolf') v.add_object('array', id=domain.idx, newobj=domain, ToCheck=True) v.Refresh()
[docs] def _on_load_neg_domain(self, event: wx.MenuEvent) -> None: v = self._viewer if v.active_particle_system is None: return domain = v.active_particle_system.get_domain(output_type='wolf') domain.idx = domain.idx + '_neg' domain.mask_reset() ones = np.where(domain.array.data == 1) domain.array[:, :] = 1 domain.array[ones] = 0 domain.mask_data(domain.nullvalue) v.add_object('array', id=domain.idx, newobj=domain, ToCheck=True) v.Refresh()
[docs] def _on_load_emitters(self, event: wx.MenuEvent) -> None: v = self._viewer if v.active_particle_system is None: return emitters = v.active_particle_system.get_emitters(output_type='wolf') v.add_object('vector', id=emitters.idx, newobj=emitters, ToCheck=True) v.Refresh()
[docs] def _on_load_uv(self, event: wx.MenuEvent) -> None: v = self._viewer if v.active_particle_system is None: return u = v.active_particle_system.get_u(output_type='wolf') vel_v = v.active_particle_system.get_v(output_type='wolf') v.add_object('array', id=u.idx, newobj=u, ToCheck=True) v.add_object('array', id=vel_v.idx, newobj=vel_v, ToCheck=True) v.Refresh()
[docs] def _on_load_uv_norm(self, event: wx.MenuEvent) -> None: v = self._viewer if v.active_particle_system is None: return uvnorm = v.active_particle_system.get_uv_absolute(output_type='wolf') v.add_object('array', id=uvnorm.idx, newobj=uvnorm, ToCheck=True) v.Refresh()
[docs] def _on_bake(self, event: wx.MenuEvent) -> None: v = self._viewer if v.active_particle_system is None: return check, msg = v.active_particle_system.bake() if not check: dlg = wx.MessageDialog(v, msg, _('Error'), wx.OK | wx.ICON_ERROR) dlg.ShowModal() dlg.Destroy()
[docs] def _on_reset(self, event: wx.MenuEvent) -> None: v = self._viewer if v.active_particle_system is None: return v.active_particle_system.reset()
[docs] def _on_set(self, event: wx.MenuEvent) -> None: if self._viewer.active_particle_system is None: return self._set_particle_system()
[docs] def _on_set_emitters(self, event: wx.MenuEvent) -> None: if self._viewer.active_particle_system is None: return self._set_emitters_from_selected_nodes()
[docs] def _on_check(self, event: wx.MenuEvent) -> None: v = self._viewer if v.active_particle_system is None: return check, msg = v.active_particle_system.check() if not check: dlg = wx.MessageDialog(v, msg, _('Error'), wx.OK | wx.ICON_ERROR) dlg.ShowModal() dlg.Destroy() return dlg = wx.MessageDialog(v, _('All is fine -- You can bake you system !'), _('Chesk particle system'), wx.OK | wx.ICON_INFORMATION) dlg.ShowModal() dlg.Destroy()
[docs] def on_timer(self, event: wx.Event) -> None: v = self._viewer if v.active_particle_system is None: return nb = v.active_particle_system.nb_steps v.active_particle_system.current_step_idx += 1 v.Paint() v._update_tooltip() if v.active_particle_system.current_step_idx == nb - 1: v.timer_ps.Stop()
[docs] def _set_particle_system(self) -> None: from .PyDraw import draw_type from .ui.wolf_multiselection_collapsiblepane import Wolf_MultipleSelection v = self._viewer setter = Wolf_MultipleSelection( v, title=_("Set particle system"), message=_("Choose arrays/emitters for the particle system"), values_dict={ 'domain': v.get_list_keys(draw_type.ARRAYS), 'u': v.get_list_keys(draw_type.ARRAYS), 'v': v.get_list_keys(draw_type.ARRAYS), 'emitters': v.get_list_keys(draw_type.VECTORS), }, info='Set : \n - domain (1 value)\n - u and v (multiple values)\n - emitters (multiple values)', styles=[wx.LB_SINGLE, wx.LB_EXTENDED, wx.LB_EXTENDED, wx.LB_EXTENDED], max_choices=[1, None, None, None], delete_if_transfer=[True, False, False, True], destroyOK=False, ) setter.ShowModal() ret_dict = setter.get_values() setter.Destroy() if 'domain' in ret_dict and len(ret_dict['domain']) == 1: domain = v.getobj_from_id(ret_dict['domain'][0]) v.active_particle_system.set_domain(domain) if 'u' in ret_dict and 'v' in ret_dict and len(ret_dict['u']) > 0: assert len(ret_dict['u']) == len(ret_dict['v']), _('Please select the same number of u and v arrays') time = 0.0 for u_id, v_id in zip(ret_dict['u'], ret_dict['v']): u_arr = v.getobj_from_id(u_id) v_arr = v.getobj_from_id(v_id) assert u_arr.array.shape == v_arr.array.shape, _('Please select arrays with the same shape') assert u_arr.origx == v_arr.origx and u_arr.origy == v_arr.origy, _('Please select arrays with the same origin') assert u_arr.dx == v_arr.dx and u_arr.dy == v_arr.dy, _('Please select arrays with the same resolution') v.active_particle_system.set_uv( (u_arr, v_arr), (u_arr.origx, u_arr.origy, u_arr.dx, u_arr.dy), time=time, ) time += 1.0 if 'emitters' in ret_dict and len(ret_dict['emitters']) > 0: emitters = [v.getobj_from_id(cur) for cur in ret_dict['emitters']] v.active_particle_system.set_emitters(emitters) if v.active_particle_system._ui is not None: v.active_particle_system.show_properties()
[docs] def _set_emitters_from_selected_nodes(self) -> None: from .lagrangian.emitter import Emitter v = self._viewer if v.active_array is None: logging.warning(_('No active array -- Please activate an array first')) return if len(v.active_array.SelectionData.myselection) == 0 and len(v.active_array.SelectionData.selections) == 0: logging.warning(_('No selection -- Please select some nodes first')) return newemitters = [] if len(v.active_array.SelectionData.myselection) > 0: indices = [v.active_array.get_ij_from_xy(cur[0], cur[1]) for cur in v.active_array.SelectionData.myselection] newemitters = [ Emitter( indices, header=(v.active_array.origx, v.active_array.origy, v.active_array.dx, v.active_array.dy), ) ] if len(v.active_array.SelectionData.selections) > 0: for cursel in v.active_array.SelectionData.selections.values(): indices = [v.active_array.get_ij_from_xy(cur[0], cur[1]) for cur in cursel['select']] newemitters += [ Emitter( indices, header=(v.active_array.origx, v.active_array.origy, v.active_array.dx, v.active_array.dy), ) ] v.active_particle_system.set_emitters(newemitters) if v.active_particle_system._ui is not None: v.active_particle_system.show_properties()