Source code for wolfhece.plugins.viewer_proxy

"""Technical runtime proxy used by companion plugins.

This proxy sits between companions and the viewer API. Companion code should
prefer accessing viewer facilities through this object.

Temporary migration escape hatch
--------------------------------
The underlying viewer remains reachable through the private ``_viewer``
attribute of this proxy so legacy companions can still access raw viewer
features while dedicated helper methods are progressively added.
"""
from __future__ import annotations

import logging
import inspect
from typing import TYPE_CHECKING, Callable, Iterable, Optional

from .._action_kind import ActionKind
from ..dialog_provider import DialogProvider
from ..PyTranslate import _

if TYPE_CHECKING:
    import wx
    from .types import ActionSpec, MenuEntry, MenuItem, MultiStepSpec, Separator, SubMenuSpec
    from ..PyDraw import WolfMapViewer
    from .._viewer_plugin_handlers import MouseContext


class ViewerProxy:
    """Bridge object hosting technical viewer/wx/dialog/OpenGL routines.

    Companion subclasses keep their business logic, while this proxy owns
    the low-level plumbing and the viewer integration details.

    The proxy is the **sole owner** of both the viewer reference and the
    dialog provider. Companion code reaches the viewer only through
    ``self.proxy._viewer`` (escape hatch) or through dedicated helper
    methods.
    """

    # ------------------------------------------------------------------
    # Construction and attachment
    # ------------------------------------------------------------------

    def __init__(self,
                 name: str,
                 viewer: 'WolfMapViewer | None' = None,
                 dialogs: DialogProvider | None = None,
                 namespace: str = '',
                 ) -> None:
        """Create a proxy bound to one companion instance.

        :param name: Companion class or instance name used for logs.
        :param viewer: Optional viewer to bind immediately.
        :param dialogs: Optional dialog provider override; a default
            :class:`DialogProvider` is created when omitted.
        :param namespace: Namespace used to qualify action ids.
        """
        # A default provider is only instantiated when none was supplied.
        if dialogs is None:
            dialogs = DialogProvider()

        # --- identity ---------------------------------------------------
        #: Class name of the companion — used in log messages.
        self._name: str = name
        self._namespace: str = namespace

        # --- collaborators ----------------------------------------------
        #: Direct viewer reference — escape hatch for advanced companion
        #: code. Prefer dedicated helper methods when they exist.
        self._viewer: 'WolfMapViewer | None' = viewer
        self._dialogs: DialogProvider = dialogs

        # --- menu bookkeeping -------------------------------------------
        self._menu: 'wx.Menu | None' = None
        self._menu_host: str = 'top_level'
        self._menu_parent: 'wx.Menu | None' = None
        self._menu_parent_item: 'wx.MenuItem | None' = None
        self._appended_items: 'list[tuple[wx.Menu, wx.MenuItem]]' = []

        # --- action bookkeeping -----------------------------------------
        #: All action ids registered via :meth:`register_action`; used by
        #: :meth:`destroy` to clean up automatically.
        self._registered_action_ids: list[str] = []
        self._multistep_specs: dict[str, 'MultiStepSpec'] = {}
        self._multistep_step_index: dict[str, int] = {}
        self._active_multistep_action_id: str | None = None
def attach(self, viewer: 'WolfMapViewer', dialogs: DialogProvider | None = None) -> None:
    """Bind (or re-bind) the proxy to a viewer after construction.

    Handy when a companion is created before the viewer exists::

        companion = MyCompanion()
        ...
        companion.runtime.attach(viewer)
        companion.runtime.activate()

    :param viewer: The :class:`~wolfhece.PyDraw.WolfMapViewer` to bind.
    :param dialogs: Optional dialog provider; when omitted the current
        provider is kept.
    :raises RuntimeError: If a *different* viewer is already attached.
    """
    current = self._viewer
    bound_elsewhere = current is not None and current is not viewer
    if bound_elsewhere:
        raise RuntimeError(
            f"Companion '{self._name}' is already attached to a different viewer. "
            "Call detach() first if you really intend to switch viewers."
        )

    if dialogs is not None:
        self._dialogs = dialogs
    self._viewer = viewer

    # Evict any older proxy registered on the viewer under the same identity.
    self._replace_previous_proxy_instance()
def detach(self) -> None:
    """Unbind the proxy from its current viewer.

    The proxy removes itself from the viewer's proxy registry (only when
    the registered entry is this very instance) and drops the viewer
    reference. The dialog provider is left untouched so standalone
    dialogs (e.g. file pickers) keep working. Calling this on an already
    detached proxy is a no-op.
    """
    viewer = self._viewer
    if viewer is None:
        return

    registry = getattr(viewer, '_companion_proxy_registry', None)
    key = self._proxy_registry_key()
    owns_entry = isinstance(registry, dict) and registry.get(key) is self
    if owns_entry:
        del registry[key]

    self._viewer = None
# ------------------------------------------------------------------ # Proxy registry and identity # ------------------------------------------------------------------
[docs] def _proxy_registry_key(self) -> str: """Return the key used to de-duplicate proxy instances on a viewer.""" return (self._namespace or self._name).lower()
[docs] def _get_proxy_registry(self) -> dict[str, 'ViewerProxy']: """Return viewer-scoped proxy registry, creating it if needed.""" reg = getattr(self._viewer, '_companion_proxy_registry', None) if not isinstance(reg, dict): reg = {} setattr(self._viewer, '_companion_proxy_registry', reg) return reg
[docs] def _replace_previous_proxy_instance(self) -> None: """Replace a previous attached proxy with the same identity. Notebook cells are often re-executed, creating a new companion instance while the old one is still attached. We proactively destroy and detach the previous proxy to keep registration/menu state idempotent. """ reg = self._get_proxy_registry() key = self._proxy_registry_key() previous = reg.get(key) if previous is self: return if isinstance(previous, ViewerProxy): try: previous.destroy() except Exception as exc: logging.debug( "%s._replace_previous_proxy_instance: failed to destroy previous proxy '%s': %s", self._name, key, exc, ) previous._viewer = None reg[key] = self
# ------------------------------------------------------------------ # Menu construction and menu event adaptation # ------------------------------------------------------------------
def action_id(self, local_id: str) -> str:
    """Return the companion's namespaced action id for *local_id*.

    Guarantees uniqueness across companions that independently pick the
    same *local_id*. Use this instead of building the string manually::

        full_id = self.proxy.action_id('pick')   # 'mycompanion.pick'

    When the proxy has no namespace the local id is returned unchanged,
    which matches what :meth:`_resolve_action_id` registers for a local id
    (a leading ``'.'`` would otherwise produce an id that never matches).

    :param local_id: Short, human-readable name (e.g. ``'pick'``,
        ``'place wall'``).
    :returns: ``'{namespace}.{local_id}'`` — e.g. ``'mycompanion.pick'`` —
        or *local_id* itself when the namespace is empty.
    """
    if not self._namespace:
        return local_id
    return f"{self._namespace}.{local_id}"
