Source code for wolfgpu.tile_packer

"""
Author: HECE - University of Liege, Stéphane Champailler, Pierre Archambeau
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

from enum import Enum
import logging
from math import sqrt, ceil
import numpy as np
from numba import jit
[docs] class TilePackingMode(Enum):
[docs] REGULAR = 1
#SHUFFLED = 2
[docs] TRANSPARENT = 3 # For debugging: set it to disable the tile packing.
def __str__(self): if self == TilePackingMode.REGULAR: return "Regular" elif self == TilePackingMode.TRANSPARENT: return "Transparent" else: return str(self)
# nopython: A Numba compilation mode that generates code that does not access # the Python C API. This compilation mode produces the highest performance code, # but requires that the native types of all values in the function can be # inferred. Will raise exception if it can't compile aith these assumptions. @jit(nopython=True)
[docs] def _unpack_and_deshuffle_array(a: np.ndarray, shape:tuple, height:int, width:int, active_tiles:np.ndarray, tile_size:int, tile_indirection_map:np.ndarray) -> np.ndarray: r = np.zeros(shape, dtype=a.dtype) for j, i in zip(active_tiles[0], active_tiles[1]): decalw = max(0, (i+1)*tile_size - width) tw = tile_size - decalw decalh = max(0, (j+1)*tile_size - height) th = tile_size - decalh dest_slice_j = slice(j*tile_size, j*tile_size + th) dest_slice_i = slice(i*tile_size, i*tile_size + tw) tc = tile_indirection_map[j,i,:] source_slice_i = slice(tc[0], tc[0]+tw) source_slice_j = slice(tc[1], tc[1]+th) r[dest_slice_j, dest_slice_i, ...] = a[source_slice_j, source_slice_i, ...] return r
[docs] class TilePacker: def __init__(self, nap: np.array, tile_size: int, mode: TilePackingMode = TilePackingMode.REGULAR): """ Initialize the tile indirection system. The motivation is that usually, due to the shape of our rivers, the computation domain (a matrix) is mostly empty. Therefore we don't use the memory very wisely if we keep a plain rectangular domain to store a riverbed. What we do is: - cut the computation domain in tiles. A tile is either active or inactive. An active tile is one that contains meshes which are marke active in the NAP map. - translate all the active tiles so that they are close together ("packing") Of course, once the tiles are packed, the whole hydro computation must still continue to work. So each time the hydro computation looks for a mesh at (i,j), we must translate those coordinates to the "packed" domain. So in this function we also compute an "indirection" map that tells us where a (t_i, t_j) tile falls in the packed domain. :param nap: the NAP array - 1 is active, 0 is inactive. :param tile_size: the size of a tile (tiles are squared). :param mode: The tile packing mode (default: REGULAR) """ assert tile_size >= 1
[docs] self._tile_size = tile_size
[docs] self._mode = mode
self._height, self._width = nap.shape # Save the NAP for reuse later on (in our case, saving it to disk in the ResultStore) # ATTENTION don't build upon it because it eats a lot of memory. It's just here # as a cheap placeholder. # FIXME Later on one should store it directly from the sim to the disk
[docs] self._original_nap = nap
# 0 means inactive mesh in the NAP array. nap = self._pad_array_to_tiles(nap, 0) nb_tiles_x, nb_tiles_y = self.size_in_tiles() # Here we cut the original NAP array into tiles. # (I(ve tried to do it with reshape, but it seems to not being able to do the # job. E.g. cut a 4x4 array in 2x2 tiles: np.arange(16).reshape(4,4).T.reshape(2,2,2,2) fail...) # In the end we get a (nb tiles x, nb tiles y) array of which # each element is a (_tile_size, _tile_size) tile array. # To understand the stride trick, do this: # - read the shape parameter as the shape you *want*. # - read the strides as strides over the flattened array. The first # two strides are to choose the tile. The other two strides are # to enumerate the elements inside a tile. TS = self._tile_size # # ATTENTION stride_tricks comes with tons of gotcha's ! # # array item size (in bytes) # ais = nap.dtype.itemsize # tiled = np.lib.stride_tricks.as_strided( # np.ascontiguousarray(nap), # This one is really necessary (see warning in side_tricks documentation) # shape=((nap.shape[0]//TS,nap.shape[1]//TS,TS,TS)), # strides=(nap.shape[1]*TS*ais, TS*ais, # choose tile # nap.shape[1]*ais, 1*ais), # enumerate meshes inside tile # writeable=False) # # Determine if a tile has some active meshes inside it # # We do that by summing the NAP values inside each tile. # tiles_sums = np.sum(tiled, axis=(2,3)) # Replacing the above lines by more numpy-ish code. tiles_sums = np.sum(nap.reshape(nap.shape[0]//TS,TS,nap.shape[1]//TS,TS).swapaxes(1,2), axis=(2,3)) # At this point, tiles_sums[i,j] > 0 if the corresponding tile has some # non zero values inside it. Now we transform that information into # an array of zeros and ones (kind of a normalisation if you will). # FIXME I think it's useless, the code after this will trash these data. active_tiles = np.zeros_like(tiles_sums, np.uint32) assert active_tiles.shape == (nb_tiles_y,nb_tiles_x), f"{active_tiles.shape} ?" active_tiles[tiles_sums > 0] = 1 # Numbering active cells (0 == inactive, >= 1 == active) # Note that we have an edge case. # This case occurs when we compute meshes located on the border of a # tile of which the neighbouring border tile (call it n_t) is not # active. When we compute that mesh, we usually have a look at its # neighbours. In this edge case, one of the neighbours in question (call # it n_m) will be inside the neighbouring tile (n_t) which is inactive. # Since this tile is inactive, it will not be part of the "packed tiles" # and thus has no representation whatsoever. Therefore, the values # h,qx,qy,bathy,... of n_m will be unknown. # On a regular (non packed) computation domain it's not a problem # because we put default/neutral values (such as water height == 0) in # inactive meshes. So when we look outside the domain, we get these safe # values. # Once tiles are packed, as we have seen, we may reach out to a not # existing, or worse, random tile. To avoid that, we choose to have an # "emtpy" tile and number it zero. Any lookup outside the regular active # tiles will fall on the empty tile. Adding a tile imply that if we have # N active tiles in the computation domain we'll store N+1 tiles. So we # have to make enough room to allow that.
[docs] self._active_tiles_ndx = np.nonzero(active_tiles) # Only indices (don't multiply by tile_size)
[docs] self._nb_active_tiles = len(self._active_tiles_ndx[0])
# Active tiles is the map of active tiles (non packed) # We update the active tile markers by replacing it with a # tile number. # So [ 1 1 ] becomes [ 1 2 ] # [ 0 1 ] [ 0 3 ] active_tiles[self._active_tiles_ndx] = np.arange(1, self._nb_active_tiles+1) #+1 for the "empty" tile # Now transforming the numbers of the numbered cells in coordinates # Indir will map a tile index (ty,tx) into its indirected x # (indir[ty,tx,0]) and y (indir[ty,tx,1]) coordinates # More exactly, a tile index (ty,tx) will be mapped to # the bottom/left corner of the corresponding tile in the # virtual domain. indir = np.zeros( (nb_tiles_y,nb_tiles_x,2), dtype=np.uint16) # The set of active tiles will be "reorganized" as a more or less square # shape where they're all packed. # We chose that shape because it's easy to build. used_tiles = self._nb_active_tiles + 1
[docs] self._packed_nb_tiles_x = int(sqrt(used_tiles))
[docs] self._packed_nb_tiles_y = int(ceil(used_tiles / self._packed_nb_tiles_x))
indir[:,:,0] = active_tiles % self._packed_nb_tiles_x indir[:,:,1] = active_tiles // self._packed_nb_tiles_x # from tests.arrayview import array_view # array_view(indir[:, :, 0]) # from matplotlib import pyplot as plt # plt.imshow(indir[:, :, 1], origin="lower") # plt.show() # Convert from tile index to mesh position (this spares a multiplication # in the shader code) # FIXME We have uint16 for these, so the maximum dimension of our domain # will be 65536 !!! assert nb_tiles_x * self._tile_size < 65536, "Using uint16, we're limited to a domain width of 65536 meshes" assert nb_tiles_y * self._tile_size < 65536, "Using uint16, we're limited to a domain height of 65536 meshes" #print(indir) indir *= self._tile_size # When shuffling arrays, it should be faster to read directly # the active cells. if self._mode == TilePackingMode.TRANSPARENT: # For debugging purpose: create some "ineffective" indirection maps. # This is basically a transparent map nb_tiles_x, nb_tiles_y = self.size_in_tiles() self._packed_nb_tiles_x, self._packed_nb_tiles_y = nb_tiles_x, nb_tiles_y xs = np.repeat( np.atleast_2d( np.arange(nb_tiles_x)), nb_tiles_y, axis=0) ys = np.repeat( np.atleast_2d( np.arange(nb_tiles_y)), nb_tiles_x, axis=0).T indir = np.zeros( (nb_tiles_y,nb_tiles_x,2), dtype=np.uint16) indir[:,:,0] = xs indir[:,:,1] = ys indir *= self._tile_size self._active_tiles_ndx = None self._nb_active_tiles = None #indir = np.roll(indir, shift=nbx//4, axis = 1) #indir = np.roll(indir, shift=nby//4, axis = 0) self._packed_to_domain_indir = np.ascontiguousarray(indir) #print(indir)
[docs] self._tile_indirection_map = indir
# from matplotlib import pyplot as plt # fig, axs = plt.subplots(1, 2) # axs[0].imshow(indir[:,:,0] // 16, origin="lower") # axs[1].imshow(indir[:, :, 1] // 16, origin="lower") # plt.show() if self._active_tiles_ndx is not None: # Build the reverse mapping: from a packed tile 2D-index to a domain # tile 2D-index. # Indices of active tiles, shape is (N,2). We transform it to the # more-or-less square of the packed tiles. We also swap # self._active_tiles_ndx coordinates to get back x,y instead of y,x. crds = np.vstack([self._active_tiles_ndx[1], self._active_tiles_ndx[0]]).transpose() # This value (set in both coordinates) represents a tile that is # a padding one. PADDING_COORDINATE_MARKER = 2**16 - 1 # crds is matrix of N rows (N == # of active tiles) and 2 columns (i/j coordinates # of each actile tiles. crds = np.pad( crds, [ # We expand the tiles. # - We add one column in front of the coordinate. That # column represents the dummy/padding tile. As this is the # first tile, it means the (0,0) tile will be mapped to # a padding tile too. # - We add several columns at the end of the coordinates. # These columns will make sure we end up with a total # number of tiles that cover the whole packed domain (1, self._packed_nb_tiles_x * self._packed_nb_tiles_y - crds.shape[0] - 1), # We leave the coordinates as they are. (0, 0), ], constant_values=PADDING_COORDINATE_MARKER, # Will be mutliplied by tile size! ) self._packed_to_domain_indir = np.ascontiguousarray(crds.reshape( (self._packed_nb_tiles_y, self._packed_nb_tiles_x, 2) ).astype(np.uint16)) # from matplotlib import pyplot as plt # plt.imshow(self._packed_to_domain_indir[:, :, 0], origin="lower") # plt.show() # print(self._packed_to_domain_indir[-1, :, 0].tolist()) # from tests.arrayview import array_view # array_view(self._packed_to_domain_indir[:, :, 0]) for i in range(self._packed_nb_tiles_x): for j in range(self._packed_nb_tiles_y): p = self._packed_to_domain_indir[j,i] if p[0] != PADDING_COORDINATE_MARKER: ind = self._tile_indirection_map[p[1], p[0]] assert ind[0] / self._tile_size == i assert ind[1] / self._tile_size == j, f"{ind[1]} / {self._tile_size} != {j} ?" #print(self._packed_to_domain_indir) # self._packed_to_domain_indir *= self._tile_size # Spare a multiplication in the sahder # assert ( # self._packed_to_domain_indir.shape == self._tile_indirection_map.shape # ), f"{self._packed_to_domain_indir.shape} != {self._tile_indirection_map.shape}, packed domain is {self._packed_nb_tiles_x} w, {self._packed_nb_tiles_y} h tiles"
[docs] def tile_reversed_indirection_map(self): return self._packed_to_domain_indir
[docs] def tile_indirection_map(self): """ The tile indirection map. Its shape is (nb_tiles_y,nb_tiles_x,2) (so it's defined in term of the original, non-tiled domain). The z-axis contains the indirected coordinates of the bottom-left corner of the tile denoted by the x and y axis values. For example, if you have the tile coordinates (t_i, t_j), then map[t_i, t_j, :] is a 2-tuple containing the coordinates (in meshes) of the bottom-left corner of that tile, on the tiled, packed domain. Note that the (i,j) is (horizontal, vertical) coordinates but the indirection matrix is (vertical horizontal). FIXME That's completely awkward. I should keep (j,i) as (vert, horiz) """ return self._tile_indirection_map
[docs] def mode(self) -> TilePackingMode: return self._mode
[docs] def packed_size(self): """ Size of the arrays after padding them and packing them in tiles, expressed in meshes. Size is a (width, height) tuple. Note that this size can be very different than the actual computation domain size (because tiles are heavily reorganized). """ if self._mode != TilePackingMode.TRANSPARENT: return (self._packed_nb_tiles_x * self._tile_size, self._packed_nb_tiles_y * self._tile_size) else: return (self._width, self._height)
[docs] def packed_size_in_tiles(self): """ Size of the arrays after padding them and packing them in tiles, expressed in tiles. Size is a (width, height) tuple. Note that this size can be very different than the actual computation domain size. """ return (self._packed_nb_tiles_x, self._packed_nb_tiles_y)
[docs] def size_in_tiles(self): """ Size of the (original, non packed, non tiled) computation domain, in tiles. Note that we count full tiles. So if one dimension of the domain is not a multiple of the tile size, then we round upwards. Size is a (width, height) tuple. """ return ((self._width +self._tile_size-1) // self._tile_size, (self._height+self._tile_size-1) // self._tile_size)
[docs] def tile_size(self) -> int: """ The tile size. Note that tiles are squared. """ return self._tile_size
# def active_tiles_ndx(self): # return self._active_tiles_ndx
[docs] def unpack_array(self, a: np.ndarray) -> np.ndarray: """ De-shuffle and un-pad an array of tiles that was shuffled and padded. """ psw, psh = self.packed_size() assert a.shape[0] == psh and a.shape[1] == psw, \ f"Seems like the array you gave is not shuffled/padded. Its shape is {a.shape}. " \ f"I was expecting something with a shape like ({psh},{psw},...)" if self._mode == TilePackingMode.TRANSPARENT: return a s = tuple([self._height, self._width] + list(a.shape[2:])) #print(_unpack_and_deshuffle_array) # double check if numba actually compiled return _unpack_and_deshuffle_array(a, s, self._height, self._width, self._active_tiles_ndx, self._tile_size, self._tile_indirection_map)
[docs] def _unpad_array(self, a: np.array) -> np.array: """ Undo `_pad_array_to_tiles`. """ ntx, nty = self.size_in_tiles() assert a.shape[0] == nty*self._tile_size, "Seems like the array you gave is not padded" assert a.shape[1] == ntx*self._tile_size, "Seems like the array you gave is not padded" return a[0:self._height, 0:self._width]
[docs] def _pad_array_to_tiles(self, a: np.array, neutral_values) -> np.array: """ Make an array fit in a given number of tiles (on x and y axis). After this, the array's dimensions are multiple of the tile_size. :param neutral_values: The value used to pad. """ assert a.shape[0] == self._height, "The array seems to have nothing to do with the computation domain which I'm representing" assert a.shape[1] == self._width, "The array seems to have nothing to do with the computation domain which I'm representing" ntx, nty = self.size_in_tiles() mesh_to_add_on_y = nty*self._tile_size - a.shape[0] mesh_to_add_on_x = ntx*self._tile_size - a.shape[1] assert mesh_to_add_on_y >= 0, "Your array doesn't fit the computation domain vertically" assert mesh_to_add_on_x >= 0, "Your array doesn't fit the computation domain horizontally" if len(a.shape) == 3: return np.pad(a, [(0,mesh_to_add_on_y), (0,mesh_to_add_on_x), (0,0)], constant_values=neutral_values) elif len(a.shape) == 2: return np.pad(a, [(0,mesh_to_add_on_y), (0,mesh_to_add_on_x)], constant_values=neutral_values) else: raise Exception(f"Array shape {a.shape} is not not supported")
[docs] def pack_array(self, a: np.array, neutral_values = None, debug=False) -> np.array: """ Reorganize an array by moving tiles around to follow the ordering given by `self._tile_indirection_map` The array is resized in order to be just as large as needed to hold the active tiles plus the "empty" tile. Note: the packed array has the same dtype as :param a: `neutral_values`: value to fill the empty tile with. """ # Padding to avoid tricky computations over "incomplete" tiles. a = self._pad_array_to_tiles(a, neutral_values or 0) if self._mode == TilePackingMode.TRANSPARENT: return a logging.debug(f"Packing {a.shape}") packed_shape = list(a.shape) # Preserve the third dimension, if any. packed_shape[0] = self._packed_nb_tiles_y * self._tile_size packed_shape[1] = self._packed_nb_tiles_x * self._tile_size # Clearing non used tiles because they're acutally use while computing # max step size ('cos that computation doesn't use the indirection mechanism) # FIXME We clear too much, only the last row of tiles and the "neutral" tile # should be cleared. # The array containing the active tiles, packed. if neutral_values is None: packed_tiles = np.zeros(tuple(packed_shape), dtype=a.dtype) else: packed_tiles = np.empty(tuple(packed_shape), dtype=a.dtype) packed_tiles[:,:,...] = neutral_values # There's the 0-th tile which is meant to be neutral. So we clear it # because 0 is mostly neutral (it depends on what `a` (the input array) # represents. If it's h,qx,qy then it's neutral, but for bathymetry it # may be different. # FIXME For the moment, I believe that if h,qx,qy then, the mesh is # neutral, regardless of the other params such as bathymetry. if neutral_values is None: packed_tiles[0:self._tile_size, 0:self._tile_size, ...] = 0 else: packed_tiles[0:self._tile_size, 0:self._tile_size, ...] = neutral_values # Go over all NAP-active tiles and pack each of them. for tile_coord in zip(self._active_tiles_ndx[0], self._active_tiles_ndx[1]): j,i = tile_coord source_i = slice(i*self._tile_size, (i+1)*self._tile_size) source_j = slice(j*self._tile_size, (j+1)*self._tile_size) # Remember that the active tiles are numbered 1-based. The 0-th tile # is the "neutral value" tile (used to represent out of domain, neutral data) tc = self._tile_indirection_map[j,i,:] assert tc[0] < self.packed_size()[0] assert tc[1] < self.packed_size()[1] dest_i = slice(tc[0], tc[0]+self._tile_size) dest_j = slice(tc[1], tc[1]+self._tile_size) # if debug and tc[1] >= self.packed_size()[1]-30: # print(a.dtype) # print(packed_tiles.dtype) # print(a[source_j, source_i, ...]) # print(tc) # print(dest_i) # print(dest_j) #logging.trace(f"{a.shape} -> {packed_shape}: {dest_i}, {dest_j}") packed_tiles[dest_j, dest_i, ...] = a[source_j, source_i, ...] # if debug: # from matplotlib import pyplot as plt # fig, axs = plt.subplots(1, 2) # axs[0].imshow(a, origin="lower") # axs[1].imshow(packed_tiles, origin="lower") # plt.show() return packed_tiles
[docs] def unpack_subarray_xy_view(self, packed_array, x1, y1, x2, y2): """ Extract a rectangular subarray from a packed tile array and return it as a regular array. :param packed_array: The packed array from which to extract the subarray. :param x1: The left (inclusive) coordinate of the region to extract (in user/original domain coordinates). :param y1: The top (inclusive) coordinate of the region to extract (in user/original domain coordinates). :param x2: The right (exclusive) coordinate of the region to extract (in user/original domain coordinates). :param y2: The bottom (exclusive) coordinate of the region to extract (in user/original domain coordinates). :return: The unpacked subarray corresponding to the requested region. :rtype: numpy.ndarray """ # Our goal here is to extract the rectangle defined by x1/x2,y1/y2 # coordinates (user coordinates) from the packed array of tiles (which # is in packed coordinates). # Extend to get full tiles and get the coordinates expressed in tiles. ts = self._tile_size x1t = x1 // ts x2t = (x2 + ts - 1) // ts y1t = y1 // ts y2t = (y2 + ts - 1) // ts fragment = np.empty(((y2t-y1t)*ts, (x2t-x1t)*ts), dtype=packed_array.dtype) # print(f"fragment: {fragment.shape}") # print(f"packed: {packed_array.shape}") indir = self.tile_indirection_map() # print(indir[:,:,0]/ts) # print(indir[:,:,1]/ts) # Coordinates of the tiles in the tile-packed array (tile-packed coordinates) for y in range(y1t, y2t): for x in range(x1t, x2t): left = indir[y,x,0] bottom = indir[y,x,1] tile = packed_array[bottom:bottom+ts, left:left+ts] fragment[(y-y1t)*ts:(y-y1t+1)*ts, (x-x1t)*ts:(x-x1t+1)*ts] = tile # Now we must undo the padding we did to fit the tiles size. def unpad_slice(c1, c2): return slice(c1 % ts, c1 % ts + c2 - c1) yslice = unpad_slice(y1, y2) xslice = unpad_slice(x1, x2) return fragment[yslice, xslice]
[docs] def unpack_subarray_matrix_view(self, packed_array, i1, j1, i2, j2): """ Extract a rectangular subarray from a packed tile array and return it as a regular array. :param packed_array: The packed array from which to extract the subarray. :param i1: The left (inclusive) coordinate of the region to extract (in matrix convention). :param j1: The top (inclusive) coordinate of the region to extract (in matrix convention). :param i2: The right (exclusive) coordinate of the region to extract (in matrix convention). :param j2: The bottom (exclusive) coordinate of the region to extract (in matrix convention). :return: The unpacked subarray corresponding to the requested region. :rtype: numpy.ndarray """ # Our goal here is to extract the rectangle defined by x1/x2,y1/y2 # coordinates (user coordinates) from the packed array of tiles (which # is in packed coordinates). x1, y1, x2, y2 = j1, i1, j2, i2 # Extend to get full tiles and get the coordinates expressed in tiles. ts = self._tile_size x1t = x1 // ts x2t = (x2 + ts - 1) // ts y1t = y1 // ts y2t = (y2 + ts - 1) // ts fragment = np.empty(((y2t-y1t)*ts, (x2t-x1t)*ts), dtype=packed_array.dtype) # print(f"fragment: {fragment.shape}") # print(f"packed: {packed_array.shape}") indir = self.tile_indirection_map() # print(indir[:,:,0]/ts) # print(indir[:,:,1]/ts) # Coordinates of the tiles in the tile-packed array (tile-packed coordinates) for y in range(y1t, y2t): for x in range(x1t, x2t): left = indir[y,x,0] bottom = indir[y,x,1] tile = packed_array[bottom:bottom+ts, left:left+ts] fragment[(y-y1t)*ts:(y-y1t+1)*ts, (x-x1t)*ts:(x-x1t+1)*ts] = tile # Now we must undo the padding we did to fit the tiles size. def unpad_slice(c1, c2): return slice(c1 % ts, c1 % ts + c2 - c1) yslice = unpad_slice(y1, y2) xslice = unpad_slice(x1, x2) return fragment[yslice, xslice]
[docs] def unpack_subarrays_xy_view(self, packed_array, xys: dict[str, tuple[int,int,int,int]]) -> dict[str, np.ndarray]: """ Extract multiple rectangular subarrays from a packed tile array and return them as regular arrays. :param packed_array: The packed array from which to extract the subarrays. :param xys: A dictionary where each key is a label and each value is a tuple of four integers representing the coordinates (x1, y1, x2, y2) of the region to extract (in user/original domain coordinates). :return: A dictionary of unpacked subarrays corresponding to the requested regions. :rtype: dict[str, numpy.ndarray] """ results = {} for label, (x1, y1, x2, y2) in xys.items(): subarray = self.unpack_subarray_xy_view(packed_array, x1, y1, x2, y2) results[label] = subarray return results
[docs] def _pad_subarray_to_tiles(self, a: np.array, i1:int, j1:int, neutral_values) -> np.array: """ Make an array fit in a given number of tiles (on x and y axis). After this, the array's dimensions are multiple of the tile_size. :param a: The subarray to pad. :param i1: The top coordinate of the subarray in the original domain. :param j1: The left coordinate of the subarray in the original domain. :param neutral_values: The value used to pad. """ # We need to pad according to the position of the subarray # in the original domain. mesh_to_add_on_j_before = j1 % self._tile_size mesh_to_add_on_i_before = i1 % self._tile_size ntj_sub = (mesh_to_add_on_j_before + a.shape[1] + self._tile_size - 1) // self._tile_size nti_sub = (mesh_to_add_on_i_before + a.shape[0] + self._tile_size - 1) // self._tile_size mesh_to_add_on_i_after = nti_sub*self._tile_size - (mesh_to_add_on_i_before + a.shape[0]) mesh_to_add_on_j_after = ntj_sub*self._tile_size - (mesh_to_add_on_j_before + a.shape[1]) assert mesh_to_add_on_i_before + mesh_to_add_on_i_after >= 0, "Your array doesn't fit the computation domain vertically" assert mesh_to_add_on_j_before + mesh_to_add_on_j_after >= 0, "Your array doesn't fit the computation domain horizontally" if len(a.shape) == 3: return (np.pad(a, [(mesh_to_add_on_i_before, mesh_to_add_on_i_after), (mesh_to_add_on_j_before, mesh_to_add_on_j_after), (0,0)], constant_values=neutral_values), i1 - mesh_to_add_on_j_before, j1 - mesh_to_add_on_i_before, i1 - mesh_to_add_on_j_before + ntj_sub * self._tile_size, j1 - mesh_to_add_on_i_before + nti_sub * self._tile_size ) elif len(a.shape) == 2: return (np.pad(a, [(mesh_to_add_on_i_before, mesh_to_add_on_i_after), (mesh_to_add_on_j_before, mesh_to_add_on_j_after)], constant_values=neutral_values), i1 - mesh_to_add_on_i_before, j1 - mesh_to_add_on_j_before, i1 - mesh_to_add_on_i_before + nti_sub * self._tile_size, j1 - mesh_to_add_on_j_before + ntj_sub * self._tile_size ) else: raise Exception(f"Array shape {a.shape} is not not supported")
[docs] def pack_subarray(self, packed_array, new_subarray, i1, j1): """ Update the packed_array with a rectangular region of a regular new_array. :param packed_array: The packed array to update. :param new_subarray: The regular array containing the new data to insert. :param i1: The left indice of the region to update - packed_array[i1,j1] == new_subarray[0,0] - in matrix convention. :param j1: The top indice of the region to update - packed_array[i1,j1] == new_subarray[0,0] - in matrix convention. :return: The original packed array with the updated subarray. :rtype: numpy.ndarray """ # Our goal here is to extract the rectangle defined by x1/x2,y1/y2 # coordinates (user coordinates) from the regular array and pack it. a, i1, j1, i2, j2 = self._pad_subarray_to_tiles(new_subarray, i1, j1, np.nan) # Extend to get full tiles and get the coordinates expressed in tiles. ts = self._tile_size j1t = j1 // ts j2t = (j2 + ts - 1) // ts i1t = i1 // ts i2t = (i2 + ts - 1) // ts indir = self.tile_indirection_map() # Coordinates of the tiles in the tile-packed array (tile-packed coordinates) for i in range(i1t, i2t): for j in range(j1t, j2t): left = indir[i,j,0] bottom = indir[i,j,1] source_j = slice((j- j1t)*ts, (j+1 - j1t)*ts) source_i = slice((i - i1t)*ts, (i+1 - i1t)*ts) tile = a[source_i, source_j] packed_array[bottom:bottom+ts, left:left+ts] = np.where( np.isnan(tile), packed_array[bottom:bottom+ts, left:left+ts], tile ) return packed_array
[docs] def pack_subarrays(self, packed_array, new_subarrays: dict[str, tuple[np.ndarray, tuple[int,int]]]) -> np.ndarray: """ Update the packed_array with multiple rectangular regions of regular new_arrays. :param packed_array: The packed array to update. :param new_subarrays: A dictionary where each key is a label and each value is a regular array containing the new data to insert and the position (i1, j1). :param neutral_values: value to fill the empty tile with. :return: The original packed array with the updated subarrays. :rtype: numpy.ndarray """ for label, (new_subarray, (i, j)) in new_subarrays.items(): packed_array = self.pack_subarray(packed_array, new_subarray, i, j) return packed_array
[docs] def get_ij_packed_from_ij_domain(self, i: int, j: int) -> tuple[int, int]: """ Get the packed array coordinates (i_packed, j_packed) corresponding to the original domain coordinates (i, j). :param i: The horizontal coordinate in the original domain. :param j: The vertical coordinate in the original domain. :return: A tuple (i_packed, j_packed) representing the coordinates in the packed array. :rtype: tuple[int, int] """ ts = self._tile_size tile_i = i // ts tile_j = j // ts offset_i = i % ts offset_j = j % ts indir = self.tile_indirection_map() packed_tile_coords = indir[tile_i, tile_j] i_packed = packed_tile_coords[1] + offset_i j_packed = packed_tile_coords[0] + offset_j return (i_packed, j_packed)
[docs] def get_ijs_packed_from_ijs_domain(self, ijs: tuple[np.ndarray]) -> tuple[np.ndarray]: """ Get the packed array coordinates for multiple points corresponding to the original domain coordinates. :param ijs: A tuple of two Numpy arrays (each of shape (N,)) where the first array contains i coordinates and the second array contains j coordinates in the original domain. The ijs is like a np.where result: first row is i's, second row is j's. :return: A tuple of two Numpy arrays (each of shape (N,)) where the first array contains i_packed coordinates and the second array contains j_packed coordinates in the packed array. :rtype: tuple[np.ndarray] """ packed_ijs = (np.empty_like(ijs[0]), np.empty_like(ijs[1])) for idx in range(len(ijs[0])): i = ijs[0][idx] j = ijs[1][idx] i_packed, j_packed = self.get_ij_packed_from_ij_domain(i, j) packed_ijs[0][idx] = i_packed packed_ijs[1][idx] = j_packed return packed_ijs