Source code for wolfhece.opengl.cloud_points_shader2d

"""
Shared-resource shader for cloud point rendering.

Draws cloud vertices as GL_POINTS with configurable shape in fragment shader
(point, circle, cross, quad), color and fill mode.
"""

from __future__ import annotations

import ctypes
import importlib
import io
import logging
from pathlib import Path
from PIL import Image

import numpy as np

from OpenGL.GL import (
    glCreateShader, glShaderSource, glCompileShader, glGetShaderiv,
    glGetShaderInfoLog, glDeleteShader,
    glCreateProgram, glAttachShader, glLinkProgram, glGetProgramiv,
    glGetProgramInfoLog, glDeleteProgram,
    glUseProgram, glGetUniformLocation,
    glGenVertexArrays, glBindVertexArray, glDeleteVertexArrays,
    glGenBuffers, glBindBuffer, glBufferData, glDeleteBuffers,
    glEnableVertexAttribArray, glVertexAttribPointer,
    glDrawArrays, glPointSize,
    glEnable, glDisable, glIsEnabled, glBlendFunc,
    glUniform1f, glUniform1i, glUniform4f, glUniformMatrix4fv,
    glGenTextures, glBindTexture, glDeleteTextures,
    glTexImage2D, glTexParameteri, glGenerateMipmap,
    glActiveTexture,
    GL_VERTEX_SHADER, GL_FRAGMENT_SHADER,
    GL_COMPILE_STATUS, GL_LINK_STATUS, GL_FALSE,
    GL_ARRAY_BUFFER, GL_DYNAMIC_DRAW,
    GL_POINTS, GL_FLOAT,
    GL_BLEND, GL_SRC_ALPHA, GL_ONE_MINUS_SRC_ALPHA,
    GL_DEPTH_TEST, GL_PROGRAM_POINT_SIZE,
    GL_POINT_SPRITE,
    GL_TEXTURE_2D, GL_TEXTURE0,
    GL_RGBA, GL_UNSIGNED_BYTE,
    GL_TEXTURE_MAG_FILTER, GL_TEXTURE_MIN_FILTER,
    GL_TEXTURE_WRAP_S, GL_TEXTURE_WRAP_T,
    GL_LINEAR, GL_LINEAR_MIPMAP_LINEAR,
    GL_CLAMP_TO_EDGE,
)