def build_menu(self,
               title: str,
               items: list['MenuEntry'],
               *,
               host: str = 'top_level',
               companions_root_label: str | None = None,
               ) -> None:
    """Build the companion menu, top-level or under the shared "Companions" root.

    Menu item callbacks are always invoked with a synthetic
    :class:`~wolfhece._viewer_plugin_handlers.MouseContext` instead of a
    raw wx event, which keeps companion logic framework-agnostic.

    The call is a no-op when this proxy already owns a menu.

    :param title: Label of the menu (or submenu).
    :param items: Declarative entries forwarded to :meth:`fill_wx_menu`.
    :param host: ``'top_level'`` or ``'companions_root'``.
    :param companions_root_label: Label of the shared root menu; defaults
        to the translated ``'Companions'``.
    :raises ValueError: If *host* is not a supported value. The check runs
        before any state is touched, so a bad *host* can no longer leave a
        dangling ``self._menu`` behind or remove a previously built menu.
    :raises RuntimeError: If the proxy is not attached to a viewer.
    """
    if self._menu is not None:
        return

    # Validate first: nothing below must run for an unsupported host.
    if host not in ('top_level', 'companions_root'):
        raise ValueError(f"Unsupported menu host: {host!r}")

    viewer = self._require_viewer()

    import wx

    # Drop the menu left behind by a previous instance with the same identity.
    self._replace_existing_companion_menu(title=title, host=host)

    menu = wx.Menu()
    if host == 'top_level':
        viewer.menubar.Append(menu, title)
        parent = None
        parent_item = None
    else:
        parent = self._ensure_companions_root_menu(
            companions_root_label or _('Companions')
        )
        parent_item = parent.AppendSubMenu(menu, title)

    # Publish the bookkeeping only once the menu is actually installed.
    self._menu = menu
    self._menu_host = host
    self._menu_parent = parent
    self._menu_parent_item = parent_item

    self.fill_wx_menu(menu, items)
    self._register_companion_menu(title=title, host=host)
[docs] def _menu_registry_key(self, *, title: str, host: str) -> tuple[str, str, str]: """Return a stable key for de-duplicating companion menus per viewer.""" return (host, self._namespace or self._name.lower(), title)
@staticmethod
[docs] def _safe_isinstance(obj: object, type_or_tuple: object) -> bool: """Like isinstance(), but tolerant when test doubles replace wx classes.""" try: return isinstance(obj, type_or_tuple) except TypeError: return False
[docs] def _get_companion_menu_registry(self) -> dict[tuple[str, str, str], dict]: """Return viewer-scoped companion menu registry, creating it if needed.""" reg = getattr(self._viewer, '_companion_menu_registry', None) if not isinstance(reg, dict): reg = {} setattr(self._viewer, '_companion_menu_registry', reg) return reg
def _replace_existing_companion_menu(self, *, title: str, host: str) -> None:
    """Remove the menu built by a previous companion instance, if any.

    This makes notebook re-execution idempotent: rebuilding a companion
    with the same identity (host + namespace + title) replaces the old
    menu instead of appending a duplicate.

    :param title: Menu title used in the registry key.
    :param host: Menu host used in the registry key.
    """
    import wx

    registry = self._get_companion_menu_registry()
    key = self._menu_registry_key(title=title, host=host)
    record = registry.get(key)
    if not isinstance(record, dict):
        return

    previous_menu = record.get('menu')

    if host == 'top_level' and self._safe_isinstance(previous_menu, wx.Menu):
        menubar: 'wx.MenuBar' = self._viewer.menubar
        # Locate the old menu by identity; remove only the first match.
        position = next(
            (pos for pos in range(menubar.GetMenuCount())
             if menubar.GetMenu(pos) is previous_menu),
            None,
        )
        if position is not None:
            menubar.Remove(position)
    elif host == 'companions_root':
        parent = record.get('parent')
        parent_item = record.get('parent_item')
        both_valid = (
            self._safe_isinstance(parent, wx.Menu)
            and self._safe_isinstance(parent_item, wx.MenuItem)
        )
        if both_valid:
            try:
                parent.Remove(parent_item)
            except Exception:
                logging.debug(
                    "%s._replace_existing_companion_menu: previous submenu already removed?",
                    self._name,
                )
        # The shared root may have become empty.
        self._cleanup_companions_root_menu_if_empty()

    registry.pop(key, None)
[docs] def _register_companion_menu(self, *, title: str, host: str) -> None: """Store bookkeeping for later de-duplication and safe teardown.""" reg = self._get_companion_menu_registry() key = self._menu_registry_key(title=title, host=host) reg[key] = { 'menu': self._menu, 'parent': self._menu_parent, 'parent_item': self._menu_parent_item, }
def _ensure_companions_root_menu(self, label: str) -> 'wx.Menu':
    """Return the shared top-level root menu used by companions.

    The root is cached on the viewer as ``_companions_root_menu``. When no
    valid cached menu exists, a new one is appended to the menubar under
    *label* and cached.
    """
    import wx

    cached = getattr(self._viewer, '_companions_root_menu', None)
    if not self._safe_isinstance(cached, wx.Menu):
        cached = wx.Menu()
        self._viewer.menubar.Append(cached, label)
        setattr(self._viewer, '_companions_root_menu', cached)
    return cached
def _cleanup_companions_root_menu_if_empty(self) -> None:
    """Remove the shared companions root once it has no items left.

    Does nothing when the viewer has no valid cached root or when the root
    still contains items. Otherwise the root is removed from the menubar
    (if found there) and the cached attribute is deleted from the viewer.
    """
    import wx

    root = getattr(self._viewer, '_companions_root_menu', None)
    is_menu = self._safe_isinstance(root, wx.Menu)
    if not is_menu or root.GetMenuItemCount() > 0:
        return

    menubar: 'wx.MenuBar' = self._viewer.menubar
    for position in range(menubar.GetMenuCount()):
        if menubar.GetMenu(position) is root:
            menubar.Remove(position)
            break

    if hasattr(self._viewer, '_companions_root_menu'):
        delattr(self._viewer, '_companions_root_menu')
