"""
Companion manager for particle system menu and actions.
Extracted from PyDraw.WolfMapViewer.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
import numpy as np
import wx
from .PyTranslate import _
if TYPE_CHECKING:
from .PyDraw import WolfMapViewer
[docs]
class ParticleSystemManager:
"""Manages the Particle system menu and associated actions."""
def __init__(self, viewer: "WolfMapViewer") -> None:
[docs]
self._viewer = viewer
[docs]
self._menu: wx.Menu | None = None
[docs]
self._menu_load: wx.Menu | None = None
[docs]
def menu_build(self) -> None:
v = self._viewer
if self._menu is not None:
return
self._menu = wx.Menu()
self._menu_load = wx.Menu()
item_set = self._menu.Append(wx.ID_ANY, _("Set..."), _("Set arrays as the domain/uv of the particle system -- Must be a 2D array - Mask will be used to separate water and land"))
item_setemit = self._menu.Append(wx.ID_ANY, _("Set emitter from selected nodes"), _("Set the selected nodes as emitters of the particle system"))
self._menu.AppendSubMenu(self._menu_load, _("Load..."), _('Load data for the particle system in the UI'))
item_domain = self._menu_load.Append(wx.ID_ANY, _("Load domain..."), _("Loading the domain in the UI"))
item_negdomain = self._menu_load.Append(wx.ID_ANY, _("Load ~domain..."), _("Loading the negative of the domain in the UI"))
item_emitters = self._menu_load.Append(wx.ID_ANY, _("Load emitters..."), _("Loading the emitters in the UI"))
item_uv = self._menu_load.Append(wx.ID_ANY, _("Load uv..."), _("Loading the UV velocity field in the UI"))
item_uvnorm = self._menu_load.Append(wx.ID_ANY, _("Load uv norm..."), _("Loading the norm of the velocity field in the UI"))
self._menu.AppendSeparator()
item_check = self._menu.Append(wx.ID_ANY, _("Check"), _("Check if the particle system is ready to be computed"))
item_bake = self._menu.Append(wx.ID_ANY, _("Bake"), _("Compute the particle system"))
item_reset = self._menu.Append(wx.ID_ANY, _("Reset"), _("Clear all results but keep the particle system settings"))
self._menu.AppendSeparator()
item_start = self._menu.Append(wx.ID_ANY, _("Start"), _("Run all steps"))
item_stop = self._menu.Append(wx.ID_ANY, _("Stop"), _("Stop the current animation"))
item_resume = self._menu.Append(wx.ID_ANY, _("Resume"), _("Resume animation"))
v.timer_ps = wx.Timer(v)
self._menu.Bind(wx.EVT_MENU, self._on_set, item_set)
self._menu.Bind(wx.EVT_MENU, self._on_set_emitters, item_setemit)
self._menu_load.Bind(wx.EVT_MENU, self._on_load_domain, item_domain)
self._menu_load.Bind(wx.EVT_MENU, self._on_load_neg_domain,item_negdomain)
self._menu_load.Bind(wx.EVT_MENU, self._on_load_emitters, item_emitters)
self._menu_load.Bind(wx.EVT_MENU, self._on_load_uv, item_uv)
self._menu_load.Bind(wx.EVT_MENU, self._on_load_uv_norm, item_uvnorm)
self._menu.Bind(wx.EVT_MENU, self._on_check, item_check)
self._menu.Bind(wx.EVT_MENU, self._on_bake, item_bake)
self._menu.Bind(wx.EVT_MENU, self._on_reset, item_reset)
self._menu.Bind(wx.EVT_MENU, self._on_start, item_start)
self._menu.Bind(wx.EVT_MENU, self._on_stop, item_stop)
self._menu.Bind(wx.EVT_MENU, self._on_resume, item_resume)
v.Bind(wx.EVT_TIMER, self.on_timer, v.timer_ps)
v.menubar.Append(self._menu, _('Particle system'))
[docs]
def _on_start(self, event: wx.MenuEvent) -> None:
v = self._viewer
if v.active_particle_system is None:
return
v.active_particle_system.current_step = 0
v.active_particle_system.current_step_idx = 0
v.timer_ps.Start(1000.0 / v.active_particle_system.fps)
[docs]
def _on_stop(self, event: wx.MenuEvent) -> None:
v = self._viewer
if v.active_particle_system is None:
return
v.timer_ps.Stop()
[docs]
def _on_resume(self, event: wx.MenuEvent) -> None:
v = self._viewer
if v.active_particle_system is None:
return
v.timer_ps.Start(1000.0 / v.active_particle_system.fps)
[docs]
def _on_load_domain(self, event: wx.MenuEvent) -> None:
v = self._viewer
if v.active_particle_system is None:
return
domain = v.active_particle_system.get_domain(output_type='wolf')
v.add_object('array', id=domain.idx, newobj=domain, ToCheck=True)
v.Refresh()
[docs]
def _on_load_neg_domain(self, event: wx.MenuEvent) -> None:
v = self._viewer
if v.active_particle_system is None:
return
domain = v.active_particle_system.get_domain(output_type='wolf')
domain.idx = domain.idx + '_neg'
domain.mask_reset()
ones = np.where(domain.array.data == 1)
domain.array[:, :] = 1
domain.array[ones] = 0
domain.mask_data(domain.nullvalue)
v.add_object('array', id=domain.idx, newobj=domain, ToCheck=True)
v.Refresh()
[docs]
def _on_load_emitters(self, event: wx.MenuEvent) -> None:
v = self._viewer
if v.active_particle_system is None:
return
emitters = v.active_particle_system.get_emitters(output_type='wolf')
v.add_object('vector', id=emitters.idx, newobj=emitters, ToCheck=True)
v.Refresh()
[docs]
def _on_load_uv(self, event: wx.MenuEvent) -> None:
v = self._viewer
if v.active_particle_system is None:
return
u = v.active_particle_system.get_u(output_type='wolf')
vel_v = v.active_particle_system.get_v(output_type='wolf')
v.add_object('array', id=u.idx, newobj=u, ToCheck=True)
v.add_object('array', id=vel_v.idx, newobj=vel_v, ToCheck=True)
v.Refresh()
[docs]
def _on_load_uv_norm(self, event: wx.MenuEvent) -> None:
v = self._viewer
if v.active_particle_system is None:
return
uvnorm = v.active_particle_system.get_uv_absolute(output_type='wolf')
v.add_object('array', id=uvnorm.idx, newobj=uvnorm, ToCheck=True)
v.Refresh()
[docs]
def _on_bake(self, event: wx.MenuEvent) -> None:
v = self._viewer
if v.active_particle_system is None:
return
check, msg = v.active_particle_system.bake()
if not check:
dlg = wx.MessageDialog(v, msg, _('Error'), wx.OK | wx.ICON_ERROR)
dlg.ShowModal()
dlg.Destroy()
[docs]
def _on_reset(self, event: wx.MenuEvent) -> None:
v = self._viewer
if v.active_particle_system is None:
return
v.active_particle_system.reset()
[docs]
def _on_set(self, event: wx.MenuEvent) -> None:
if self._viewer.active_particle_system is None:
return
self._set_particle_system()
[docs]
def _on_set_emitters(self, event: wx.MenuEvent) -> None:
if self._viewer.active_particle_system is None:
return
self._set_emitters_from_selected_nodes()
[docs]
def _on_check(self, event: wx.MenuEvent) -> None:
v = self._viewer
if v.active_particle_system is None:
return
check, msg = v.active_particle_system.check()
if not check:
dlg = wx.MessageDialog(v, msg, _('Error'), wx.OK | wx.ICON_ERROR)
dlg.ShowModal()
dlg.Destroy()
return
dlg = wx.MessageDialog(v, _('All is fine -- You can bake you system !'), _('Chesk particle system'), wx.OK | wx.ICON_INFORMATION)
dlg.ShowModal()
dlg.Destroy()
[docs]
def on_timer(self, event: wx.Event) -> None:
v = self._viewer
if v.active_particle_system is None:
return
nb = v.active_particle_system.nb_steps
v.active_particle_system.current_step_idx += 1
v.Paint()
v._update_tooltip()
if v.active_particle_system.current_step_idx == nb - 1:
v.timer_ps.Stop()
[docs]
def _set_particle_system(self) -> None:
from .PyDraw import draw_type
from .ui.wolf_multiselection_collapsiblepane import Wolf_MultipleSelection
v = self._viewer
setter = Wolf_MultipleSelection(
v,
title=_("Set particle system"),
message=_("Choose arrays/emitters for the particle system"),
values_dict={
'domain': v.get_list_keys(draw_type.ARRAYS),
'u': v.get_list_keys(draw_type.ARRAYS),
'v': v.get_list_keys(draw_type.ARRAYS),
'emitters': v.get_list_keys(draw_type.VECTORS),
},
info='Set : \n - domain (1 value)\n - u and v (multiple values)\n - emitters (multiple values)',
styles=[wx.LB_SINGLE, wx.LB_EXTENDED, wx.LB_EXTENDED, wx.LB_EXTENDED],
max_choices=[1, None, None, None],
delete_if_transfer=[True, False, False, True],
destroyOK=False,
)
setter.ShowModal()
ret_dict = setter.get_values()
setter.Destroy()
if 'domain' in ret_dict and len(ret_dict['domain']) == 1:
domain = v.getobj_from_id(ret_dict['domain'][0])
v.active_particle_system.set_domain(domain)
if 'u' in ret_dict and 'v' in ret_dict and len(ret_dict['u']) > 0:
assert len(ret_dict['u']) == len(ret_dict['v']), _('Please select the same number of u and v arrays')
time = 0.0
for u_id, v_id in zip(ret_dict['u'], ret_dict['v']):
u_arr = v.getobj_from_id(u_id)
v_arr = v.getobj_from_id(v_id)
assert u_arr.array.shape == v_arr.array.shape, _('Please select arrays with the same shape')
assert u_arr.origx == v_arr.origx and u_arr.origy == v_arr.origy, _('Please select arrays with the same origin')
assert u_arr.dx == v_arr.dx and u_arr.dy == v_arr.dy, _('Please select arrays with the same resolution')
v.active_particle_system.set_uv(
(u_arr, v_arr),
(u_arr.origx, u_arr.origy, u_arr.dx, u_arr.dy),
time=time,
)
time += 1.0
if 'emitters' in ret_dict and len(ret_dict['emitters']) > 0:
emitters = [v.getobj_from_id(cur) for cur in ret_dict['emitters']]
v.active_particle_system.set_emitters(emitters)
if v.active_particle_system._ui is not None:
v.active_particle_system.show_properties()
[docs]
def _set_emitters_from_selected_nodes(self) -> None:
from .lagrangian.emitter import Emitter
v = self._viewer
if v.active_array is None:
logging.warning(_('No active array -- Please activate an array first'))
return
if len(v.active_array.SelectionData.myselection) == 0 and len(v.active_array.SelectionData.selections) == 0:
logging.warning(_('No selection -- Please select some nodes first'))
return
newemitters = []
if len(v.active_array.SelectionData.myselection) > 0:
indices = [v.active_array.get_ij_from_xy(cur[0], cur[1]) for cur in v.active_array.SelectionData.myselection]
newemitters = [
Emitter(
indices,
header=(v.active_array.origx, v.active_array.origy, v.active_array.dx, v.active_array.dy),
)
]
if len(v.active_array.SelectionData.selections) > 0:
for cursel in v.active_array.SelectionData.selections.values():
indices = [v.active_array.get_ij_from_xy(cur[0], cur[1]) for cur in cursel['select']]
newemitters += [
Emitter(
indices,
header=(v.active_array.origx, v.active_array.origy, v.active_array.dx, v.active_array.dy),
)
]
v.active_particle_system.set_emitters(newemitters)
if v.active_particle_system._ui is not None:
v.active_particle_system.show_properties()