[docs] SHADER_DIR = Path(__file__).resolve().parent.parent / "shaders"
[docs] SYMBOLS_DIR = Path(__file__).resolve().parent.parent / "symbols"
[docs] class CloudPointsShader2D: """Singleton shader for point-cloud rendering."""
[docs] _instance: "CloudPointsShader2D | None" = None
def __init__(self):
[docs] self._program: int | None = None
[docs] self._locs: dict | None = None
[docs] self._vao: int | None = None
[docs] self._vao_pt: int | None = None # per-point transform layout [x,y,rot,scale]
[docs] self._vbo: int | None = None
[docs] self._texture_cache: dict[tuple[str, float], int] = {}
@classmethod
[docs] def get_instance(cls) -> "CloudPointsShader2D": if cls._instance is None: cls._instance = cls() return cls._instance
[docs] def _init_program(self): if self._program is not None: return vs = self._compile(GL_VERTEX_SHADER, SHADER_DIR / "cloud_points_vertex.glsl") fs = self._compile(GL_FRAGMENT_SHADER, SHADER_DIR / "cloud_points_frag.glsl") prog = glCreateProgram() glAttachShader(prog, vs) glAttachShader(prog, fs) glLinkProgram(prog) if glGetProgramiv(prog, GL_LINK_STATUS) == GL_FALSE: info = glGetProgramInfoLog(prog) glDeleteProgram(prog) glDeleteShader(vs) glDeleteShader(fs) raise RuntimeError(f"Cloud points shader link failed: {info}") glDeleteShader(vs) glDeleteShader(fs) self._program = prog glUseProgram(prog) self._locs = { 'mvp': glGetUniformLocation(prog, 'mvp'), 'pointSize': glGetUniformLocation(prog, 'pointSize'), 'pointColor': glGetUniformLocation(prog, 'pointColor'), 'pointStyle': glGetUniformLocation(prog, 'pointStyle'), 'pointFilled': glGetUniformLocation(prog, 'pointFilled'), 'useSymbolTex': glGetUniformLocation(prog, 'useSymbolTex'), 'symbolTex': glGetUniformLocation(prog, 'symbolTex'), 'symbolRotation': glGetUniformLocation(prog, 'symbolRotation'), 'symbolScale': glGetUniformLocation(prog, 'symbolScale'), 'perPointTransform': glGetUniformLocation(prog, 'perPointTransform'), } glUseProgram(0)
@staticmethod
[docs] def _compile(shader_type: int, path: Path) -> int: shader = glCreateShader(shader_type) with open(path, 'r') as f: glShaderSource(shader, f.read()) glCompileShader(shader) if glGetShaderiv(shader, GL_COMPILE_STATUS) == GL_FALSE: info = glGetShaderInfoLog(shader) glDeleteShader(shader) raise RuntimeError(f"Shader compile error ({path.name}): {info}") return shader
[docs] def _ensure_vao_vbo(self): if self._vao is not None: return self._vao = glGenVertexArrays(1) self._vbo = glGenBuffers(1) glBindVertexArray(self._vao) glBindBuffer(GL_ARRAY_BUFFER, self._vbo) glEnableVertexAttribArray(0) glVertexAttribPointer(0, 2, GL_FLOAT, GL_FALSE, 2 * 4, ctypes.c_void_p(0)) glBindVertexArray(0) # Per-point transform VAO: layout [x, y, rotation, scale] — stride = 16 bytes self._vao_pt = glGenVertexArrays(1) glBindVertexArray(self._vao_pt) glBindBuffer(GL_ARRAY_BUFFER, self._vbo) glEnableVertexAttribArray(0) glVertexAttribPointer(0, 2, GL_FLOAT, GL_FALSE, 4 * 4, ctypes.c_void_p(0)) glEnableVertexAttribArray(1) glVertexAttribPointer(1, 1, GL_FLOAT, GL_FALSE, 4 * 4, ctypes.c_void_p(2 * 4)) glEnableVertexAttribArray(2) glVertexAttribPointer(2, 1, GL_FLOAT, GL_FALSE, 4 * 4, ctypes.c_void_p(3 * 4)) glBindVertexArray(0) glBindBuffer(GL_ARRAY_BUFFER, 0)
[docs] def _upload(self, pts_xy: np.ndarray): glBindBuffer(GL_ARRAY_BUFFER, self._vbo) glBufferData(GL_ARRAY_BUFFER, pts_xy.nbytes, pts_xy, GL_DYNAMIC_DRAW) glBindBuffer(GL_ARRAY_BUFFER, 0)
[docs] def _get_symbol_texture_id(self, symbol_path: str | None, symbol_raster_size: int = 128) -> int | None: if not symbol_path: return None path = Path(symbol_path) if not path.exists(): logging.warning('CloudPointsShader2D: symbol texture not found: %s', path) return None raster_size = int(max(16, min(int(symbol_raster_size), 2048))) try: key = (str(path.resolve()), float(path.stat().st_mtime), raster_size) except OSError: key = (str(path), 0.0, raster_size) if key in self._texture_cache: return self._texture_cache[key] if path.suffix.lower() == '.svg': try: cairosvg = importlib.import_module('cairosvg') svg2png = cairosvg.svg2png except Exception: logging.warning( 'CloudPointsShader2D: SVG symbol requires cairosvg (missing for %s)', path, ) return None png_bytes = svg2png( url=str(path), output_width=raster_size, output_height=raster_size, ) with Image.open(io.BytesIO(png_bytes)) as src: image = src.convert('RGBA') else: with Image.open(path) as src: image = src.convert('RGBA') tex_id = glGenTextures(1) glBindTexture(GL_TEXTURE_2D, tex_id) glTexImage2D( GL_TEXTURE_2D, 0, GL_RGBA, image.width, image.height, 0, GL_RGBA, GL_UNSIGNED_BYTE, image.tobytes(), ) glTexParameteri(GL_TEXTURE_2D, GL_TEXTURE_MAG_FILTER, GL_LINEAR) glTexParameteri(GL_TEXTURE_2D, GL_TEXTURE_MIN_FILTER, GL_LINEAR_MIPMAP_LINEAR) glTexParameteri(GL_TEXTURE_2D, GL_TEXTURE_WRAP_S, GL_CLAMP_TO_EDGE) glTexParameteri(GL_TEXTURE_2D, GL_TEXTURE_WRAP_T, GL_CLAMP_TO_EDGE) glGenerateMipmap(GL_TEXTURE_2D) glBindTexture(GL_TEXTURE_2D, 0) self._texture_cache[key] = tex_id return tex_id
[docs] def draw_points(self, pts_xy: np.ndarray, mvp: np.ndarray, point_size_px: float, color: tuple[float, float, float, float], style: int, filled: bool, symbol_path: str | None = None, symbol_raster_size: int = 128, symbol_rotation: float = 0.0, symbol_scale: float = 1.0, pts_transform: np.ndarray | None = None): """Draw cloud points with the shader pipeline. Args: symbol_rotation: Per-cloud rotation in **radians** (CCW, Option A). Ignored when *pts_transform* is provided. symbol_scale: Per-cloud scale factor (Option A). Ignored when *pts_transform* is provided. pts_transform: Optional ``(N, 2)`` float32 array with per-point ``[rotation_rad, scale]`` columns (Option B). When provided, overrides *symbol_rotation* and *symbol_scale* on a per-point basis and uses the dedicated 4-float VAO layout. """ if pts_xy.size == 0: return self._init_program() self._ensure_vao_vbo() mvp = np.ascontiguousarray(mvp, dtype=np.float32) pts_xy = np.ascontiguousarray(pts_xy, dtype=np.float32) per_point = pts_transform is not None if per_point: n = int(pts_xy.shape[0]) tr = np.ascontiguousarray(pts_transform, dtype=np.float32) if tr.shape != (n, 2): raise ValueError( f'pts_transform shape {tr.shape!r} must be ({n}, 2)' ) self._upload(np.concatenate([pts_xy, tr], axis=1)) # (N, 4) else: self._upload(pts_xy) depth_was_enabled = glIsEnabled(GL_DEPTH_TEST) if depth_was_enabled: glDisable(GL_DEPTH_TEST) glEnable(GL_BLEND) glBlendFunc(GL_SRC_ALPHA, GL_ONE_MINUS_SRC_ALPHA) prog_point_size_was_enabled = glIsEnabled(GL_PROGRAM_POINT_SIZE) if not prog_point_size_was_enabled: glEnable(GL_PROGRAM_POINT_SIZE) # Compatibility profiles may require POINT_SPRITE for valid gl_PointCoord. point_sprite_was_enabled = False try: point_sprite_was_enabled = glIsEnabled(GL_POINT_SPRITE) if not point_sprite_was_enabled: glEnable(GL_POINT_SPRITE) except Exception: point_sprite_was_enabled = True glUseProgram(self._program) locs = self._locs glUniformMatrix4fv(locs['mvp'], 1, GL_FALSE, mvp) glUniform1f(locs['pointSize'], float(max(1.0, min(point_size_px, 256.0)))) glUniform4f(locs['pointColor'], *color) glUniform1i(locs['pointStyle'], int(style)) glUniform1i(locs['pointFilled'], 1 if filled else 0) glUniform1f(locs['symbolRotation'], float(symbol_rotation)) glUniform1f(locs['symbolScale'], float(max(0.01, symbol_scale))) glUniform1i(locs['perPointTransform'], 1 if per_point else 0) symbol_tex_id = ( self._get_symbol_texture_id(symbol_path, symbol_raster_size) if int(style) == 4 else None ) glUniform1i(locs['useSymbolTex'], 1 if symbol_tex_id is not None else 0) glUniform1i(locs['symbolTex'], 0) glActiveTexture(GL_TEXTURE0) glBindTexture(GL_TEXTURE_2D, 0 if symbol_tex_id is None else symbol_tex_id) active_vao = self._vao_pt if per_point else self._vao glBindVertexArray(active_vao) glDrawArrays(GL_POINTS, 0, int(pts_xy.shape[0])) glBindVertexArray(0) glBindTexture(GL_TEXTURE_2D, 0) glUseProgram(0) glDisable(GL_BLEND) if not point_sprite_was_enabled: glDisable(GL_POINT_SPRITE) if not prog_point_size_was_enabled: glDisable(GL_PROGRAM_POINT_SIZE) if depth_was_enabled: glEnable(GL_DEPTH_TEST)
[docs] def destroy(self): if self._texture_cache: for tex_id in self._texture_cache.values(): try: glDeleteTextures(1, [tex_id]) except Exception: logging.debug('CloudPointsShader2D texture deletion failed', exc_info=True) self._texture_cache = {} if self._vao is not None: try: glDeleteVertexArrays(1, [self._vao]) except Exception: logging.debug('CloudPointsShader2D VAO deletion failed', exc_info=True) self._vao = None if self._vao_pt is not None: try: glDeleteVertexArrays(1, [self._vao_pt]) except Exception: logging.debug('CloudPointsShader2D VAO_pt deletion failed', exc_info=True) self._vao_pt = None if self._vbo is not None: try: glDeleteBuffers(1, [self._vbo]) except Exception: logging.debug('CloudPointsShader2D VBO deletion failed', exc_info=True) self._vbo = None if self._program is not None: try: glDeleteProgram(self._program) except Exception: logging.debug('CloudPointsShader2D program deletion failed', exc_info=True) self._program = None self._locs = None CloudPointsShader2D._instance = None