def fill_wx_menu(self, menu: 'wx.Menu', items: list['MenuEntry']) -> None:
    """Populate *menu* from declarative entries.

    Separators, nested submenus and (optionally checkable) items are
    supported; entries of any other type are ignored. Each item's
    ``wx.EVT_MENU`` is bound to a wrapper that forwards a synthetic mouse
    context to the entry's handler.

    :param menu: The wx menu to fill.
    :param items: Declarative menu entries.
    """
    import wx

    from .types import MenuItem, Separator, SubMenuSpec

    for entry in items:
        if isinstance(entry, Separator):
            menu.AppendSeparator()
            continue

        if isinstance(entry, SubMenuSpec):
            # Fill the child first, then hang it under the parent.
            child = wx.Menu()
            self.fill_wx_menu(child, entry.items)
            menu.AppendSubMenu(child, entry.label, entry.help)
            continue

        if not isinstance(entry, MenuItem):
            continue

        append = menu.AppendCheckItem if entry.checkable else menu.Append
        wx_item = append(wx.ID_ANY, entry.label, entry.help)
        if not entry.enabled:
            menu.Enable(wx_item.GetId(), False)
        menu.Bind(wx.EVT_MENU, self._wrap_menu_handler(entry.handler), wx_item)
[docs] def _wrap_menu_handler(self, handler: Callable[..., None]) -> Callable: """Adapt a wx menu callback to a MouseContext callback. Companion handlers stay independent from wx by receiving a synthetic :class:`~wolfhece._viewer_plugin_handlers.MouseContext`. """ def _on_menu(event) -> None: ctx = self._menu_event_to_mouse_context(event) handler(ctx) return _on_menu
def _menu_event_to_mouse_context(self, event) -> 'MouseContext':
    """Build a synthetic :class:`MouseContext` for a wx menu event.

    Strategy:

    1. When the viewer holds a cached ``_mouse_context``, copy its
       position, keyboard, pressure and wheel delta, with every button
       released and no wheel rotation.
    2. Otherwise return a neutral context located at the origin.

    :param event: The wx menu event (not inspected).
    """
    from .._viewer_plugin_handlers import KeyboardSnapshot, MouseContext

    cached = getattr(self._viewer, '_mouse_context', None)

    if not isinstance(cached, MouseContext):
        # No usable cache: neutral origin context.
        return MouseContext(
            x=0.0,
            y=0.0,
            x_snap=0.0,
            y_snap=0.0,
            x_pixel=0,
            y_pixel=0,
            keyboard=KeyboardSnapshot(),
            left_down=False,
            middle_down=False,
            right_down=False,
            pressure=1.0,
            wheel_rotation=0,
            wheel_delta=120,
        )

    return MouseContext(
        x=cached.x,
        y=cached.y,
        x_snap=cached.x_snap,
        y_snap=cached.y_snap,
        x_pixel=cached.x_pixel,
        y_pixel=cached.y_pixel,
        keyboard=cached.keyboard,
        left_down=False,
        middle_down=False,
        right_down=False,
        pressure=cached.pressure,
        wheel_rotation=0,
        wheel_delta=cached.wheel_delta,
    )
def append_to_menu(self, menu: 'wx.Menu', items: list['MenuEntry']) -> None:
    """Append declarative entries to an existing wx menu.

    Every wx item added by this call is recorded together with its menu so
    that :meth:`menu_destroy` can remove it again later.
    """
    first_new = menu.GetMenuItemCount()
    self.fill_wx_menu(menu, items)
    # Everything past the previous item count was added by this call.
    self._appended_items.extend(
        (menu, added) for added in menu.GetMenuItems()[first_new:]
    )
def append_to_create_menu(self, items: list['MenuEntry']) -> None:
    """Append entries to the viewer's create-object menu (``menucreateobj``)."""
    create_menu = self._viewer.menucreateobj
    self.append_to_menu(create_menu, items)
def append_to_add_menu(self, items: list['MenuEntry']) -> None:
    """Append entries to the viewer's add-object menu (``menuaddobj``)."""
    add_menu = self._viewer.menuaddobj
    self.append_to_menu(add_menu, items)
# ------------------------------------------------------------------ # Viewer object access — arrays / zones / triangulations # ------------------------------------------------------------------
[docs] def _require_viewer(self) -> 'WolfMapViewer': """Return the attached viewer or raise when the proxy is detached. :raises RuntimeError: If no viewer is currently attached. """ if self._viewer is None: raise RuntimeError(f"Companion '{self._name}' is not attached to a viewer") return self._viewer
@property
def active_array(self):
    """Viewer's active array (``None`` when unset)."""
    viewer = self._require_viewer()
    return viewer.active_array


@active_array.setter
def active_array(self, value) -> None:
    """Set the viewer's active array."""
    viewer = self._require_viewer()
    viewer.active_array = value


@property
def active_matrix(self):
    """Alias for :attr:`active_array`.

    The viewer code historically says "array" while some companions and
    docs still talk about matrices.
    """
    return self.active_array


@active_matrix.setter
def active_matrix(self, value) -> None:
    """Set the active array through the matrix alias."""
    self.active_array = value


@property
def active_zones(self):
    """Viewer's active zones (``None`` when unset)."""
    viewer = self._require_viewer()
    return viewer.active_zones


@active_zones.setter
def active_zones(self, value) -> None:
    """Set the viewer's active zones object."""
    viewer = self._require_viewer()
    viewer.active_zones = value


@property
def active_triangulation(self):
    """Viewer's active triangulation, stored as ``active_tri`` (``None`` when unset)."""
    viewer = self._require_viewer()
    return viewer.active_tri


@active_triangulation.setter
def active_triangulation(self, value) -> None:
    """Set the viewer's active triangulation (``active_tri``)."""
    viewer = self._require_viewer()
    viewer.active_tri = value
