Source code for wolfhece.lazviewer.viewer.viewer

import subprocess
import struct
import socket
import numpy
import os
import inspect
import warnings

# _viewer_dir = os.path.join(os.path.dirname(inspect.getfile(inspect.currentframe())),'..//libs')
_viewer_dir = os.path.dirname(inspect.getfile(inspect.currentframe()))

if ~os.path.isabs(_viewer_dir):
    _viewer_dir = os.path.abspath(_viewer_dir)

__all__ = ['viewer']


[docs] class viewer: def __init__(self, *args, **kwargs): """ Opens a point cloud viewer Examples: Create 100 random points >>> xyz = pptk.rand(100, 3) Visualize the points >>> pptk.viewer(xyz) Visualize points shaded by height >>> pptk.viewer(xyz, xyz[:, 2]) Visualize points shaded by random RGB color >>> rgb = pptk.rand(100, 3) >>> pptk.viewer(xyz, rgb) """ # ensure positions is 3-column array of float32s positions = numpy.asarray(args[0], dtype=numpy.float32).reshape(-1, 3) attr = args[1:] color_map = kwargs.get('color_map', 'jet') scale = kwargs.get('scale', None) debug = kwargs.get('debug', False) # start up viewer in separate process s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind(('localhost', 0)) s.listen(0) self._process = subprocess.Popen( [os.path.join(_viewer_dir, 'viewer'), str(s.getsockname()[1])], stdout=subprocess.PIPE, stderr=(None if debug else subprocess.PIPE)) if debug: print ('Started viewer process: %s' \ % os.path.join(_viewer_dir, 'viewer')) x = s.accept() self._portNumber = struct.unpack('H', self._process.stdout.read(2))[0] # self._portNumber = struct.unpack('H',x[0].recv(2))[0] # upload points to viewer self.__load(positions) self.attributes(*attr) self.color_map(color_map, scale)
[docs] def close(self): """ Closes the point cloud viewer Examples: >>> v.close() """ self._process.kill() pass
[docs] def clear(self): """ Removes the current point cloud in the viewer Examples: >>> v.clear() """ # construct message msg = struct.pack('b', 2) # send message to viewer self.__send(msg)
[docs] def reset(self): """ Resets the viewer """ # construct message msg = struct.pack('b', 3) # send message to viewer self.__send(msg)
[docs] def set(self, **kwargs): """ Sets viewer property ================= =============== ================================= Property Name Value Type Description ================= =============== ================================= bg_color 4 x float32 Background color in RGBA [0, 1] bg_color_top 4 x float32 Top background color bg_color_bottom 4 x float32 Bottom background color color_map ? x 4 x float32 Color map; array of RGBA's [0, 1] color_map_scale 2 x float32 Color map scaling interval curr_attribute_id uint Current attribute set index floor_level float32 Floor z-level floor_color 4 x float32 Floor color in RGBA [0, 1] lookat 3 x float32 Camera look-at position phi float32 Camera azimuthal angle (radians) point_size float32 Point size in world space r float32 Camera distance to look-at point selected ? x uint Indices of selected points show_grid bool Show floor grid show_info bool Show information text overlay show_axis bool Show axis / look-at cursor theta float32 Camera elevation angle (radians) ================= =============== ================================= (phi, theta, r) are spherical coordinates specifying camera position relative to the look at position. (right, up, view) are orthogonal vectors forming the camera coordinate frame, where view is pointed away from the look at position, and view is the cross product of up with right Examples: >>> v = pptk.viewer(xyz) >>> v.set(point_size = 0.01) """ for prop, val in kwargs.items(): self.__send(_construct_set_msg(prop, val))
[docs] def get(self, prop_name): """ Gets viewer property ================ ============= ================================ Property Name Return Type Description ================ ============= ================================ curr_atribute_id uint Current attribute set index eye 3 x float64 Camera position lookat 3 x float64 Camera look-at position mvp 4 x 4 float64 num_points uint Number of points loaded num_attributes uint Number of attribute sets loaded phi float64 Camera azimuthal angle (radians) r float64 Camera distance to look-at point right 3 x float64 Camera Right vector selected ? x int32 Indices of selected points theta float64 Camera elevation angle (radians) up 3 x float64 Camera up vector view 3 x float64 Camera view vector ================ ============= ================================ Examples: >>> v = pptk.viewer(xyz) >>> v.get('selected') """ return self.__query(_construct_get_msg(prop_name))
[docs] def load(self, *args, **kwargs): positions = numpy.asarray(args[0], dtype=numpy.float32).reshape(-1, 3) attr = args[1:] color_map = kwargs.get('color_map', 'jet') scale = kwargs.get('scale', None) self.__load(positions) self.attributes(*attr) self.color_map(color_map, scale)
[docs] def attributes(self, *attr): """ Loads point attributes The loaded attributes are used to clor the currently loaded point cloud. Supposing n points loaded, this function accepts attributes of the following forms: * scalars: 1-d array of length 1 or n * RGB colors: 2-d array of shape (1, 3) or (n, 3) * RGBA colors: 2-d array of shape (1, 4) or (n, 4) Passing in no arguments clears all existing attribute sets and colors all points white. Cycle through attribute sets via '[' and ']' keys. Examples: >>> xyz = pptk.rand(100, 3) >>> v = pptk.viewer(xyz) >>> attr1 = pptk.rand(100) # 100 random scalars >>> attr2 = pptk.rand(100, 3) # 100 random RGB colors >>> attr3 = pptk.rand(100, 4) # 100 random RGBA colors >>> attr4 = pptk.rand(1, 1) # 1 random scalar >>> attr5 = pptk.rand(1, 3) # 1 random RGB color >>> attr6 = pptk.rand(1, 4) # 1 random RGBA color >>> v.attributes(attr1, attr2, attr3, attr4, attr6) """ msg = struct.pack('Q', len(attr)) error_msg = '%d-th attribute array inconsistent with number of points' for i, x in enumerate(attr): x = numpy.asarray(x, dtype=numpy.float32) # TODO:warn if attribute array contains NaN # array of scalars if len(x.shape) == 1: if x.shape[0] != self.get('num_points') and x.shape[0] != 1: raise ValueError(error_msg % i) msg += struct.pack('QQ', x.shape[0], 1) + x.tostring() # array of rgb or rgba elif len(x.shape) == 2 and (x.shape[-1] == 4 or x.shape[-1] == 3): if x.shape[0] != self.get('num_points') and x.shape[0] != 1: raise ValueError(error_msg % i) if x.shape[-1] == 3: x = numpy.c_[x, numpy.ones(x.shape[0], dtype=numpy.float32)] msg += struct.pack('QQ', * x.shape) + x.tostring() else: raise ValueError('%d-th ' % i + 'attribute array shape is not supported') msg = struct.pack('b', 10) + struct.pack('Q', len(msg)) + msg self.__send(msg)
[docs] def color_map(self, c, scale=None): """ Specifies how scalar attributes are used to color points in the viewer. Input c is expected to be an array of n RGB (or RGBA) vectors (i.e. c is a n x 3 or n x 4 numpy array). Upon return, scalar values equal to scale 0 are colored with c[0], scale[1] with c[-1], and scalars in between appropriately interpolated. If scale is None, scale is automatically set as the minimum and maximum scalar values in the current attribute set. Alternatively, one can choose from a number of preset color maps by passing the corresponding string instead. ================= ===================================== Preset color maps ================= ===================================== 'jet' (default) .. image:: images/colormap_jet.png 'hsv' .. image:: images/colormap_hsv.png 'hot' .. image:: images/colormap_hot.png 'cool' .. image:: images/colormap_cool.png 'spring' .. image:: images/colormap_spring.png 'summer' .. image:: images/colormap_summer.png 'autumn' .. image:: images/colormap_autumn.png 'winter' .. image:: images/colormap_winter.png 'gray' .. image:: images/colormap_gray.png ================= ===================================== Examples: >>> xyz = np.c_[np.arange(10), np.zeros(10), np.zeros(10)] >>> scalars = np.arange(10) >>> v = pptk.viewer(xyz, scalars) >>> v.color_map('cool', scale=[0, 5]) >>> v.color_map([[0, 0, 0], [1, 1, 1]]) """ # accepts array of rgb or rgba vectors if isinstance(c, str): c = _color_maps[c] elif isinstance(c, list): c = numpy.array(c) if len(c.shape) != 2 or c.shape[1] != 3 and c.shape[1] != 4: raise ValueError('Expecting array of rgb/rgba vectors') if c.shape[1] == 3: c = numpy.c_[c, numpy.ones(c.shape[0])] self.set(color_map=c) if scale is None: self.set(color_map_scale=[0, 0]) else: self.set(color_map_scale=scale)
[docs] def capture(self, filename): """ Take screen shot of current view and save to filename Examples: >>> v = pptk.viewer(xyz) >>> v.capture('screenshot.png') """ msg = struct.pack('b', 6) + _pack_string(os.path.abspath(filename)) self.__send(msg)
[docs] def play(self, poses, ts=[], tlim=[-numpy.inf, numpy.inf], repeat=False, interp='cubic_natural'): """ Plays back camera path animation specified by poses Args: poses: Key poses. e.g. a list of 6-tuples (x, y, z, phi, theta, r) poses, or anything convertible to a 6-column array by np.array ts (optional): Key pose times. If unspecified key poses are placed at 1 second intervals. tlim (optional): Play back time range (in seconds) repeat (optional): Toggles infinite play back loop. Works well with interp='cubic_periodic'. interp (optional): Interpolation method. Should be one of 'constant', 'linear', 'cubic_natural', or 'cubic_periodic'. Examples: Rotate camera about origin at 1/8 Hz. >>> poses = [] >>> poses.append([0, 0, 0, 0 * np.pi/2, np.pi/4, 5]) >>> poses.append([0, 0, 0, 1 * np.pi/2, np.pi/4, 5]) >>> poses.append([0, 0, 0, 2 * np.pi/2, np.pi/4, 5]) >>> poses.append([0, 0, 0, 3 * np.pi/2, np.pi/4, 5]) >>> poses.append([0, 0, 0, 4 * np.pi/2, np.pi/4, 5]) >>> v.play(poses, 2 * np.arange(5), repeat=True, interp='linear') """ poses, ts = _fix_poses_ts_input(poses, ts) if poses.size == 0: return msg = struct.pack('b', 8) \ + struct.pack('i', poses.shape[0]) + poses.tostring() \ + struct.pack('i', ts.size) + ts.tostring() \ + struct.pack('b', _interp_code[interp]) self.__send(msg) msg = struct.pack('b', 9) \ + struct.pack('2f', *tlim) \ + struct.pack('?', repeat) self.__send(msg)
[docs] def record(self, folder, poses, ts=[], tlim=[-numpy.inf, numpy.inf], interp='cubic_natural', shutter_speed=numpy.inf, fps=24, prefix='frame_', ext='png'): """ Records camera animation to a sequence of images. Usage of this method is very similar to viewer.play(...). Args: folder: Folder to which images are saved poses: Same as in :meth:`pptk.viewer.play` ts: Same as in :meth:`pptk.viewer.play` tlim: Same as in :meth:`pptk.viewer.play` interp: Same as in :meth:`pptk.viewer.play` fps: Frames per second prefix: Resulting image file names are prefixed with this string ext: Image format Examples: Assuming poses defined as in the example for :meth:pptk.viewer.play >>> mkdir 'recording' >>> v.record('recording', poses) Tip: Uses ffmpeg to generate a video from the resulting image sequence >>> ffmpeg -i "frame_%03d.png" -c:v mpeg4 -qscale:v 0 -r 24 output.mp4 """ if not os.path.isdir(folder): raise ValueError('invalid folder provided') poses, ts = _fix_poses_ts_input(poses, ts) if poses.size == 0: return # load camera path msg = struct.pack('b', 8) + \ struct.pack('i', poses.shape[0])+poses.tostring() + \ struct.pack('i', ts.size)+ts.tostring() + \ struct.pack('b', _interp_code[interp]) self.__send(msg) # clamp tlim[0] and tlim[1] to [ts[0],ts[-1]] t_beg = numpy.minimum(numpy.maximum(ts[0], tlim[0]), ts[-1]) t_end = numpy.minimum(numpy.maximum(ts[0], tlim[1]), ts[-1]) # ensure t_beg <= t_end t_end = numpy.maximum(t_end, t_beg) # pose and capture num_frames = 1 + numpy.floor((t_end - t_beg) * fps) num_digits = 1 + numpy.floor(numpy.log10(num_frames)) for i in range(int(num_frames)): t = i * 1.0 / fps + t_beg msg = struct.pack('b', 9) + \ struct.pack('2f', t, t) + \ struct.pack('?', False) self.__send(msg) filename = prefix \ + ('%0' + str(num_digits) + 'd') % (i + 1) + '.' + ext filename = os.path.join(folder, filename) self.capture(filename)
# todo: need to check whether write succeeded # ideally, capture(...) should return filename
[docs] def wait(self): """ Blocks until Enter/Return key is pressed in viewer Examples: >>> v = pptk.viewer(xyz) >>> v.wait() Press enter in viewer to return control to python terminal. """ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect(('localhost', self._portNumber)) s.send(struct.pack('b', 7)) s.setblocking(1) buf = b'' while len(buf) == 0: buf += s.recv(1) if buf != b'x': raise RuntimeError('expecting return code \'x\'') s.close()
[docs] def __load(self, positions): # if no points, then done if positions.size == 0: return # construct message numPoints = int(positions.size / 3) msg = struct.pack('b', 1) \ + struct.pack('i', numPoints) + positions.tostring() # send message to viewer self.__send(msg)
[docs] def __send(self, msg): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect(('localhost', self._portNumber)) totalSent = 0 while totalSent < len(msg): sent = s.send(msg) if sent == 0: raise RuntimeError("socket connection broken") totalSent = totalSent + sent s.close()
[docs] def __query(self, msg): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # s.setsockopt(socket.SOL_SOCKET,socket.TCP_NODELAY,1) s.connect(('localhost', self._portNumber)) totalSent = 0 while totalSent < len(msg): sent = s.send(msg) if sent == 0: raise RuntimeError("socket connection broken") totalSent = totalSent + sent # layout of response message: # 0: data type (0 - error msg, 1 - char, 2 - float, 3 - int, 4 - uint) # 1: number of dimensions (quint64) # 9: dimensions (quint64) # ?: body lookupSize = {0: 1, 1: 1, 2: 4, 3: 4, 4: 4} dataType = ord(s.recv(1)) numDims = struct.unpack('Q', _recv_from_socket(8, s))[0] dims = struct.unpack(str(numDims) + 'Q', _recv_from_socket(numDims * 8, s)) numElts = numpy.prod(dims) bodySize = lookupSize[dataType] * numElts body = _recv_from_socket(bodySize, s) s.close() if dataType == 0: raise ValueError(body) if dataType != 0 and dataType != 1: lookupCode = {1: 'c', 2: 'f', 3: 'i', 4: 'I'} body = struct.unpack(str(numElts) + lookupCode[dataType], body) body = numpy.array(list(body)).reshape(dims) # return body as is if type is char (0) return body
def _recv_from_socket(n, s): # receive n bytes from socket s buf = b'' while len(buf) < n: buf += s.recv(n - len(buf)) return buf def _fix_poses_ts_input(poses, ts): # ensure poses is 6 column array of floats poses = numpy.float32(numpy.array(poses).reshape(-1, 6)).copy() # ensure ts has the same number of timestamps as poses ts = numpy.float32(numpy.array(ts)) if ts.size == 0: ts = numpy.float32(numpy.arange(poses.shape[0])) elif ts.size != poses.shape[0]: raise ValueError('number of time stamps != number of key poses') # ensure ts is unique and ascending if numpy.any(numpy.diff(ts) <= 0): raise ValueError('time stamps must be unique and ascending') # ensure subsequent angle differences between -180 and +180 degrees def correct_angles(x): # note: mapping takes +180 + 360k to +180, # and -180 + 360k to -180, for any integer k d = numpy.diff(x) absd = numpy.abs(d) y = -absd - 2.0 * numpy.pi \ * numpy.floor((-absd + numpy.pi) / 2.0 / numpy.pi) y *= -numpy.sign(d) return x[0] + numpy.r_[0, numpy.cumsum(y)] poses[:, 3] = correct_angles(poses[:, 3]) poses[:, 4] = correct_angles(poses[:, 4]) return (poses, ts) def _encode_bool(x): try: y = struct.pack('?', x) except Exception: raise return y def _encode_float(x): try: y = struct.pack('f', x) except Exception: raise return y def _encode_floats(x): return numpy.asarray(x, dtype=numpy.float32).tostring() def _encode_uints(x): return numpy.asarray(numpy.uint32(x)).tostring() def _encode_uint(x): try: y = struct.pack('I', x) except Exception: raise return y def _encode_rgb(x): x = numpy.asarray(numpy.float32(x)) if x.size != 3 or numpy.any(numpy.logical_or(x < 0.0, x > 1.0)): raise ValueError('Expecting 3 values in [0,1]') return struct.pack('fff', x[0], x[1], x[2]) def _encode_rgba(x): x = numpy.asarray(numpy.float32(x)) if x.size != 4 or numpy.any(numpy.logical_or(x < 0.0, x > 1.0)): raise ValueError('Expecting 4 values in [0,1]') return struct.pack('ffff', x[0], x[1], x[2], x[3]) def _encode_rgbas(x): x = numpy.asarray(numpy.float32(x)) if x.shape[1] == 4 and numpy.all(numpy.logical_and(x >= 0.0, x <= 1.0)): return x.tostring() else: raise ValueError('Expecting 4 column array of values in [0,1]') def _encode_xyz(x): x = numpy.asarray(numpy.float32(x)) if x.size != 3: raise ValueError('Expecting 3 values') return struct.pack('fff', x[0], x[1], x[2]) def _init_properties(): _properties['point_size'] = _encode_float _properties['bg_color'] = _encode_rgba _properties['bg_color_top'] = _encode_rgba _properties['bg_color_bottom'] = _encode_rgba _properties['show_grid'] = _encode_bool _properties['show_info'] = _encode_bool _properties['show_axis'] = _encode_bool _properties['floor_level'] = _encode_float _properties['floor_color'] = _encode_rgba _properties['floor_grid_color'] = _encode_rgba _properties['lookat'] = _encode_xyz _properties['phi'] = _encode_float _properties['theta'] = _encode_float _properties['r'] = _encode_float _properties['selected'] = _encode_uints _properties['color_map'] = _encode_rgbas _properties['color_map_scale'] = _encode_floats _properties['curr_attribute_id'] = _encode_uint def _construct_get_msg(prop_name): return struct.pack('b', 5) + _pack_string(prop_name) def _construct_set_msg(prop_name, prop_value): if not _properties.get(prop_name): raise ValueError('Invalid property name encountered: %s' % prop_name) msg_header = struct.pack('b', 4) + _pack_string(prop_name) msg_payload = '' try: msg_payload = _properties[prop_name](prop_value) except BaseException as e: raise ValueError('Failed setting "%s": ' % prop_name + str(e)) return msg_header + struct.pack('Q', len(msg_payload)) + msg_payload def _pack_string(string): return struct.pack('Q', len(string)) + \ struct.pack(str(len(string)) + 's', string.encode('ascii')) def _init_color_maps(): _color_maps['jet'] = numpy.array( [[0, 0, 1], [0, 1, 1], [0, 1, 0], [1, 1, 0], [1, 0, 0]], dtype=numpy.float32) _color_maps['hsv'] = numpy.array( [[1, 0, 0], [0, 1, 0], [0, 0, 1], [1, 0, 0]], dtype=numpy.float32) _color_maps['hot'] = numpy.array( [[0, 0, 0], [1, 0, 0], [1, 1, 0], [1, 1, 1]], dtype=numpy.float32) _color_maps['cool'] = numpy.array( [[0, 1, 1], [1, 0, 1]], dtype=numpy.float32) _color_maps['spring'] = numpy.array( [[1, 0, 1], [1, 1, 0]], dtype=numpy.float32) _color_maps['summer'] = numpy.array( [[0, .5, .4], [1, 1, .4]], dtype=numpy.float32) _color_maps['autumn'] = numpy.array( [[1, 0, 0], [1, 1, 0]], dtype=numpy.float32) _color_maps['winter'] = numpy.array( [[0, 0, 1], [0, 1, .5]], dtype=numpy.float32) _color_maps['gray'] = numpy.array( [[0, 0, 0], [1, 1, 1]], dtype=numpy.float32) _properties = dict() _init_properties() _color_maps = dict() _init_color_maps() # define codes for each interpolation scheme _interp_code = {'constant': 0, 'linear': 1, 'cubic_natural': 2, 'cubic_periodic': 3}