Source code for wolfgpu.test_scenarios

"""
Author: HECE - University of Liege, Stéphane Champailler, Pierre Archambeau
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

import io
import json
import os
import logging
from pathlib import Path
from math import ceil, floor, sqrt

import numpy as np

from wolfhece.mesh2d.wolf2dprev import prev_parameters_simul, prev_sim2D, prev_parameters_blocks
from wolfhece.wolf_array import WolfArray, getkeyblock
# from wolfhece.PyGui import Wolf2DModel # Deprecated because Wolf2DModel is used only for GUI

# from wolfgpu.gl_utils import Path, logging, np
from .glsimulation import GLSimulation
from .simple_simulation import BoundaryConditionsTypes

[docs] DATA_DIR = Path(__file__).parent.resolve() / "data"
[docs] NPAS_BASE=1_000_000
[docs] def create_swimming_pool(destination_dir: Path, width: int = 100, height: int = 50, x_aligned_borders: bool = True, # can be True, False (=> y aligned), or "squared" water_height=np.float32(5.0), wall_height=np.float32(100.0), bathy_min=np.float32(0.0), napbin: np.ndarray = None, top: np.ndarray = None, frott:float = 0.0, suxsuy=True ) -> prev_sim2D: """ Creates a monobloc prev_sim2D (WOLF 2D) that represents a kind of swimming pool. The pool is open on two of its sides (you **have to** provide boundary conditions to handle that) or fully closed (no b.c. necessary). The pool is filled with water. The water is still (no movement). It is frictionless or not. This class is used for testing purposes. Its base name will be `simul` :param x_aligned_borders: True= borders x aligned, False = borders y aligned, "squared" : both borders. :param width: width of the pool (number of cells along X axis == Nbx == number of lines of the WolfArray) - default: 100 :param height: height of the pool (number of cells along Y axis == Nby == number of columns of the WolfArray) - default: 50 :param water_height: water depth in the pool - default: 5.0 :param wall_height: height of the walls of the pool -- Must be higher than water_height - default: 100 :param bathy_min: minimum depth of the pool (the bottom of the pool) - default: 0.0 :param napbin: a numpy array that will be used to fill the `napbin` array of the simulation :param top: a numpy array that will be used to fill the `top` array of the simulation :param frott: friction coefficient of the pool - scalar value Swimming pool (width*height cells) structure: ////////// with: '/' = masked cell; 'W':wall cell, '.':wet cell /WWWWWWWW/ /......../ /......../ /......../ /WWWWWWWW/ ////////// """ # Global parameters # ----------------- params = prev_parameters_simul() # parameters are initialized to default values during initiliazation routine # Set the geometry of the pool - size and World position params.set_params_geometry(dx = 1., dy = 1., nbx = width, nby = height, origx=0., origy=0., translx=0., transly=0.) # Set default parameters for the simulation # # A test will override them if necessary params.set_params_time_iterations(nb_timesteps= 10_000, optimize_timestep=False, first_timestep_duration= 0.01, writing_frequency=10, writing_mode='Iterations', writing_type='Binary compressed', initial_cond_reading_mode='Binary', ) # Block parameters # ---------------- params_block = prev_parameters_blocks() # Add the block to the simulation params.add_block(params_block) params_block.set_params_flux_type(flux_type='HECE original') params_block.set_params_reconstruction_type(reconstruction_frontier='linear limited', frontier_treatment='Nothing', number_neighbors_in_limiting_phase=5) params_block.set_params_conflict_resolution(mode='Centered') params_block.set_evolutive_domain(evolutive_domain='Yes') if destination_dir is None: m = prev_sim2D() m._from_params_gpu(imported_params = params) else: m = prev_sim2D(fname = str(destination_dir / "simul"), clear = True) m._from_params_gpu(imported_params = params) # ------------------------------------------------------------------------ # First we build "napbin" because other informations depend on it. # We're going to build it as a rectangle that covers the wole matrix # except for a once-cell border (that's needed because suxy construction, # it's also useful so we can put boundary conditions on the border's # cells) if napbin is not None: assert m.napbin.array.shape == napbin.shape, "napbin shape mismatch" m.napbin.array.data[:, :] = napbin[:, :] else: m.napbin.array[1 : m.napbin.array.shape[0] - 1, 1 : m.napbin.array.shape[1] - 1] = 1 m.napbin.mask_lower(1) # Determine the area which may receive water. if x_aligned_borders == "squared": # Borders all around the block wet_vrange = slice(2, m.nby - 1 - 1) wet_hrange = slice(2, m.nbx - 1 - 1) elif x_aligned_borders == True: # FIXME use enums instead of different values # There will be horizontal borders (at top and bottom of the swimming pool) # There won't be vertical borders (so you need boundary conditions there) wet_vrange = slice(2, m.nby - 1 - 1) wet_hrange = slice(1, m.nbx - 1) elif x_aligned_borders == False: # There will be vertical borders (at top and bottom of the swimming pool) # There won't be horizontal borders (so you need boundary conditions there) wet_vrange = slice(1, m.nby - 1) wet_hrange = slice(2, m.nbx - 1 - 1) else: raise Exception(f"I don't recognize this border config: {x_aligned_borders}") # Pay attention! We use `np.float32` because that's # the `dtype` of the array. This way we make sure # there's no type conversion (for example `float` # to `np.float32` is lossy). I apply that also to # zeros to make the point clear (although, of course, # zero is safe to convert in float1,32,64) # Swimming pool # i,j are in cell coordinates (not numpy => -1 to convert) if top is not None: m.top.array.data[:, :] = top[:, :] else: m.top.array.data[:, :] = wall_height m.top.array.data[wet_hrange, wet_vrange] = bathy_min m.hbin.array.data[:, :] = np.float32(0) m.hbin.array.data[wet_hrange, wet_vrange] = water_height m.qxbin.array.data[:, :] = np.float32(0) m.qxbin.array.data[wet_hrange, wet_vrange] = np.float32(0) m.qybin.array.data[:, :] = np.float32(0) m.qybin.array.data[wet_hrange, wet_vrange] = np.float32(0) m.frot.array.data[:, :] = np.float32(0) m.frot.array.data[wet_hrange, wet_vrange] = np.float32(frott) if m.filenamegen is not None: m.save() if suxsuy: m.create_sux_suy() return m
[docs] def scenario_cube_drop(destination_dir: Path, width=None, height=None): """ A cube of water spreads into a filled swimming pool. No boundary conditions. """ if width is None and height is None: m: prev_sim2D = create_swimming_pool(destination_dir, water_height=np.float32(1.0), x_aligned_borders="squared", frott= 0.0) else: m: prev_sim2D = create_swimming_pool(destination_dir, water_height=np.float32(1.0), width=width, height=height, x_aligned_borders="squared", frott = 0.0) # Set usual parameters # - optimizing time step # - ste dt to 0. cause it will be computed by the simulation # - reset boundary conditions m.set_gpu_test(optimize_dt=True, dt = 0., reset_bc=True) hw = m.nbx // 2 hh = m.nby // 2 m.hbin.array.data[ (hw-hw//2):(hw+hw//2), (hh-hh//2):(hh+hh//2)] += np.float32(0.5) return m
[docs] def scenario_still_water(destination_dir: Path, width=10, height=10, rockyness=0, water_height=np.float32(5.0)): """ A frictionless swimming pool filled with completely still, motionless, water. rockyness = 0 = flat bed. 1: maximum randomness on bed height """ wh = water_height m: prev_sim2D = create_swimming_pool(destination_dir, wall_height=wh+1., width=width, height=height, bathy_min=0.0, x_aligned_borders="squared") m.set_gpu_test(reset_bc=True) def LCG(seed, n, a=1664525, c=1013904223, m=2**32) -> np.ndarray: numbers = [] for i in range(n): seed = (a * seed + c) % m numbers.append(seed) return np.array(numbers,dtype=np.float32) / (2**32) rnd = LCG(123, m.top.array.size).reshape(m.top.array.shape) # create a random bed if needed m.top.array[2:-2,2:-2] += rnd[2:-2,2:-2] * wh * rockyness # water level is flat # water depth is water level minus bed height m.hbin.array[2:-2,2:-2] = wh - m.top.array[2:-2,2:-2] m.frot.array[:,:] = np.float32(0.) return m
[docs] def scenario_still_water_rocky_bed(destination_dir: Path, width=10, height=10): """ A swimming pool filled with completely still, motionless, water. Bed is rocky, with a random height between 0 and 1. """ m = scenario_still_water(destination_dir, width=width, height=height, rockyness=0.5) a = m.top.array + m.hbin.array assert np.alltrue( a[2:-2,2:-2] - np.float32(5.) == 0), "Water surface is not flat" assert np.alltrue( m.qxbin.array[2:-2,2:-2] == 0), "Water is moving along X" assert np.alltrue( m.qybin.array[2:-2,2:-2] == 0) , "Water is moving along Y" return m
[docs] def scenario4(destination_dir: Path, N:int = 2830, active_surface:float = 0.25, optim_pas:bool = False, npas:int = 100, hini:float= 10.) -> prev_sim2D: """ A block of water at the center of an empty (dry) pool. Scenario without boundary conditions and friction set to 0.01. :param N: size of the squared pool :param active_surface: proportion of the pool that is active (filled with water) :param optim_pas: optimize time step :param npas: number of time steps to compute """ m: prev_sim2D = create_swimming_pool(destination_dir, width = N, height = N, water_height=np.float32(0)) m.set_gpu_test(n_timesteps=npas, reset_bc=True, writing_frequency=npas//10, optimize_dt= optim_pas, dt = 0. if optim_pas else 0.01,) # proportion of active surface wall_size = 2 half_extent = int(round(sqrt(N*N*active_surface)) / 2) c = N // 2 # center assert c - half_extent >= wall_size, f"Given the active portion of meshes {active_surface}, " \ "I'm afraid I can't build a simulation because I don't have room to put walls around it. " \ "Either make the size (-size) bigger or the active portion (-ap) smaller." m.hbin.array[:,:] = np.float32(0.) K2 = half_extent//2 m.hbin.array[c-K2:c+K2, c-K2:c+K2] = np.float32(hini) m.top.array[:,:] = np.float32(10000.) m.top.array[c-half_extent:c+half_extent, c-half_extent:c+half_extent] = np.float32(0.) m.frot.array[:,:] = np.float32(0.01) m.qxbin.array[:,:] = np.float32(0) m.qybin.array[:,:] = np.float32(0) m.napbin.array[:,:] = 0 logging.debug(f"scenario4 : NAP extent: {c-half_extent-wall_size}:{c+half_extent+wall_size}") m.napbin.array[(c-half_extent-wall_size):(c+half_extent+wall_size), (c-half_extent-wall_size):(c+half_extent+wall_size)] = 1 # I also use this scenario to test symetry, so it must be symetric. assert np.all( m.top.array[:,:] == np.transpose(m.top.array[:,:])), "Not symetric ?" assert np.all( m.hbin.array[:,:] == np.transpose(m.hbin.array[:,:])), "Not symetric ?" assert np.all( m.qxbin.array[:,:] == np.transpose(m.qxbin.array[:,:])), "Not symetric ?" assert np.all( m.qybin.array[:,:] == np.transpose(m.qybin.array[:,:])), "Not symetric ?" assert np.all( m.napbin.array[:,:] == np.transpose(m.napbin.array[:,:])), "Not symetric ?" assert np.all( m.frot.array[:,:] == np.transpose(m.frot.array[:,:])), "Not symetric ?" return m
[docs] def scenario4_odd(destination_dir: Path, N:int = 2830, active_surface:float = 0.25, optim_pas:bool = False, npas:int = 100, hini:float = 1.) -> prev_sim2D: """ A block of water at the center of an empty (dry) pool. Scenario without boundary conditions and friction set to 0.01. :param N: size of the squared pool :param active_surface: proportion of the pool that is active (filled with water) :param optim_pas: optimize time step :param npas: number of time steps to compute """ m: prev_sim2D = create_swimming_pool(destination_dir, width = N, height = N, water_height=np.float32(0)) m.set_gpu_test(n_timesteps=npas, reset_bc=True, writing_frequency=npas//10, optimize_dt= optim_pas, dt = 0. if optim_pas else 0.01,) # proportion of active surface assert N % 2 == 1, "N must be odd" wall_size = 2 half_extent = int(round(sqrt(N*N*active_surface)) / 2) c = N // 2 # center assert c - half_extent >= wall_size, f"Given the active portion of meshes {active_surface}, " \ "I'm afraid I can't build a simulation because I don't have room to put walls around it. " \ "Either make the size (-size) bigger or the active portion (-ap) smaller." m.hbin.array[:,:] = np.float32(0.) K2 = half_extent//2 m.hbin.array[c-K2:c+K2+1, c-K2:c+K2+1] = np.float32(hini) m.top.array[:,:] = np.float32(10000.) m.top.array[c-half_extent:c+half_extent+1, c-half_extent:c+half_extent+1] = np.float32(0.) m.frot.array[:,:] = np.float32(0.01) m.qxbin.array[:,:] = np.float32(0) m.qybin.array[:,:] = np.float32(0) m.napbin.array[:,:] = 0 logging.debug(f"scenario4 : NAP extent: {c-half_extent-wall_size}:{c+half_extent+1+wall_size}") m.napbin.array[(c-half_extent-wall_size):(c+half_extent+1+wall_size), (c-half_extent-wall_size):(c+half_extent+1+wall_size)] = 1 # I also use this scenario to test symetry, so it must be symetric. assert np.all( m.top.array[:,:] == np.transpose(m.top.array[:,:])), "Not symetric ?" assert np.all( m.hbin.array[:,:] == np.transpose(m.hbin.array[:,:])), "Not symetric ?" assert np.all( m.qxbin.array[:,:] == np.transpose(m.qxbin.array[:,:])), "Not symetric ?" assert np.all( m.qybin.array[:,:] == np.transpose(m.qybin.array[:,:])), "Not symetric ?" assert np.all( m.napbin.array[:,:] == np.transpose(m.napbin.array[:,:])), "Not symetric ?" assert np.all( m.frot.array[:,:] == np.transpose(m.frot.array[:,:])), "Not symetric ?" return m
[docs] def scenario_sine_ground_sine_water(destination_dir: Path, N=100, freq=10.0, translate=0): W = 1 m: prev_sim2D = create_swimming_pool(destination_dir, water_height=np.float32(0.0), wall_height=np.float32(2.5), bathy_min=2.5, width=W+4, height=N+4, x_aligned_borders="squared") m.set_gpu_test(dt = 0.01, n_timesteps=NPAS_BASE, optimize_dt=False, Runge_Kutta_pond=0.3) m.frot.array[:,:] = np.float32(0.04) x_slice = slice(2,2+W) h1 = 1.0 p1 = 2 h2 = 0.0 p2 = (N - 4) // 2 h3 = 2.0 p3 = N - 4 for i in range(p1,p2+1): h = h1 + (h2-h1)/((p2-p1+1))*((i-p1)) m.top.array[(x_slice,i+2)] = np.float32(h) for i in range(p2,p3+1): h = h2 + (h3-h2)/((p3-p2+1))*((i-p2)) m.top.array[(x_slice,i+2)] = np.float32(h) for i in range(2,N-2): b = 0.2 * np.sin( (freq * 2.0 * 3.1415 * i / N)) m.top.array[(x_slice,i)] += b m.top.array[m.top.array < 0] = np.float32(0) # Fill the center of the pool with water for i in range(2,N-2): h = 0.5 - m.top.array[(x_slice,i)] h[h<0]=0 m.hbin.array[(x_slice,i+2)] = np.float32(h) for i in range(2,N-2): if 10 < i < N-10: m.hbin.array[(x_slice,i+2)] += 0.2 if translate > 0: m.top.array[:, translate:] = m.top.array[:, :-translate] m.hbin.array[:, translate:] = m.hbin.array[:, :-translate] m.top.array[x_slice, :translate+3] = 5 m.top.array[x_slice, -6:] = 5 return m
[docs] def scenario_small_movement(destination_dir: Path): N = 21 W = 1 m: prev_sim2D = create_swimming_pool(destination_dir, water_height=np.float32(0.0), wall_height=np.float32(2.5), bathy_min=2.5, width=W+4, height=N+4, x_aligned_borders="squared") m.set_gpu_test(dt = 0.01, n_timesteps=NPAS_BASE, optimize_dt=False, Runge_Kutta_pond=0.3) m.frot.array[:,:] = np.float32(0.04) x_slice = slice(2,2+W) for i in range(2,N+4-2): m.top.array[(x_slice,i)] = np.float32(1.) m.hbin.array[(x_slice,i)] = np.float32(0.5+i/N/2.0) m.top.array[(x_slice,1)] = np.float32(3.) m.top.array[(x_slice,N+4-2)] = np.float32(3.) #quick_view(m) return m
[docs] def scenario_multiple_infiltration( dest_dir: Path) -> prev_sim2D: size = 64 nx = size ny = size m = create_swimming_pool(dest_dir, x_aligned_borders="squared", width=nx, height=ny, water_height=0.0, wall_height=1000.0) m.set_gpu_test(optimize_dt=True, dt = 0., n_timesteps=10_000, writing_frequency=1, max_Froude=10., Runge_Kutta_pond=0.3) m.top.array[int(nx/3), :] = np.float32(100.) m.top.array[int(2*nx/3), :] = np.float32(100.) m.top.array[:, int(ny/3)] = np.float32(100.) m.top.array[:, int(2*ny/3)] = np.float32(100.) m.frot.array[:,:] = np.float32(0.1) m.qxbin.array[:,:] = np.float32(0.) m.qybin.array[:,:] = np.float32(0.) #m.hbin.array[m.top.array == 0] = 0.000001 # m.parameters._scheme_optimize_timestep = 1 # m.parameters._timestep_duration = 0 # The first result of the reuslt store is the initali situation, before # any step is done. So if wants to see the result of 10 iterations # it has to wait until the 11-th record... # m.parameters._nb_timesteps = 10000 # 1800*120 # about 100 iteration per seconds of simulations # m.parameters._writing_frequency = 1 # m.parameters.blocks[0]._froude_max = 10 # m.parameters._scheme_cfl = 0.3 # Courant # No zones first ... (-1 == no zone) zones = np.zeros((ny,nx), dtype=np.int32) lo_x,mid_x,hi_x = int(nx*1/6),int(nx/2),int(nx*5/6) lo_y,mid_y,hi_y = int(ny*1/6),int(ny/2),int(ny*5/6) lo_x = slice(lo_x-2,lo_x+2) mid_x = slice(mid_x-2,mid_x+2) hi_x = slice(hi_x-2,hi_x+2) lo_y = slice(lo_y-2,lo_y+2) mid_y = slice(mid_y-2,mid_y+2) hi_y = slice(hi_y-2,hi_y+2) zones[lo_y, lo_x] = 1 zones[lo_y, hi_x] = 2 zones[hi_y, hi_x] = 3 zones[hi_y, lo_x] = 4 zones[mid_y, mid_x] = 5 timings = np.array( [ [ 1, 0.1, 0.0, 0.0, 0.0, 0.0], [ 10, 0.1, 0.1, 0.0, 0.0, 0.0], [ 20, 0.1, 0.1, 0.1, 0.0, 0.0], [ 30, 0.1, 0.1, 0.1, 0.1, 0.0], [ 40, 0.0, 0.0, 0.1, 0.1, 0.1], [ 999, 0.0, 0, 0, 0, 0 ] ], dtype=np.float32) m.infiltration.my_Q = timings # At this point m.infiltration.zoning.array is None. m.inf.array = zones.transpose() return m
[docs] def scenario1(destination_dir: Path, size=100, bc_qx_factor = 1.1, bc_hmod_factor = 1.05, friction = 0.04, WATER_HEIGHT = 10.0, IC_SPEED = 1.0): """ Water moving left to right """ m: prev_sim2D = create_swimming_pool(destination_dir, water_height=np.float32(WATER_HEIGHT), width=size, height=size//2) m.set_gpu_test(reset_bc=True) wet_vrange = range(2, m.parameters.nby - 2) # Numpy coordinates m.qxbin.array[1:m.parameters.nbx -1,wet_vrange] = np.float32(IC_SPEED) m.frot.array[:,:] = np.float32(friction) for y in wet_vrange: # Left of swimming pool: water comes in m.parameters.add_weak_bc_x(2, y+1, # grid coordinates ntype=BoundaryConditionsTypes.QX, value=np.float32(IC_SPEED*bc_qx_factor)) m.parameters.add_weak_bc_x(2, y+1, # grid coordinates ntype=BoundaryConditionsTypes.QY, value=0) # Right of swimming pool: height is set m.parameters.add_weak_bc_x(m.parameters.nbx, y+1, ntype=BoundaryConditionsTypes.HMOD, value=np.float32(WATER_HEIGHT*bc_hmod_factor)) return m
[docs] def create_steady_flow_swimming_pool_bottom_to_top(destination_dir: Path, WATER_HEIGHT = np.float32(10.0), width=50, height=100) -> prev_sim2D: """ Water flows steadily from bottom to top (positive direction). :param IC: Water level is flat across the pool and water is moving at some speed. :param BC: water gets in the simulation at the same speed as the IC. h is set at the same height as IC where the water leaves the simulation. There's no friction. """ assert destination_dir.is_dir() IC_SPEED = 2 m: prev_sim2D = create_swimming_pool(destination_dir, width=width, height=height, x_aligned_borders=False, water_height=WATER_HEIGHT) m.set_gpu_test(n_timesteps=10_000, reset_bc=True, optimize_dt=True) # Don't forget, there's dead border and then there's some walls. wet_vrange = slice(1, m.nby-1) # Numpy coordinates wet_hslice = slice(2, m.nbx-2) wet_hrange = range(2, m.nbx-2) m.qybin.array[wet_hslice,wet_vrange] = np.float32(IC_SPEED) m.frot.array[wet_hslice,wet_vrange] = np.float32(0.) for x in wet_hrange: m.parameters.add_weak_bc_y(x+1, 2, # grid coordinates ntype=BoundaryConditionsTypes.QY, value=np.float32(IC_SPEED)) m.parameters.add_weak_bc_y(x+1, 2, # grid coordinates ntype=BoundaryConditionsTypes.QX, value=np.float32(0.)) m.parameters.add_weak_bc_y(x+1, m.nby, ntype=BoundaryConditionsTypes.HMOD, value=np.float32(WATER_HEIGHT)) return m
[docs] def create_steady_flow_swimming_pool_top_to_bottom(destination_dir: Path, WATER_HEIGHT = np.float32(10.0), width=50, height=100) -> prev_sim2D: """ Water flows steadily from bottom to top (positive direction). IC: Water level is flat across the pool and water is moving at some speed. BC: water gets in the simulation at the same speed as the IC. h is set at the same height as IC where the water leaves the simulation. There's no friction. """ assert destination_dir.is_dir() IC_SPEED = -2 m: prev_sim2D = create_swimming_pool(destination_dir, width=width, height=height, x_aligned_borders=False, water_height=WATER_HEIGHT) m.set_gpu_test(n_timesteps=10000, reset_bc=True, optimize_dt=True) # Don't forget, there's dead border and then there's some walls. wet_vrange = slice(1, m.nby-1) # Numpy coordinates wet_hslice = slice(2, m.nbx-2) wet_hrange = range(2, m.nbx-2) m.qybin.array[wet_hslice,wet_vrange] = np.float32(IC_SPEED) m.frot.array[wet_hslice,wet_vrange] = np.float32(0.) for x in wet_hrange: m.parameters.add_weak_bc_y(x+1, m.nby, # grid coordinates ntype=BoundaryConditionsTypes.QY, value=np.float32(IC_SPEED)) m.parameters.add_weak_bc_y(x+1, m.nby, # grid coordinates ntype=BoundaryConditionsTypes.QX, value=np.float32(0.)) m.parameters.add_weak_bc_y(x+1, 2, ntype=BoundaryConditionsTypes.HMOD, value=np.float32(WATER_HEIGHT)) return m
[docs] def scenario_drying(destination_dir: Path, size=100, friction = 0.04, WATER_HEIGHT = 1.0, top_topo=2.0): """ Water moving left to right """ m: prev_sim2D = create_swimming_pool(destination_dir, x_aligned_borders="squared", water_height=np.float32(0), width=size//4, height=size) m.set_gpu_test(reset_bc=True) m.hbin.array[2:m.parameters.nbx-2, 2:size//8] = np.float32(WATER_HEIGHT) m.frot.array[:,:] = np.float32(friction) m.qybin.array[2:m.parameters.nbx-2, 2:size//8] = np.float32(0.1) r = np.tile(np.linspace(top_topo,0,m.parameters.nby-4).transpose(),(m.parameters.nbx-4, 1)) r2 = np.tile(np.linspace(0,top_topo*2,m.parameters.nby-4).transpose(),(m.parameters.nbx-4, 1)) r = np.maximum(r, r2) # np.flip(r, axis=1)) m.top.array[2:m.parameters.nbx-2, 2:m.parameters.nby-2] = np.float32(r) return m
[docs] def save_short_format(wolf: prev_sim2D, glsim: GLSimulation, filename: Path, row_slice: slice = None, col_slice: slice = None, transpose=False, hbin=None, qxbin=None, qybin=None): assert wolf is not None assert glsim is not None from zipfile import ZipFile import json import io logging.info(f"Saving short form scenario to {filename.absolute()}") assert wolf.napbin is not None if row_slice is None: row_slice=slice(0, wolf.napbin.array.shape[0]) if col_slice is None: col_slice=slice(0, wolf.napbin.array.shape[1]) with ZipFile(Path(filename).with_suffix(".zip"), 'w') as zip_object: s = np.sum(wolf.frot.array.mask) if glsim.ponderation_runge_kutta is None: ponderation = 0.0 else: ponderation = glsim.ponderation_runge_kutta # There are casts because Wolf sometimes gives np.int64 instead of int. params = { "npas": int(wolf.parameters._nb_timesteps), "rk-ponderation": float(ponderation), # wolf.parameters._scheme_rk "manning": float(np.max(wolf.frot.array)), "freq" : int(wolf.parameters._writing_frequency), "report-freq" : int(wolf.parameters._writing_frequency), "courant": float(wolf.parameters._scheme_cfl), "epsilon" : glsim.epsilon, "froude-max" : glsim.froude_max } if wolf.parameters._scheme_optimize_timestep == 0: params["timestep"]= float(wolf.parameters._timestep_duration) zip_object.writestr( "params.json", json.dumps(params,indent=2)) def demask_wolf(a: WolfArray, fill_value=0): # filled: masked values replaced by given value z = a.array.filled(fill_value) if transpose: z = z.transpose() return z[row_slice, col_slice] # print(a.array.shape) # r = a.array.data[1:-1,1:-1] # print(r.shape) # print(f"{np.sum(r):.25f}") # return r with io.BytesIO() as f: np.save(f, demask_wolf(wolf.top, np.max(wolf.top.array))) f.seek(0) zip_object.writestr("top.npy", f.read()) with io.BytesIO() as f: if wolf.hbin is not None: np.save(f, wolf.hbin.array) else: wolf.hbin = wolf.read_fine_array("hbin") np.save(f, demask_wolf(wolf.hbin)) f.seek(0) zip_object.writestr("hbin.npy", f.read()) with io.BytesIO() as f: if wolf.qxbin is not None: np.save(f, wolf.qxbin.array) else: wolf.qxbin = wolf.read_fine_array("qxbin") np.save(f, demask_wolf(wolf.qxbin)) f.seek(0) zip_object.writestr("qxbin.npy", f.read()) with io.BytesIO() as f: if wolf.qybin is not None: np.save(f, wolf.qybin.array) else: wolf.qybin = wolf.read_fine_array("qybin") np.save(f, demask_wolf(wolf.qybin)) f.seek(0) zip_object.writestr("qybin.npy", f.read()) with io.BytesIO() as f: np.save(f, demask_wolf(wolf.frot)) f.seek(0) zip_object.writestr("frot.npy", f.read())