def get_array(self, id: str | None = None):
    """Return the active array, or the array registered under *id*.

    :param id: Viewer object id; ``None`` selects the active array.
    :raises RuntimeError: If the proxy is not attached to a viewer.
    """
    viewer = self._require_viewer()
    if id is not None:
        from ..PyDraw import draw_type
        return viewer.get_obj_from_id(id, drawing_type=draw_type.ARRAYS)
    return viewer.active_array
def get_matrix(self, id: str | None = None):
    """Alias for :meth:`get_array`.

    Kept for companions and docs that still use the matrix wording.
    """
    array = self.get_array(id)
    return array
def get_zones(self, id: str | None = None):
    """Return the active zones, or the zones object registered under *id*.

    :param id: Viewer object id; ``None`` selects the active zones.
    :raises RuntimeError: If the proxy is not attached to a viewer.
    """
    viewer = self._require_viewer()
    if id is not None:
        from ..PyDraw import draw_type
        return viewer.get_obj_from_id(id, drawing_type=draw_type.VECTORS)
    return viewer.active_zones
def get_triangulation(self, id: str | None = None):
    """Return the active triangulation, or the one registered under *id*.

    :param id: Viewer object id; ``None`` selects the active triangulation.
    :raises RuntimeError: If the proxy is not attached to a viewer.
    """
    viewer = self._require_viewer()
    if id is not None:
        from ..PyDraw import draw_type
        return viewer.get_obj_from_id(id, drawing_type=draw_type.TRIANGULATION)
    return viewer.active_tri
def add_array(self,
              newobj=None,
              *,
              filename: str = '',
              ToCheck: bool = True,
              id: str = '',
              ) -> int:
    """Add an array to the viewer.

    All arguments are forwarded to the viewer's ``add_object`` with the
    ``'array'`` kind.

    :returns: Whatever the viewer's ``add_object`` returns.
    :raises RuntimeError: If the proxy is not attached to a viewer.
    """
    viewer = self._require_viewer()
    outcome = viewer.add_object(
        'array', filename=filename, newobj=newobj, ToCheck=ToCheck, id=id
    )
    return outcome
def add_matrix(self,
               newobj=None,
               *,
               filename: str = '',
               ToCheck: bool = True,
               id: str = '',
               ) -> int:
    """Alias for :meth:`add_array`; every argument is forwarded unchanged."""
    forwarded = dict(filename=filename, ToCheck=ToCheck, id=id)
    return self.add_array(newobj, **forwarded)
def add_zones(self,
              newobj=None,
              *,
              filename: str = '',
              ToCheck: bool = True,
              id: str = '',
              ) -> int:
    """Add a zones object to the viewer.

    All arguments are forwarded to the viewer's ``add_object`` with the
    ``'vector'`` kind.

    :returns: Whatever the viewer's ``add_object`` returns.
    :raises RuntimeError: If the proxy is not attached to a viewer.
    """
    viewer = self._require_viewer()
    outcome = viewer.add_object(
        'vector', filename=filename, newobj=newobj, ToCheck=ToCheck, id=id
    )
    return outcome
def add_triangulation(self,
                      newobj=None,
                      *,
                      filename: str = '',
                      ToCheck: bool = True,
                      id: str = '',
                      ) -> int:
    """Add a triangulation to the viewer.

    All arguments are forwarded to the viewer's ``add_object`` with the
    ``'triangulation'`` kind.

    :returns: Whatever the viewer's ``add_object`` returns.
    :raises RuntimeError: If the proxy is not attached to a viewer.
    """
    viewer = self._require_viewer()
    outcome = viewer.add_object(
        'triangulation', filename=filename, newobj=newobj, ToCheck=ToCheck, id=id
    )
    return outcome
# ------------------------------------------------------------------ # Lifecycle and teardown # ------------------------------------------------------------------
def destroy(self) -> None:
    """Unregister actions, remove menus, and leave the proxy registry.

    This is the main teardown hook used by the plugin manager and by the
    notebook re-execution cleanup path.

    When no viewer is attached only the action and multi-step bookkeeping
    is reset. Otherwise menus are removed, every registered action is
    unregistered from the viewer (best effort), the bookkeeping is reset,
    this proxy is removed from the viewer's proxy registry and a redraw is
    requested. The viewer reference itself is not cleared here.
    """
    viewer = self._viewer
    attached = viewer is not None

    if attached:
        self.menu_destroy()
        # Iterate over a snapshot of the registered ids.
        for registered_id in list(self._registered_action_ids):
            try:
                viewer.unregister_action(registered_id)
            except Exception:
                logging.debug("%s.destroy: could not unregister action '%s'", self._name, registered_id)

    # Bookkeeping reset is common to the attached and detached paths.
    self._registered_action_ids.clear()
    self._multistep_specs.clear()
    self._multistep_step_index.clear()
    self._active_multistep_action_id = None

    if not attached:
        return

    registry = getattr(viewer, '_companion_proxy_registry', None)
    key = self._proxy_registry_key()
    if isinstance(registry, dict) and registry.get(key) is self:
        registry.pop(key, None)

    self.force_redraw()
def menu_destroy(self) -> None:
    """Remove the proxy's own menu and every item it appended elsewhere.

    Steps, when the proxy owns a menu:

    1. Drop every entry of the viewer's companion menu registry that
       points at this menu.
    2. Remove the menu from the menubar (``'top_level'`` host) or from the
       shared companions root (``'companions_root'`` host), then let the
       root clean itself up if it became empty.
    3. Reset the menu bookkeeping to its defaults.

    Items recorded by :meth:`append_to_menu` are removed afterwards in all
    cases. Removal failures are logged at debug level and ignored.

    No wx import is needed here: only methods of the already-built menu
    objects are called, so teardown also works where wx is unavailable.
    """
    owned = self._menu
    if owned is not None:
        # Forget this menu in the viewer-level de-dup registry when owned.
        registry = getattr(self._viewer, '_companion_menu_registry', None)
        if isinstance(registry, dict):
            stale_keys = [
                key for key, entry in registry.items()
                if isinstance(entry, dict) and entry.get('menu') is owned
            ]
            for key in stale_keys:
                registry.pop(key, None)

        if self._menu_host == 'top_level':
            menubar: 'wx.MenuBar' = self._viewer.menubar
            for pos in range(menubar.GetMenuCount()):
                if menubar.GetMenu(pos) is owned:
                    menubar.Remove(pos)
                    break
            else:
                # Loop finished without a match.
                logging.debug("%s.menu_destroy: menu not found in menubar (already removed?)", self._name)
        elif self._menu_host == 'companions_root':
            if self._menu_parent is not None and self._menu_parent_item is not None:
                try:
                    self._menu_parent.Remove(self._menu_parent_item)
                except Exception:
                    logging.debug("%s.menu_destroy: companion submenu already removed?", self._name)
            self._cleanup_companions_root_menu_if_empty()

        self._menu = None
        self._menu_host = 'top_level'
        self._menu_parent = None
        self._menu_parent_item = None

    # Iterate over a snapshot; the list itself is cleared afterwards.
    for menu, item in list(self._appended_items):
        try:
            menu.Remove(item)
        except Exception:
            logging.debug("%s.menu_destroy: could not remove appended item '%s'", self._name, item)
    self._appended_items.clear()
