"""
Shared-resource shader for cloud point rendering.
Draws cloud vertices as GL_POINTS with configurable shape in fragment shader
(point, circle, cross, quad), color and fill mode.
"""
from __future__ import annotations
import ctypes
import importlib
import io
import logging
from pathlib import Path
from PIL import Image
import numpy as np
from OpenGL.GL import (
glCreateShader, glShaderSource, glCompileShader, glGetShaderiv,
glGetShaderInfoLog, glDeleteShader,
glCreateProgram, glAttachShader, glLinkProgram, glGetProgramiv,
glGetProgramInfoLog, glDeleteProgram,
glUseProgram, glGetUniformLocation,
glGenVertexArrays, glBindVertexArray, glDeleteVertexArrays,
glGenBuffers, glBindBuffer, glBufferData, glDeleteBuffers,
glEnableVertexAttribArray, glVertexAttribPointer,
glDrawArrays, glPointSize,
glEnable, glDisable, glIsEnabled, glBlendFunc,
glUniform1f, glUniform1i, glUniform4f, glUniformMatrix4fv,
glGenTextures, glBindTexture, glDeleteTextures,
glTexImage2D, glTexParameteri, glGenerateMipmap,
glActiveTexture,
GL_VERTEX_SHADER, GL_FRAGMENT_SHADER,
GL_COMPILE_STATUS, GL_LINK_STATUS, GL_FALSE,
GL_ARRAY_BUFFER, GL_DYNAMIC_DRAW,
GL_POINTS, GL_FLOAT,
GL_BLEND, GL_SRC_ALPHA, GL_ONE_MINUS_SRC_ALPHA,
GL_DEPTH_TEST, GL_PROGRAM_POINT_SIZE,
GL_POINT_SPRITE,
GL_TEXTURE_2D, GL_TEXTURE0,
GL_RGBA, GL_UNSIGNED_BYTE,
GL_TEXTURE_MAG_FILTER, GL_TEXTURE_MIN_FILTER,
GL_TEXTURE_WRAP_S, GL_TEXTURE_WRAP_T,
GL_LINEAR, GL_LINEAR_MIPMAP_LINEAR,
GL_CLAMP_TO_EDGE,
)
[docs]
SHADER_DIR = Path(__file__).resolve().parent.parent / "shaders"
[docs]
SYMBOLS_DIR = Path(__file__).resolve().parent.parent / "symbols"
[docs]
class CloudPointsShader2D:
"""Singleton shader for point-cloud rendering."""
[docs]
_instance: "CloudPointsShader2D | None" = None
def __init__(self):
[docs]
self._program: int | None = None
[docs]
self._locs: dict | None = None
[docs]
self._vao: int | None = None
[docs]
self._vao_pt: int | None = None # per-point transform layout [x,y,rot,scale]
[docs]
self._vbo: int | None = None
[docs]
self._texture_cache: dict[tuple[str, float], int] = {}
@classmethod
[docs]
def get_instance(cls) -> "CloudPointsShader2D":
if cls._instance is None:
cls._instance = cls()
return cls._instance
[docs]
def _init_program(self):
if self._program is not None:
return
vs = self._compile(GL_VERTEX_SHADER, SHADER_DIR / "cloud_points_vertex.glsl")
fs = self._compile(GL_FRAGMENT_SHADER, SHADER_DIR / "cloud_points_frag.glsl")
prog = glCreateProgram()
glAttachShader(prog, vs)
glAttachShader(prog, fs)
glLinkProgram(prog)
if glGetProgramiv(prog, GL_LINK_STATUS) == GL_FALSE:
info = glGetProgramInfoLog(prog)
glDeleteProgram(prog)
glDeleteShader(vs)
glDeleteShader(fs)
raise RuntimeError(f"Cloud points shader link failed: {info}")
glDeleteShader(vs)
glDeleteShader(fs)
self._program = prog
glUseProgram(prog)
self._locs = {
'mvp': glGetUniformLocation(prog, 'mvp'),
'pointSize': glGetUniformLocation(prog, 'pointSize'),
'pointColor': glGetUniformLocation(prog, 'pointColor'),
'pointStyle': glGetUniformLocation(prog, 'pointStyle'),
'pointFilled': glGetUniformLocation(prog, 'pointFilled'),
'useSymbolTex': glGetUniformLocation(prog, 'useSymbolTex'),
'symbolTex': glGetUniformLocation(prog, 'symbolTex'),
'symbolRotation': glGetUniformLocation(prog, 'symbolRotation'),
'symbolScale': glGetUniformLocation(prog, 'symbolScale'),
'perPointTransform': glGetUniformLocation(prog, 'perPointTransform'),
}
glUseProgram(0)
@staticmethod
[docs]
def _compile(shader_type: int, path: Path) -> int:
shader = glCreateShader(shader_type)
with open(path, 'r') as f:
glShaderSource(shader, f.read())
glCompileShader(shader)
if glGetShaderiv(shader, GL_COMPILE_STATUS) == GL_FALSE:
info = glGetShaderInfoLog(shader)
glDeleteShader(shader)
raise RuntimeError(f"Shader compile error ({path.name}): {info}")
return shader
[docs]
def _ensure_vao_vbo(self):
if self._vao is not None:
return
self._vao = glGenVertexArrays(1)
self._vbo = glGenBuffers(1)
glBindVertexArray(self._vao)
glBindBuffer(GL_ARRAY_BUFFER, self._vbo)
glEnableVertexAttribArray(0)
glVertexAttribPointer(0, 2, GL_FLOAT, GL_FALSE, 2 * 4, ctypes.c_void_p(0))
glBindVertexArray(0)
# Per-point transform VAO: layout [x, y, rotation, scale] — stride = 16 bytes
self._vao_pt = glGenVertexArrays(1)
glBindVertexArray(self._vao_pt)
glBindBuffer(GL_ARRAY_BUFFER, self._vbo)
glEnableVertexAttribArray(0)
glVertexAttribPointer(0, 2, GL_FLOAT, GL_FALSE, 4 * 4, ctypes.c_void_p(0))
glEnableVertexAttribArray(1)
glVertexAttribPointer(1, 1, GL_FLOAT, GL_FALSE, 4 * 4, ctypes.c_void_p(2 * 4))
glEnableVertexAttribArray(2)
glVertexAttribPointer(2, 1, GL_FLOAT, GL_FALSE, 4 * 4, ctypes.c_void_p(3 * 4))
glBindVertexArray(0)
glBindBuffer(GL_ARRAY_BUFFER, 0)
[docs]
def _upload(self, pts_xy: np.ndarray):
glBindBuffer(GL_ARRAY_BUFFER, self._vbo)
glBufferData(GL_ARRAY_BUFFER, pts_xy.nbytes, pts_xy, GL_DYNAMIC_DRAW)
glBindBuffer(GL_ARRAY_BUFFER, 0)
[docs]
def _get_symbol_texture_id(self, symbol_path: str | None,
symbol_raster_size: int = 128) -> int | None:
if not symbol_path:
return None
path = Path(symbol_path)
if not path.exists():
logging.warning('CloudPointsShader2D: symbol texture not found: %s', path)
return None
raster_size = int(max(16, min(int(symbol_raster_size), 2048)))
try:
key = (str(path.resolve()), float(path.stat().st_mtime), raster_size)
except OSError:
key = (str(path), 0.0, raster_size)
if key in self._texture_cache:
return self._texture_cache[key]
if path.suffix.lower() == '.svg':
try:
cairosvg = importlib.import_module('cairosvg')
svg2png = cairosvg.svg2png
except Exception:
logging.warning(
'CloudPointsShader2D: SVG symbol requires cairosvg (missing for %s)',
path,
)
return None
png_bytes = svg2png(
url=str(path),
output_width=raster_size,
output_height=raster_size,
)
with Image.open(io.BytesIO(png_bytes)) as src:
image = src.convert('RGBA')
else:
with Image.open(path) as src:
image = src.convert('RGBA')
tex_id = glGenTextures(1)
glBindTexture(GL_TEXTURE_2D, tex_id)
glTexImage2D(
GL_TEXTURE_2D,
0,
GL_RGBA,
image.width,
image.height,
0,
GL_RGBA,
GL_UNSIGNED_BYTE,
image.tobytes(),
)
glTexParameteri(GL_TEXTURE_2D, GL_TEXTURE_MAG_FILTER, GL_LINEAR)
glTexParameteri(GL_TEXTURE_2D, GL_TEXTURE_MIN_FILTER, GL_LINEAR_MIPMAP_LINEAR)
glTexParameteri(GL_TEXTURE_2D, GL_TEXTURE_WRAP_S, GL_CLAMP_TO_EDGE)
glTexParameteri(GL_TEXTURE_2D, GL_TEXTURE_WRAP_T, GL_CLAMP_TO_EDGE)
glGenerateMipmap(GL_TEXTURE_2D)
glBindTexture(GL_TEXTURE_2D, 0)
self._texture_cache[key] = tex_id
return tex_id
[docs]
def draw_points(self, pts_xy: np.ndarray, mvp: np.ndarray,
point_size_px: float,
color: tuple[float, float, float, float],
style: int,
filled: bool,
symbol_path: str | None = None,
symbol_raster_size: int = 128,
symbol_rotation: float = 0.0,
symbol_scale: float = 1.0,
pts_transform: np.ndarray | None = None):
"""Draw cloud points with the shader pipeline.
Args:
symbol_rotation: Per-cloud rotation in **radians** (CCW, Option A).
Ignored when *pts_transform* is provided.
symbol_scale: Per-cloud scale factor (Option A).
Ignored when *pts_transform* is provided.
pts_transform: Optional ``(N, 2)`` float32 array with per-point
``[rotation_rad, scale]`` columns (Option B).
When provided, overrides *symbol_rotation* and
*symbol_scale* on a per-point basis and uses the
dedicated 4-float VAO layout.
"""
if pts_xy.size == 0:
return
self._init_program()
self._ensure_vao_vbo()
mvp = np.ascontiguousarray(mvp, dtype=np.float32)
pts_xy = np.ascontiguousarray(pts_xy, dtype=np.float32)
per_point = pts_transform is not None
if per_point:
n = int(pts_xy.shape[0])
tr = np.ascontiguousarray(pts_transform, dtype=np.float32)
if tr.shape != (n, 2):
raise ValueError(
f'pts_transform shape {tr.shape!r} must be ({n}, 2)'
)
self._upload(np.concatenate([pts_xy, tr], axis=1)) # (N, 4)
else:
self._upload(pts_xy)
depth_was_enabled = glIsEnabled(GL_DEPTH_TEST)
if depth_was_enabled:
glDisable(GL_DEPTH_TEST)
glEnable(GL_BLEND)
glBlendFunc(GL_SRC_ALPHA, GL_ONE_MINUS_SRC_ALPHA)
prog_point_size_was_enabled = glIsEnabled(GL_PROGRAM_POINT_SIZE)
if not prog_point_size_was_enabled:
glEnable(GL_PROGRAM_POINT_SIZE)
# Compatibility profiles may require POINT_SPRITE for valid gl_PointCoord.
point_sprite_was_enabled = False
try:
point_sprite_was_enabled = glIsEnabled(GL_POINT_SPRITE)
if not point_sprite_was_enabled:
glEnable(GL_POINT_SPRITE)
except Exception:
point_sprite_was_enabled = True
glUseProgram(self._program)
locs = self._locs
glUniformMatrix4fv(locs['mvp'], 1, GL_FALSE, mvp)
glUniform1f(locs['pointSize'], float(max(1.0, min(point_size_px, 256.0))))
glUniform4f(locs['pointColor'], *color)
glUniform1i(locs['pointStyle'], int(style))
glUniform1i(locs['pointFilled'], 1 if filled else 0)
glUniform1f(locs['symbolRotation'], float(symbol_rotation))
glUniform1f(locs['symbolScale'], float(max(0.01, symbol_scale)))
glUniform1i(locs['perPointTransform'], 1 if per_point else 0)
symbol_tex_id = (
self._get_symbol_texture_id(symbol_path, symbol_raster_size)
if int(style) == 4 else None
)
glUniform1i(locs['useSymbolTex'], 1 if symbol_tex_id is not None else 0)
glUniform1i(locs['symbolTex'], 0)
glActiveTexture(GL_TEXTURE0)
glBindTexture(GL_TEXTURE_2D, 0 if symbol_tex_id is None else symbol_tex_id)
active_vao = self._vao_pt if per_point else self._vao
glBindVertexArray(active_vao)
glDrawArrays(GL_POINTS, 0, int(pts_xy.shape[0]))
glBindVertexArray(0)
glBindTexture(GL_TEXTURE_2D, 0)
glUseProgram(0)
glDisable(GL_BLEND)
if not point_sprite_was_enabled:
glDisable(GL_POINT_SPRITE)
if not prog_point_size_was_enabled:
glDisable(GL_PROGRAM_POINT_SIZE)
if depth_was_enabled:
glEnable(GL_DEPTH_TEST)
[docs]
def destroy(self):
if self._texture_cache:
for tex_id in self._texture_cache.values():
try:
glDeleteTextures(1, [tex_id])
except Exception:
logging.debug('CloudPointsShader2D texture deletion failed', exc_info=True)
self._texture_cache = {}
if self._vao is not None:
try:
glDeleteVertexArrays(1, [self._vao])
except Exception:
logging.debug('CloudPointsShader2D VAO deletion failed', exc_info=True)
self._vao = None
if self._vao_pt is not None:
try:
glDeleteVertexArrays(1, [self._vao_pt])
except Exception:
logging.debug('CloudPointsShader2D VAO_pt deletion failed', exc_info=True)
self._vao_pt = None
if self._vbo is not None:
try:
glDeleteBuffers(1, [self._vbo])
except Exception:
logging.debug('CloudPointsShader2D VBO deletion failed', exc_info=True)
self._vbo = None
if self._program is not None:
try:
glDeleteProgram(self._program)
except Exception:
logging.debug('CloudPointsShader2D program deletion failed', exc_info=True)
self._program = None
self._locs = None
CloudPointsShader2D._instance = None