# ------------------------------------------------------------------ # Action registration and multi-step dispatch # ------------------------------------------------------------------
def _resolve_action_id(self, action_id: 'str | ActionKind') -> str:
    """Return the fully-qualified (namespaced), lower-cased action id.

    An :class:`ActionKind` member is converted to its value; anything else
    goes through ``str()``. When the lower-cased result contains no
    ``'.'`` and the proxy has a namespace, the id is treated as *local*
    and the namespace is prepended::

        rt._resolve_action_id('pick')              # → 'mycompanion.pick'
        rt._resolve_action_id('mycompanion.pick')  # → 'mycompanion.pick'
        rt._resolve_action_id(ActionKind.SCULPT)   # → 'sculpt' (enum value kept)

    Companion code can therefore write
    ``self.proxy.register_action('pick', …)`` without first calling
    ``self.proxy.action_id('pick')``.
    """
    if isinstance(action_id, ActionKind):
        text = action_id.value
    else:
        text = str(action_id)
    text = text.lower()

    is_local = '.' not in text
    if is_local and self._namespace:
        return f"{self._namespace}.{text}"
    return text
def resolve_action_id(self, action_id: 'str | ActionKind') -> str:
    """Public entry point for resolving local/namespaced action ids.

    Declarative objects (e.g. ``MultiStepAction``) should call this
    instead of duplicating the namespacing logic.
    """
    resolved = self._resolve_action_id(action_id)
    return resolved
@staticmethod
[docs] def _accepts_n_or_more_positional(fn: Callable, n: int) -> bool: """Return True when *fn* can consume at least *n* positional args.""" try: sig = inspect.signature(fn) except (TypeError, ValueError): return True params = list(sig.parameters.values()) if any(p.kind == inspect.Parameter.VAR_POSITIONAL for p in params): return True positional = [ p for p in params if p.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD) ] return len(positional) >= n
[docs] def _adapt_mouse_handler(self, handler: Optional[Callable]) -> Optional[Callable]: """Adapt mouse handlers to support both ``(viewer, ctx)`` and ``(ctx)``.""" if handler is None: return None if self._accepts_n_or_more_positional(handler, 2): return handler def _wrapped(_viewer, ctx): return handler(ctx) return _wrapped
[docs] def _adapt_key_handler(self, handler: Optional[Callable]) -> Optional[Callable]: """Adapt key handlers to support both ``(viewer, kb)`` and ``(kb)``.""" if handler is None: return None if self._accepts_n_or_more_positional(handler, 2): return handler def _wrapped(_viewer, kb): return handler(kb) return _wrapped
[docs] def _adapt_paint_handler(self, handler: Optional[Callable]) -> Optional[Callable]: """Adapt paint handlers to support both ``(viewer)`` and ``()``.""" if handler is None: return None if self._accepts_n_or_more_positional(handler, 1): return handler def _wrapped(_viewer): return handler() return _wrapped
def register_action(self,
                    action_id: str | ActionKind,
                    *,
                    rdown: Optional[Callable] = None,
                    motion: Optional[Callable] = None,
                    ldown: Optional[Callable] = None,
                    key: Optional[Callable] = None,
                    paint: Optional[Callable] = None,
                    overload: bool = False,
                    ) -> None:
    """Register interactive handlers for an action id.

    Supported callback signatures:

    - ``rdown/motion/ldown``: ``(viewer, ctx)`` or ``(ctx)``
    - ``key``: ``(viewer, kb)`` or ``(kb)``
    - ``paint``: ``(viewer)`` or ``()``

    Companion code can thus ignore direct viewer access and rely on
    ``self.proxy`` for viewer interactions. The resolved id is remembered
    (once) so :meth:`destroy` can unregister it later.

    :param action_id: Local or namespaced id, or an :class:`ActionKind`.
    :param overload: Forwarded unchanged to the viewer.
    """
    key_str = self._resolve_action_id(action_id)

    adapt_mouse = self._adapt_mouse_handler
    adapted = dict(
        rdown_handler=adapt_mouse(rdown),
        motion_handler=adapt_mouse(motion),
        ldown_handler=adapt_mouse(ldown),
        key_handler=self._adapt_key_handler(key),
        paint_handler=self._adapt_paint_handler(paint),
    )
    self._viewer.register_action(key_str, overload=overload, **adapted)

    known = self._registered_action_ids
    if key_str not in known:
        known.append(key_str)
def register_actions(self, specs: Iterable['ActionSpec | MultiStepSpec']) -> None:
    """Register a declarative batch of ``ActionSpec`` and/or ``MultiStepSpec``.

    :param specs: Specs to register, processed in iteration order.
    :raises TypeError: On the first item that is neither an ``ActionSpec``
        nor a ``MultiStepSpec``; items before it stay registered.
    """
    from .types import ActionSpec, MultiStepSpec

    for spec in specs:
        if isinstance(spec, ActionSpec):
            self.register_action(
                spec.action_id,
                rdown=spec.rdown,
                motion=spec.motion,
                ldown=spec.ldown,
                key=spec.key,
                paint=spec.paint,
                overload=spec.overload,
            )
        elif isinstance(spec, MultiStepSpec):
            self.register_multistep_action(spec)
        else:
            raise TypeError(
                f"ViewerProxy.register_actions(): unsupported spec type {type(spec)!r}. "
                "Expected ActionSpec or MultiStepSpec."
            )
def register_multistep_action(self, spec: 'MultiStepSpec') -> None:
    """Register one declarative multi-step action.

    The spec is stored under its resolved id with the step index reset to
    0, then a regular action is registered whose handlers dispatch to the
    step that is current at call time.
    """
    key_str = self._resolve_action_id(spec.action_id)

    self._multistep_specs[key_str] = spec
    self._multistep_step_index[key_str] = 0

    mouse = self._build_multistep_mouse_dispatch
    self.register_action(
        key_str,
        rdown=mouse(key_str, 'rdown'),
        motion=mouse(key_str, 'motion'),
        ldown=mouse(key_str, 'ldown'),
        key=self._build_multistep_key_dispatch(key_str),
        paint=self._build_multistep_paint_dispatch(key_str),
        overload=spec.overload,
    )
[docs] def _current_multistep_step(self, action_id: str): """Return the currently active step object for a multi-step action.""" spec = self._multistep_specs[action_id] idx = self._multistep_step_index.get(action_id, 0) if idx < 0: idx = 0 if idx >= len(spec.steps): idx = len(spec.steps) - 1 return spec.steps[idx]
def _coerce_transition(self, result) -> 'object':
    """Normalize a handler return value into a :class:`StepTransition`.

    A ``StepTransition`` is returned as is. A string is lower-cased and
    looked up by value; unknown strings and every other type map to
    ``StepTransition.NONE``.
    """
    from .types import StepTransition

    if isinstance(result, StepTransition):
        return result
    if not isinstance(result, str):
        return StepTransition.NONE

    try:
        return StepTransition(result.lower())
    except ValueError:
        return StepTransition.NONE
def _apply_multistep_transition(self, action_id: str, result) -> None:
    """Apply the transition requested by a step handler's return value.

    ``NEXT`` advances to the following step, ``FINISH`` and ``CANCEL`` end
    the action; ``NONE`` (and anything that coerces to it) does nothing.
    """
    from .types import StepTransition

    transition = self._coerce_transition(result)

    # One handler per transition; NONE deliberately has no entry.
    reactions = {
        StepTransition.NEXT: self._advance_multistep,
        StepTransition.FINISH: self._finish_multistep,
        StepTransition.CANCEL: self._cancel_multistep,
    }
    if transition is StepTransition.NONE:
        return
    react = reactions.get(transition)
    if react is not None:
        react(action_id)
[docs] def _advance_multistep(self, action_id: str) -> None: """Advance a multi-step action to its next step when possible.""" spec = self._multistep_specs[action_id] current = self._multistep_step_index.get(action_id, 0) nxt = current + 1 if nxt >= len(spec.steps): self._finish_multistep(action_id) return self._multistep_step_index[action_id] = nxt hint = spec.steps[nxt].hint if hint: self._viewer.start_action(action_id, hint)
[docs] def _finish_multistep(self, action_id: str) -> None: """Finish a multi-step action and reset its runtime state.""" spec = self._multistep_specs.get(action_id) self._multistep_step_index[action_id] = 0 self._active_multistep_action_id = None self.end_action(spec.finish_message if spec is not None else '')
[docs] def _cancel_multistep(self, action_id: str) -> None: """Cancel a multi-step action and clear its runtime state.""" self._multistep_step_index[action_id] = 0 self._active_multistep_action_id = None self.end_action()
[docs] def _build_multistep_mouse_dispatch(self, action_id: str, event_name: str) -> Callable: """Build a mouse-event dispatcher for one step-driven action.""" def _dispatch(viewer, ctx): step = self._current_multistep_step(action_id) handler = getattr(step, event_name) if handler is None: return None adapted = self._adapt_mouse_handler(handler) result = adapted(viewer, ctx) self._apply_multistep_transition(action_id, result) return result return _dispatch
[docs] def _build_multistep_key_dispatch(self, action_id: str) -> Callable: """Build a keyboard-event dispatcher for one step-driven action.""" def _dispatch(viewer, kb): step = self._current_multistep_step(action_id) handler = step.key if handler is None: return None adapted = self._adapt_key_handler(handler) result = adapted(viewer, kb) self._apply_multistep_transition(action_id, result) return result return _dispatch
[docs] def _build_multistep_paint_dispatch(self, action_id: str) -> Callable: """Build a paint dispatcher for one step-driven action.""" def _dispatch(viewer): step = self._current_multistep_step(action_id) handler = step.paint if handler is None: return None adapted = self._adapt_paint_handler(handler) return adapted(viewer) return _dispatch
def unregister_action(self, action_id: str | ActionKind) -> None:
    """Unregister a previously registered action id.

    The viewer call is best effort: a failure is logged at debug level and
    the proxy's own bookkeeping (registered ids, multi-step spec and step
    index, active multi-step marker) is cleaned up regardless.

    :param action_id: Local or namespaced id, or an :class:`ActionKind`.
    """
    key_str = self._resolve_action_id(action_id)

    try:
        self._viewer.unregister_action(key_str)
    except Exception:
        # Log under the real method name so the message can be traced back.
        logging.debug("%s.unregister_action: could not unregister '%s'", self._name, key_str)

    if key_str in self._registered_action_ids:
        self._registered_action_ids.remove(key_str)

    self._multistep_specs.pop(key_str, None)
    self._multistep_step_index.pop(key_str, None)
    if self._active_multistep_action_id == key_str:
        self._active_multistep_action_id = None
def start_action(self, action_id: str | ActionKind, message: str = '') -> None:
    """Start an interaction on the viewer, honoring multi-step actions.

    For a registered multi-step action the step index is reset, the action
    is marked active and an empty *message* is replaced by the spec's
    ``effective_start_message``. For any other action the active
    multi-step marker is cleared.
    """
    key_str = self._resolve_action_id(action_id)
    spec = self._multistep_specs.get(key_str) if key_str in self._multistep_specs else None

    if key_str not in self._multistep_specs:
        self._active_multistep_action_id = None
    else:
        self._multistep_step_index[key_str] = 0
        self._active_multistep_action_id = key_str
        if not message:
            message = spec.effective_start_message

    self._viewer.start_action(key_str, message)
def end_action(self, message: str = '') -> None:
    """End the current viewer interaction.

    When a multi-step action is active its step index is reset to 0; the
    active marker is cleared in all cases before the viewer is notified.
    """
    active = self._active_multistep_action_id
    if active is not None:
        self._multistep_step_index[active] = 0
    self._active_multistep_action_id = None
    self._viewer.end_action(message)
def require_active_array(self) -> bool:
    """Tell whether the viewer has an active array, warning when it has none.

    :returns: ``True`` when an active array is selected, else ``False``
        (after logging a translated warning).
    """
    if self._viewer.active_array is not None:
        return True
    logging.warning(_("No active array - activate/select one first"))
    return False
@staticmethod
[docs] def require(condition: bool, warning: str) -> bool: """Emit a warning and return the truth value of *condition*.""" if not condition: logging.warning(warning) return bool(condition)
# ------------------------------------------------------------------ # Dialog and user interaction helpers # ------------------------------------------------------------------
def show_error(self, message: str, title: str = '') -> None:
    """Show an error dialog.

    :param message: Message body shown to the user.
    :param title: Optional dialog title; the translated ``"Error"`` is
        used when it is empty.
    """
    from ..dialog_provider import DialogStyles

    caption = title or _("Error")
    error_style = DialogStyles.OK | DialogStyles.ICON_ERROR
    self._dialogs.show_message(
        message,
        caption=caption,
        style=error_style,
        parent=self._viewer,
    )
def show_info(self, message: str, title: str = '') -> None:
    """Show an informational dialog.

    :param message: Message body shown to the user.
    :param title: Optional dialog title; the translated ``"Information"``
        is used when it is empty.
    """
    caption = title or _("Information")
    self._dialogs.show_message(message, caption=caption, parent=self._viewer)
def confirm(self, message: str, title: str = '', default: str = 'yes') -> bool:
    """Ask the user for a yes/no style confirmation.

    :param message: Question shown to the user.
    :param title: Optional dialog title; the translated ``"Confirm"`` is
        used when it is empty.
    :param default: Default answer forwarded to the dialog provider.
    :return: The result of the dialog provider's ``ask_confirmation``.
    """
    caption = title or _("Confirm")
    answer = self._dialogs.ask_confirmation(
        message,
        caption=caption,
        default=default,
        parent=self._viewer,
    )
    return answer
def ask_text(self, message: str, title: str = '', default: str = '') -> str | None:
    """Ask the user for free-form text.

    :param message: Prompt shown to the user.
    :param title: Dialog caption.
    :param default: Initial value forwarded to the dialog provider.
    :return: The result of the dialog provider's ``ask_text``.
    """
    options = {
        'caption': title,
        'default': default,
        'parent': self._viewer,
    }
    return self._dialogs.ask_text(message, **options)
def ask_float(self, message: str, title: str = '', default: float | str = '') -> float | None:
    """Ask the user for a floating-point value.

    :param message: Prompt shown to the user.
    :param title: Dialog caption.
    :param default: Initial value forwarded to the dialog provider.
    :return: The result of the dialog provider's ``ask_float``.
    """
    options = {
        'caption': title,
        'default': default,
        'parent': self._viewer,
    }
    return self._dialogs.ask_float(message, **options)
def ask_integer(
    self,
    message: str,
    title: str = '',
    prompt: str = '',
    default: int = 0,
    min_value: int = 0,
    max_value: int = 0,
) -> int | None:
    """Ask the user for an integer value.

    :param message: Message shown to the user.
    :param title: Dialog caption.
    :param prompt: Prompt text forwarded to the dialog provider.
    :param default: Initial value forwarded to the dialog provider.
    :param min_value: Lower bound forwarded to the dialog provider.
    :param max_value: Upper bound forwarded to the dialog provider.
    :return: The result of the dialog provider's ``ask_integer``.
    """
    options = {
        'prompt': prompt,
        'caption': title,
        'default': default,
        'min_value': min_value,
        'max_value': max_value,
        'parent': self._viewer,
    }
    return self._dialogs.ask_integer(message, **options)
def ask_single_choice(
    self,
    message: str,
    title: str,
    choices: list[str],
    preselected: int | None = None,
) -> str | None:
    """Ask the user to pick one value from a finite list of choices.

    :param message: Message shown to the user.
    :param title: Dialog caption.
    :param choices: Candidate values forwarded to the dialog provider.
    :param preselected: Optional preselection forwarded to the dialog
        provider.
    :return: The result of the dialog provider's ``ask_single_choice``.
    """
    ask = self._dialogs.ask_single_choice
    picked = ask(
        message,
        caption=title,
        choices=choices,
        preselected=preselected,
        parent=self._viewer,
    )
    return picked
def ask_file_open(
    self,
    message: str,
    wildcard: str = 'all (*.*)|*.*',
    default_path: str = '',
) -> str | None:
    """Open a file-selection dialog and return the chosen path.

    :param message: Message shown to the user.
    :param wildcard: File filter forwarded to the dialog provider.
    :param default_path: Starting location forwarded to the dialog provider.
    :return: The result of the dialog provider's ``ask_file_open``.
    """
    ask = self._dialogs.ask_file_open
    chosen = ask(
        message,
        wildcard=wildcard,
        default_path=default_path,
        parent=self._viewer,
    )
    return chosen
def ask_file_save(
    self,
    message: str,
    wildcard: str = 'all (*.*)|*.*',
    default_path: str = '',
    default_file: str = '',
) -> str | None:
    """Open a save-file dialog and return the chosen path.

    :param message: Message shown to the user.
    :param wildcard: File filter forwarded to the dialog provider.
    :param default_path: Starting location forwarded to the dialog provider.
    :param default_file: Proposed file name forwarded to the dialog provider.
    :return: The result of the dialog provider's ``ask_file_save``.
    """
    options = {
        'wildcard': wildcard,
        'default_path': default_path,
        'default_file': default_file,
        'parent': self._viewer,
    }
    return self._dialogs.ask_file_save(message, **options)
def ask_directory(self, message: str, default_path: str = '') -> str | None:
    """Open a directory picker and return the chosen path.

    :param message: Message shown to the user.
    :param default_path: Starting location forwarded to the dialog provider.
    :return: The result of the dialog provider's ``ask_directory``.
    """
    ask = self._dialogs.ask_directory
    chosen = ask(message, default_path=default_path, parent=self._viewer)
    return chosen
def run_with_progress(
    self,
    title: str,
    steps: list[tuple[int, str, Callable]],
) -> None:
    """Run a list of callables while updating a progress dialog.

    For each ``(percent, label, fn)`` entry the progress handle is updated,
    ``fn`` is called without arguments and the viewer is refreshed. The
    handle is closed even when a step raises.

    :param title: Title passed to the progress dialog.
    :param steps: Sequence of ``(percent, label, fn)`` tuples.
    """
    progress = self._dialogs.create_progress(
        title,
        _("Working..."),
        maximum=100,
        parent=self._viewer,
    )
    try:
        for step in steps:
            percent, label, task = step
            progress.update(percent, label)
            task()
            self._viewer.Refresh()
    finally:
        # Always dismiss the dialog, including on failure of a step.
        progress.close()
# ------------------------------------------------------------------ # Viewer feedback and OpenGL drawing helpers # ------------------------------------------------------------------
def set_status(self, message: str) -> None:
    """Forward *message* to the viewer's ``set_statusbar_text``.

    :param message: Text to display.
    """
    viewer = self._viewer
    viewer.set_statusbar_text(message)
def force_redraw(self) -> None:
    """Trigger a repaint by calling the viewer's ``Paint`` method."""
    viewer = self._viewer
    viewer.Paint()
def viewport_fraction(self, fraction: float = 0.008) -> float:
    """Return a fraction of the current viewport width.

    The width is computed as ``xmax - xmin`` of the viewer.

    :param fraction: Multiplier applied to the viewport width.
    :return: ``(xmax - xmin) * fraction``.
    """
    viewer = self._viewer
    width = viewer.xmax - viewer.xmin
    return width * fraction
@staticmethod
def draw_crosses(
    points: Iterable[tuple[float, float]],
    half_size: float,
    color: tuple[float, float, float, float] = (1.0, 0.0, 0.0, 1.0),
    *,
    selected_idx: int = -1,
    selected_color: tuple[float, float, float, float] = (1.0, 0.8, 0.0, 1.0),
    line_width: float = 2.0,
) -> None:
    """Draw cross markers at the given points using OpenGL primitives.

    Each point yields one horizontal and one vertical segment of length
    ``2 * half_size`` centred on it. Nothing is drawn for an empty input.

    :param points: ``(x, y)`` centres of the crosses.
    :param half_size: Half length of each arm of a cross.
    :param color: Colour components passed to ``glColor4f``.
    :param selected_idx: Index of the point drawn with *selected_color*.
    :param selected_color: Colour used for the selected point.
    :param line_width: Line width set while drawing; it is reset to ``1.0``
        afterwards.
    """
    from OpenGL.GL import (
        GL_LINES,
        glBegin,
        glColor4f,
        glEnd,
        glLineWidth,
        glVertex2f,
    )

    centres = list(points)
    if len(centres) == 0:
        return

    glLineWidth(line_width)
    glBegin(GL_LINES)
    for index, (cx, cy) in enumerate(centres):
        if index == selected_idx:
            glColor4f(*selected_color)
        else:
            glColor4f(*color)
        # Horizontal arm.
        glVertex2f(cx - half_size, cy)
        glVertex2f(cx + half_size, cy)
        # Vertical arm.
        glVertex2f(cx, cy - half_size)
        glVertex2f(cx, cy + half_size)
    glEnd()
    glLineWidth(1.0)
@staticmethod
def draw_polyline(
    points: Iterable[tuple[float, float]],
    color: tuple[float, float, float, float] = (0.0, 0.5, 1.0, 1.0),
    *,
    closed: bool = False,
    line_width: float = 1.5,
) -> None:
    """Draw a polyline or closed loop using OpenGL primitives.

    Nothing is drawn when fewer than two points are supplied.

    :param points: ``(x, y)`` vertices, in drawing order.
    :param color: Colour components passed to ``glColor4f``.
    :param closed: Use ``GL_LINE_LOOP`` instead of ``GL_LINE_STRIP``.
    :param line_width: Line width set while drawing; it is reset to ``1.0``
        afterwards.
    """
    from OpenGL.GL import (
        GL_LINE_LOOP,
        GL_LINE_STRIP,
        glBegin,
        glColor4f,
        glEnd,
        glLineWidth,
        glVertex2f,
    )

    vertices = list(points)
    if len(vertices) < 2:
        return

    if closed:
        mode = GL_LINE_LOOP
    else:
        mode = GL_LINE_STRIP

    glLineWidth(line_width)
    glColor4f(*color)
    glBegin(mode)
    for vx, vy in vertices:
        glVertex2f(vx, vy)
    glEnd()
    glLineWidth(1.0)
@staticmethod
def draw_segments(
    segments: Iterable[tuple[float, float, float, float]],
    color: tuple[float, float, float, float] = (1.0, 1.0, 0.0, 1.0),
    *,
    line_width: float = 1.5,
) -> None:
    """Draw independent line segments using OpenGL primitives.

    Nothing is drawn for an empty input.

    :param segments: ``(x1, y1, x2, y2)`` tuples, one per segment.
    :param color: Colour components passed to ``glColor4f``.
    :param line_width: Line width set while drawing; it is reset to ``1.0``
        afterwards.
    """
    from OpenGL.GL import (
        GL_LINES,
        glBegin,
        glColor4f,
        glEnd,
        glLineWidth,
        glVertex2f,
    )

    pieces = list(segments)
    if len(pieces) == 0:
        return

    glLineWidth(line_width)
    glColor4f(*color)
    glBegin(GL_LINES)
    for start_x, start_y, end_x, end_y in pieces:
        glVertex2f(start_x, start_y)
        glVertex2f(end_x, end_y)
    glEnd()
    glLineWidth(1.0)
@staticmethod
def draw_points(
    points: Iterable[tuple[float, float]],
    color: tuple[float, float, float, float] = (1.0, 0.0, 0.0, 1.0),
    *,
    point_size: float = 4.0,
) -> None:
    """Draw points using OpenGL point primitives.

    Nothing is drawn for an empty input.

    :param points: ``(x, y)`` positions.
    :param color: Colour components passed to ``glColor4f``.
    :param point_size: Point size set while drawing; it is reset to ``1.0``
        afterwards.
    """
    from OpenGL.GL import (
        GL_POINTS,
        glBegin,
        glColor4f,
        glEnd,
        glPointSize,
        glVertex2f,
    )

    positions = list(points)
    if len(positions) == 0:
        return

    glPointSize(point_size)
    glColor4f(*color)
    glBegin(GL_POINTS)
    for px, py in positions:
        glVertex2f(px, py)
    glEnd()
    glPointSize(1.0)