Source code for wolfgpu.results_store

"""
Author: HECE - University of Liege, Stéphane Champailler, Pierre Archambeau
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

"""
This modules contains classes to handle the results of a simulation: creating
them, loading them from disk, saving them.
"""

import io
import os
import csv
import sys
import json
import glob
import shutil
import zipfile
import logging
from datetime import datetime
from time import sleep
from pathlib import Path
from typing import Union

#import pandas as pd
import numpy as np
from scipy.sparse import load_npz, save_npz, csr_matrix
from tifffile import imwrite, imread

from enum import Enum
from .tile_packer import TilePacker, TilePackingMode

[docs] NB_RESULTS_FILE="nb_results.txt"
[docs] class SimulationEventType(Enum):
[docs] BathymetryUpdate = "BathymetryUpdate"
[docs] class ResultType(Enum): """Enumerate all the possible results types stored in the ``ResultsStore``. Some of these represents scalars, some are numpy arrays. """ # The number are columns indices in the CSV file (while we use CSV files)
[docs] T = 0
""" The simulation time (the time inside the simulation). """
[docs] LAST_DELTA_T = 1
""" The last Δt computed at the moment the record is taken. """
[docs] DRYUP_NITER = 2
[docs] NB_ACTIVE_TILES = 3
""" The number of active tiles in the simulation. An active tile is a tile where there is some water. """
[docs] STEP_NUM = 4
""" The number of the simulation step. """
[docs] CLOCK_T = 5
""" The time spent into the simulation (wall clock time) """
[docs] DELTA_EARLY_OUT=6
r""" The delta early out is an approximate measurement of the "stabilisation" of the simulation. It is computed like so: :math:`\Delta S = S_{i+1} - S_i` where :math:`S_i = \sum_{\text{m in all meshes}} |m_h| + |m_{q_x}| + |m_{q_y}|` In other words, the rate of change between two successive recordings computed over :math:`h,qx,qy` of each mesh individually. """
[docs] NB_MOSTLY_DRY_MESHES=7
""" Number of mostly dry mesh. A mostly dry mesh is a mesh where water height is not zero but very small (< 1e-5 m). """
[docs] NB_WET_MESHES=8
""" Number of wet meshes. A wet mesh is a mesh where water height is above zero. """ # These one are not part of the CSV file, so their indices don't represent # column numbers.
[docs] H = 106
""" Water height. """
[docs] QX = 107
""" Water discharge along X axis. """
[docs] QY = 108
""" Water discharge along Y axis. """
[docs] ALPHA = 109
""" Information about dry ups (α's and masks) """
[docs] DEBUG1 = 110
""" Debug arrays. """ @classmethod
[docs] def _from_str(kls, s): """:meta private:""" return { "h": ResultType.H, "qx": ResultType.QX, "qy": ResultType.QY, "t": ResultType.T, "dt": ResultType.LAST_DELTA_T, "last_delta_dt": ResultType.LAST_DELTA_T, "clock_t": ResultType.CLOCK_T, "step_num": ResultType.STEP_NUM, "alpha": ResultType.ALPHA, "debug1": ResultType.DEBUG1, "dryup_niter": ResultType.DRYUP_NITER, "nb_active_tiles":ResultType.NB_ACTIVE_TILES, "delta_early_out":ResultType.DELTA_EARLY_OUT, "nb_mostly_dry_meshes":ResultType.NB_MOSTLY_DRY_MESHES, "nb_wet_meshes":ResultType.NB_WET_MESHES }[s]
[docs] class SimulationEvent: def __init__(self, event_type:SimulationEventType, simulation_time:float, simulation_step:int, data=None): """ Create a simulation event. A simulation event is en event that occurs usually at a specific moment, in some "unplanned" way. - simulation_time: simulation time expressed in seconds. - simulation_step : an integer """ assert simulation_time >= 0 assert simulation_step >= 0 assert isinstance(event_type, SimulationEventType) self._event_type = event_type self._simulation_time = simulation_time self._simulation_step = simulation_step if self._event_type == SimulationEventType.BathymetryUpdate: assert isinstance(data, str) or isinstance(data, Path), f"Bad data: {data}. I expect just a filename with no directory whatsoever" self._data = str(data) else: assert data is None self._data = None self._parent_store = None
[docs] def file_content(self): """ Return the file content of the event. In case of a BathymetryUpdate event, returns the numpy array used for the update. Note: we return the file content because we make no guarantees on the fact that the file can actually be read from the disk like a regular file (because we may zip it). """ assert self._event_type == SimulationEventType.BathymetryUpdate, "this method only makes sense for bathymetry update" assert self._parent_store is not None, "Parent store must be set for this to work (internal issue)" return self._parent_store._read_numpy(self._data)
[docs] def _inject_parent_store(self, store): assert isinstance(store, ResultsStore) self._parent_store = store
def __str__(self) -> str: return f"{self._event_type} {self.simulation_time} {self.simulation_step} {self._data}" @property
[docs] def simulation_step(self): return self._simulation_step
@property
[docs] def simulation_time(self): return self._simulation_time
[docs] def to_dict(self): d = { "event_type" : self._event_type.value, "simulation_time": self._simulation_time, "simulation_step": self._simulation_step } if self._event_type == SimulationEventType.BathymetryUpdate: d["file"] = str(self._data) return d
[docs] def array_to_bytes(x: np.ndarray) -> bytes: np_bytes = io.BytesIO() np.save(np_bytes, x, allow_pickle=True) return np_bytes.getvalue()
[docs] def bytes_to_array(b: bytes) -> np.ndarray: np_bytes = io.BytesIO(b) return np.load(np_bytes, allow_pickle=True)
[docs] class ResultsStore: """ A `ResultsStore` is a tiny container meant to hold the results of a simulation. It gives several ways to query the results easily. """ # We thought abut HDF5 but it seems a tad too complex four our needs.
[docs] _dir: Path
def __init__(self, sim_path: Path, mode:str = "a", tile_packer: TilePacker = None): """ Open a results store for reading, writing or appending. :param sim_path: is the path to the result store. We support two kind of paths. Either we have a directory, in which case the result store will be collection files. Either we have a filename in which case it must be a zip-file name and results will be stored inside taht zip file. :param mode: is `r` read, `w` (over)write or `a` append. :param tile_packer: the tile packer is given only when one's opening the store in *write* mode and wants to store results that were packed with it. The tile packer won't be used by this results store to pack the data but only to remember its configuration along the data to be able to unpack later on, when the results store is read. """ assert sim_path is not None, "I need a path to a result store" assert isinstance(sim_path, Path), f"I only understand `Path` objects, you gave {type(sim_path)}" assert tile_packer is None or mode=="w", "Tile packer only in write mode" self._tile_packer = tile_packer self._dir = None self._path = None self._zip = None self._mode = mode self._sim_times = [] self._sim_events = list[SimulationEvent]() self._metadata = dict() if sim_path.suffix == ".zip": if mode != "w": if not sim_path.exists() or not sim_path.is_file(): raise Exception(f"You want to read results in {sim_path} but it doesn't exist or is not a file") self._path = sim_path else: # Anything that has no .zip suffix is considered a directory # (exisitng or to create) self._dir = sim_path if mode in ('r','a'): assert tile_packer is None,"TilePacker can be given when mode is 'w', not when reading (we'll read from the store ourselves)" self._init_filesystem_for_reading(mode) self._load_current_data() elif mode == 'w': self._init_filesystem_for_writing() self._number_of_results = 0 self._metadata["creation_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # Record some information about what was run. # Useful when running lots of tests. self._metadata["python_cli"] = " ".join(sys.argv) self._metadata["python_wolfgpu"] = str(Path(__file__).parent.resolve()) self._metadata["python_working_directory"] = os.getcwd() if tile_packer and tile_packer.mode() != TilePackingMode.TRANSPARENT: self._write_numpy("nap", tile_packer._original_nap) self._metadata["tile_pack"] = { "nap_file" : "nap", "tile_size" : tile_packer.tile_size() } else: raise Exception(f"Unrecognized mode : {mode}") @property
[docs] def creation_time(self): if "creation_time" in self._metadata: return datetime.strptime( self._metadata["creation_time"], "%Y-%m-%d %H:%M:%S" ) else: return None
@property
[docs] def nb_results(self): """ The number of results currently stored in the result store. The last result's index is this value. """ return self._number_of_results
@property
[docs] def path(self) -> Path: """ Returns the path where the result store is located. It is either a directory or a path to a zip-file. """ if self._path is not None: return self._path else: return self._dir
[docs] def _load_current_data(self): self._sim_times = [] self._number_of_results = int(self._read(NB_RESULTS_FILE)) attempts_at_reading = 0 while True: s = self._read(f"sim_times.csv").decode() csv_reader = csv.reader(io.StringIO(s)) header = next(csv_reader) # Skip header self._supported_result_types = [x for x in ResultType if x.value < len(header)] # test useful for added columns in the future for line in csv_reader:#,lineterminator="\n"): l = list(map(float,line)) l[ResultType.STEP_NUM.value] = int(l[ResultType.STEP_NUM.value]) # step num self._sim_times.append(l) if len(self._sim_times) == self.nb_results: # file structure is correct so we conclude our data can be read. break else: # This is an *attempt* there's no guarantee that this # procedure will work. We just hope it'll reduce the probability # of a problem. attempts_at_reading += 1 if attempts_at_reading <= 3: sleep(0.1) else: raise Exception(f"ResultStore state is incoherent !? Number of records actually " f"in database{len(self._sim_times)} != announced number of records " f"{self.nb_results}. Are you reading while updating ?") metadata = json.loads( self._read("metadata.json")) for d in metadata["events"]: se = SimulationEvent(SimulationEventType(d["event_type"]), d["simulation_time"], d["simulation_step"], Path(d["file"])) se._inject_parent_store(self) self._sim_events.append(se) if "tile_pack" in metadata["metadata"]: md = metadata["metadata"]["tile_pack"] self._tile_packer = TilePacker( self._read_numpy(md["nap_file"]), md["tile_size"]) # FIXME There's metadata inside metadata, not exactly fine... self._metadata = metadata["metadata"]
[docs] def reload(self, only_if_changed: bool=True): """ Reloads this result store. If `only_if_changed` is `True` (the default), the store will be reloaded only if its number of results has changed (compared to what it was when first opened). If `False`, the store will be reloaded forcefully. Reloading only makes sense in 'r' (read) mode. This method can be used to track the growth of a result store while it is populated by the simulator. """ assert self._mode in ('r') # This one should be quick so that one can reload on a frequent basis. do_reload = not only_if_changed or int(self._read(NB_RESULTS_FILE)) != self._number_of_results if do_reload: self._init_filesystem_for_reading(self._mode) self._load_current_data()
[docs] def truncate_at_report(self, n: int): """ Kill records from n+1 (included) to the last one n is one based ! """ if self._dir is None: raise Exception("Truncating is not supported on zip based result store") logging.debug(f"Truncating result store at record {n} out of {self.nb_results}") assert n <= self.nb_results, f"When truncating result store: record number to start from is not right: {n} is greater than the number of records we have ({self.nb_results})" # if n <= 1: # if self.nb_results > 1: # raise Exception(f"Restarting at record {n} out of {self.nb_results} is not supported yet. Please restart at record 2 or more.") # else: # raise Exception(f"Restarting at record {n} is not supported yet.") assert self._mode in ('a',), "When truncating result store: truncation only authorized when result store is opened 'a' mode!" if n == self.nb_results: return # Clear regular files for i in range(n+1, self.nb_results+1): name_num = f"{i:07}" for fname in glob.glob(str(self._dir / f"*_{name_num}.npz")): self._delete_file(Path(os.path.basename(fname))) for fname in glob.glob(str(self._dir / f"*_{name_num}.tiff")): self._delete_file(Path(os.path.basename(fname))) self._number_of_results = n self._sim_times = self._sim_times[0:self._number_of_results] # Clear files related to events. last_t = self._sim_times[-1][0] #print(f"last_t {last_t}") to_delete = map(lambda e: e._data, filter(lambda e: e.simulation_time > last_t, self._sim_events)) for fname in to_delete: self._delete_file(Path(fname)) self._sim_events = list(filter(lambda e: e.simulation_time <= last_t, self._sim_events)) self._update_track_files()
[docs] def _update_track_files(self): # FIXME suboptimal (should append instead of re-writing) f = io.StringIO() csv_writer = csv.writer(f) #FIXME Why not using values from ResultType ? csv_writer.writerow(["t","last_delta_t","dryup_niter","nb_active_tiles","step_num","clock_t","delta_early_out","mostly_dry_meshes","nb_wet_meshes"]) def cleaner(tupl): return [v if v is not None else -1 for v in tupl] csv_writer.writerows(map(cleaner, self._sim_times)) self._writestr("sim_times.csv", f.getvalue()) self._writestr( "metadata.json", json.dumps({ "metadata" : self._metadata, "events" : [e.to_dict() for e in self._sim_events] }, indent=2)) # Do this here so that the CSV file has the same number or more. self._writestr(NB_RESULTS_FILE, str(self._number_of_results))
[docs] def close(self): if self._dir is None and self._zip is not None: self._zip.close() self._zip = None else: pass
[docs] def make_a_copy(self, dest_path: Union[str, Path]) -> "ResultsStore": """ Make a copy of the result store and put it in the given path. :param dest_path: where to put the copy. :return: the copy of the results store, open in read only mode. """ assert self._mode == "r", "The results store must be open in read-only mode to be copied" if isinstance(dest_path, str): dest_path = Path(dest_path) assert not dest_path.exists(), f"The destination path you gave {dest_path} already exists" if self._zip: shutil.copy(self.path, dest_path) elif self._dir: shutil.copytree(self.path, dest_path) else: raise Exception("Bad state ???") return ResultsStore(dest_path, mode="r")
def __del__(self): # Important for write operations (see zipfile documentation) self.close() if self._zip is not None: self._zip.close() # def append_results_multi(self, arrays: dict, values: dict): # assert self._mode in ('w','a') # n = f"{self._number_of_results+1:07}" # assert "t" in values and "dt" in values, "You must be backward compatible" # assert "h" in arrays and "qx" in arrays and "qy" in arrays, "You must be backward compatible" # v = [] # for k, value in values.items(): # v.append(v) # self._sim_times.append( v ) # for k, array in arrays.items(): # self._write_numpy(f"{k}_{n}", array) # self._number_of_results += 1 # # Do this lastly because ttracking files are here to interpret the # # content of the results. # self._update_track_files()
[docs] def append_result(self, step_num: int, sim_time: float, dt: float, niter: int, niter_rk: int, h:np.ndarray, qx:np.ndarray, qy:np.ndarray, dbg1:np.ndarray=None, dbg2:np.ndarray=None, clock_time:float=0, delta_early_out:float=0, nb_mostly_dry_meshes:int=0): assert self._mode in ('w','a') # FIXME int(step_num) dirty fix. Somehow I get step num which are float # down here. nb_wet_meshes = np.count_nonzero(h > 0) self._sim_times.append( (sim_time,dt,niter,niter_rk,int(step_num),clock_time, delta_early_out, nb_mostly_dry_meshes, nb_wet_meshes) ) n = f"{self._number_of_results+1:07}" self._write_numpy(f"h_{n}", h) self._write_numpy(f"qx_{n}", qx) self._write_numpy(f"qy_{n}", qy) if dbg1 is not None: # FIXME Remove that, put in in named results. self._write_numpy(f"dbg1_{n}", dbg1) if dbg2 is not None: self._write_numpy(f"dbg2_{n}", dbg2) self._number_of_results += 1 # Do this lastly because tracking files are here to interpret the # content of the results. self._update_track_files()
[docs] def append_additional_result(self, name, data:np.ndarray): """ One can store additional results which have their own names (so they don't belong to the ResultType). For now this is limited to numpy arrays. This function is a hack to help debugging. Don't use it !!! """ assert isinstance(name, str), "I only read str names because ResultType is for pre-defined data." n = f"{self._number_of_results:07}" self._write_numpy(f"{name}_{n}", data)
[docs] def add_event(self, event_type: SimulationEventType, simulation_time=float, simulation_step=int, data=None): """ Record a simulation event. - simulation_time is expressed in seconds - simulation_step is of course an unsigned integer For BathymetryUdate event you must pass the a Path to the bathymetry update file (which will be copied into the result store). """ logging.debug(f"Recording event {event_type} sim_time={simulation_time:.3f}s step={simulation_step}") if event_type == SimulationEventType.BathymetryUpdate: # In case of bathymetry update, the data is the name of the bathy update # file. We'll take care of making a copy of it here. assert isinstance(data, Path), f"Bad data: {data}" dest = data.with_suffix('.{}.npy'.format(simulation_step)).name self._write_file_copy(data, dest) data = dest se = SimulationEvent(event_type, simulation_time, simulation_step, data) se._inject_parent_store(self) self._sim_events.append( se) self._update_track_files()
[docs] def get_events_between_steps(self, first_step:int, last_step: int) -> list[SimulationEvent]: """ Query the events that sits between the given first/last simulation steps. :param first_step: the first simulation steps to consider (1-based) :param last_step: the last simulation steps to consider (1-based) """ assert first_step >= 1 assert last_step >= 1 assert first_step <= self._number_of_results assert last_step <= self._number_of_results assert first_step <= last_step r = [] for event in self._sim_events: if first_step <= event.simulation_step <= last_step: r.append(event) return r
[docs] def get_last_result_index(self) -> int: """ Get the index of the last result in in this result store. The index can be used to query data in the store. """ assert self._number_of_results >= 1, "This file has no results ?" return self.nb_results
[docs] def get_last_result(self, ndx=0) -> tuple[float,float,int,int,np.ndarray,np.ndarray,np.ndarray]: """ returns [t,dt,n_iter_dry_up_euler, n_iter_dry_up_rk, h,qx,qy] """ assert ndx <= 0, "-0 == last, -1=one before last, -2=..." assert self._mode in ["r","a"], "Only makes sense in read or update modes" # -1 to go fro one based to zero based return self.get_result(self.get_last_result_index() + ndx)
[docs] def get_result(self, ndx: int) -> tuple[float,float,int,int,np.ndarray,np.ndarray,np.ndarray]: """ returns [t, dt, n_iter_dry_up_euler, n_iter_dry_up_rk, h, qx, qy] """ assert ndx >= 1, "We're one based" assert ndx <= self.nb_results, f"You're past the last result: ndx={ndx} > {self.nb_results}" assert self._mode in ["r","a"], "Only makes sense in read or update modes" n = f"{ndx:07}" # Built this way so that one can look at results # without preventing write operations arrays = list(self._sim_times[ndx-1])[0:4] # Slice for backward compatibility arrays.extend( self.get_named_result([ResultType.H, ResultType.QX, ResultType.QY], ndx)) return tuple(arrays)
[docs] def get_last_named_result(self, name: Union[str,list[str]], delta_ndx:int = 0) -> Union[np.ndarray, list[np.ndarray]]: """ Get the last result by name(s). The names are those accepted by `get_named_result`. `delta_ndx` means an offset to be added to the last index so that you can count backward fro the last index (so it's always negative). For example a value of -2 means you'll take the third result starting from the last one. A value of 0 means you take the last one. You can call this like this: - get_last_named_result("qy") - get_last_named_result( "h", "qy") - get_last_named_result( [ "h", "qy" ]) - get_last_named_result( [ "h", "qy", ResultType.QX ]) """ # Parameters fiddling to allow the user to pass two parameters names if type(delta_ndx) == str: if type(name) == list: name += delta_ndx else: assert type(name) == str name = [name, delta_ndx] delta_ndx = 0 # back to default value assert delta_ndx <= 0, "You can only go backwards" assert delta_ndx + self.get_last_result_index() >= 0, "You went backwards too much !" return self.get_named_result(name, self.get_last_result_index() + delta_ndx)
# def get_named_results_chronology(self, name: Union[ResultType,list[ResultType]]): # assert self._mode in ["a","r"], "Only makes sense in read or update mode" # if name in (ResultType.T, ResultType.LAST_DELTA_T, ResultType.DRYUP_NITER, ResultType.NB_ACTIVE_TILES, # ResultType.STEP_NUM, ResultType.CLOCK_T): # return self._sim_times[ndx-1][0]
[docs] def _is_result_type_in_database(self, rt: ResultType): return self._mode != "r" or rt in self._supported_result_types
@property
[docs] def _first_row_len(self): """ Returns the number of columns in the first row of the CSV file. This is useful to know if the CSV file has been updated with new columns that we don't know about yet. """ if len(self._sim_times) == 0: logging.warning("No data in the result store") return 0 return len(self._sim_times[0])
[docs] def get_named_series(self, name:Union[str,ResultType]) -> list: """ Returns the series of all recordings of the value identified by the `name` parameter. Currently only works for scalar values (i.e. not matrices). """ if isinstance(name,str): typed_name = ResultType._from_str(name) else: typed_name = name if typed_name in (ResultType.T, ResultType.LAST_DELTA_T, ResultType.DRYUP_NITER, ResultType.NB_ACTIVE_TILES, ResultType.STEP_NUM, ResultType.CLOCK_T, ResultType.DELTA_EARLY_OUT, ResultType.NB_MOSTLY_DRY_MESHES, ResultType.NB_WET_MESHES): if typed_name.value < self._first_row_len: return [line[typed_name.value] for line in self._sim_times] else: logging.warning(f"Requested value {typed_name} is not in the database") logging.warning("Returning a list of zeros to avoid crashing") return [0] * self.nb_results else: raise NotImplementedError(f"The result type you requested ({name}) is not supported yet")
[docs] def get_additional_named_result(self, name: str, ndx:int) -> np.ndarray: n = f"{ndx:07}" ret = self._read_numpy(f"{name}_{n}") if self._tile_packer and name != "active_cells_packed": #print(f"Unpacked {name} {n}") return self._tile_packer.unpack_and_deshuffle_array( ret) else: return ret
[docs] def get_named_result(self, name: Union[str,list[str],ResultType,list[ResultType]], ndx:int) -> Union[np.ndarray, list[np.ndarray]]: """ Looks for result by names. :param name: Supported names are: t, last_delta_t, step_num, h, qx, qy. You can also use `ResultType`. :param ndx: Result number. 1-based. """ assert ndx >= 1, "We're one based" assert ndx <= self.nb_results, f"Your index {ndx} is too far away, max is: {self.nb_results}" assert self._mode in ["a","r"], "Only makes sense in read or update mode" if isinstance(name,str): name_typed = ResultType._from_str(name) else: name_typed = name if isinstance(name_typed, ResultType): if name_typed in (ResultType.T, ResultType.LAST_DELTA_T, ResultType.DRYUP_NITER, ResultType.NB_ACTIVE_TILES, ResultType.STEP_NUM, ResultType.CLOCK_T, ResultType.DELTA_EARLY_OUT, ResultType.NB_MOSTLY_DRY_MESHES, ResultType.NB_WET_MESHES): line = self._sim_times[ndx-1] if self._is_result_type_in_database(name_typed): return line[name_typed.value] else: raise KeyError(f"You're requesting a value not present in the " f"results ({name}). Maybe the results were generated " "with a previous version of the program?") elif name_typed in (ResultType.H,ResultType.QX,ResultType.QY,ResultType.DEBUG1): n = f"{ndx:07}" # FIXME We should be one based in our files too :-) prefix = name_typed.name.lower() # Win32 is case sensitive but linux and co aren't ! # Built this way so that one can look at results # without preventing write operations try: ret = self._read_numpy(f"{prefix}_{n}") if name_typed in (ResultType.H,ResultType.QX,ResultType.QY): if self._tile_packer: return self._tile_packer.unpack_and_deshuffle_array( ret) else: return ret else: return ret except Exception as ex: if self._dir and not self._dir.exists(): # File didn't exist...Something is wrong. When using a # with tempdir construct for the result store one # sometimes forget that once the context is left, the # tempdir is deleted and thus, the result store is # broken... raise Exception(f"The result store directory has disappeared ?! It should be there {self._source_path()}") from ex else: raise Exception(f"Unable to load result named '{prefix}_{n}' (data comes from {self._source_path()})") from ex else: raise KeyError(f"I don't know any values by the name of '{name}'") else: # Reuse this method recursively. return [self.get_named_result(n,ndx) for n in name]
[docs] def get_result_index_at_t(self, t): """ Get index of the result closest to time `t`. """ assert t >= 0, f"Time is in the past (t={t})" SIM_TIME_NDX=0 if t < self._sim_times[0][SIM_TIME_NDX]: Exception(f"Your t={t} is before the first recorded result time (={self._sim_times[-1][SIM_TIME_NDX]})") elif t == self._sim_times[-1][SIM_TIME_NDX]: return self.nb_results elif t > self._sim_times[-1][SIM_TIME_NDX]: raise Exception(f"Your t={t} is after the last recorded result time (={self._sim_times[-1][SIM_TIME_NDX]})") else: for i in range(len(self._sim_times)-1): if self._sim_times[i][SIM_TIME_NDX] <= t < self._sim_times[i+1][SIM_TIME_NDX]: # Are we closer to the beginning of the interval or closer to its end ? if t - self._sim_times[i][SIM_TIME_NDX] < self._sim_times[i+1][SIM_TIME_NDX] - t: return i + 1 else: return i + 2
[docs] def _init_filesystem_for_reading(self, mode): if self._dir is None: if not self._path.exists(): raise FileNotFoundError(f"The file {self._path} doesn't exists") if not self._path.is_file(): raise Exception(f"The path {self._path} is not a file") if not self._path.suffix == ".zip": raise Exception("If a file, a result store must be a zip file, with .zip extension") self._zip = zipfile.ZipFile(self._path, mode=mode) if NB_RESULTS_FILE not in self._zip.namelist(): raise Exception(f"The results store seems broken because it misses {NB_RESULTS_FILE}") else: if not self._dir.exists(): raise FileNotFoundError(f"The directory {self._dir} doesn't exist") if not self._dir.is_dir(): raise NotADirectoryError(f"The directory {self._dir} is not a directory") if not (self._dir / NB_RESULTS_FILE).exists(): raise Exception(f"The results store seems broken because it misses {NB_RESULTS_FILE}")
[docs] def _init_filesystem_for_writing(self): if self._dir is None: if self._path.exists(): assert self._path.is_file() self._path.unlink() self._zip = zipfile.ZipFile(self._path, mode="x") # x=exclusive create else: if self._dir.exists(): assert self._dir.is_dir(), f"If you want to write to {self._dir.is_dir()} \ then it must be a directory (it's not)" for f in os.listdir(self._dir): target = self._dir / f if target.is_file(): target.unlink() else: shutil.rmtree(target) else: os.makedirs(self._dir)
[docs] def _writestr(self, fname, s): if self._dir is None: assert type(s) == str self._zip.writestr(fname, s) else: with open(self._dir / fname,"wb") as fout: if type(s) == bytes: fout.write(s) elif type(s) == str: fout.write(s.encode("utf-8")) else: raise Exception(f"Unrecognized data type {type(s)}")
[docs] def _delete_file(self, fname: Path): assert isinstance(fname, Path) if self._dir is None: # See https://stackoverflow.com/questions/513788/delete-file-from-zipfile-with-the-zipfile-module # I make it a warning because we happen to delete files when truncating a ResultStore. logging.warning("Deleting file is not supported for the zip results format. Read comments in the code.") else: # Delete is used while truncating, which means stuff to delete # must exist. assert not fname.is_absolute(), "The path must be relative to the store directory" p = self._dir / fname p.unlink()
# def _delete_numpy(self, fname: str): # return self._delete_file( Path(fname).with_suffix(".tiff") )
[docs] def _write_file_copy(self, fname_src: Path, fname_dest: str): if self._dir is None: self._zip.write(fname_src, fname_dest) else: #print(self._dir / fname_dest) shutil.copy(fname_src, self._dir / fname_dest)
[docs] def _write_numpy(self, fname, a): assert not Path(fname).suffix, f"Please don't give suffix to filename, I'll handle that myself (you gave {fname})." if self._dir is None: assert type(a) == np.ndarray, f"Got {type(a)} ?" self._zip.writestr(fname, array_to_bytes(a)) else: # np.save((self._dir / fname).with_suffix(".npy"), a) # I need 3 compoents per mesh. That's because I store H,Qx,Qy :-) # (into tiff's RGB...) if len(a.shape) == 3 and a.shape[2] == 3: # Sparse matrices can be used ONLY with 2D matrices. # FIXME Should we deprecate this ? No because w imwrite( (self._dir / fname).with_suffix(".tiff"), np.flipud(a), compression='zlib', compressionargs={'level': 8}) elif len(a.shape) == 3: np.save(self._dir / fname, a) elif len(a.shape) == 2: # npz is cool and everything but it only works with 2D arrays :-( # Performance note # ---------------- # When we record tile packed arrays, we may think that it's useless # to add a CSR packing on top of that. That's not true. Indeed, # the tile packing works on the NAP array which is usually wider # than the wet area. Therefore, we may expect that there is a siginificant # proportion of zeros (dry) cells in the tile-packed data. So, CSR # may still be useful in terms of disk space. # The problem is that decompressing these data may take some time # especially if we want to get a regular (non-CSR) numpy array. save_npz(self._dir / fname, csr_matrix(a)) else: raise ValueError(f"The array you provided has a shape we don't support: {a.shape}")
[docs] def _read(self, fname): if self._dir is None: with zipfile.ZipFile(self._path, mode="r") as zip: with zip.open(fname, mode="r") as fin: return fin.read() else: with open(self._dir / fname, "rb") as fin: return fin.read()
[docs] def _read_numpy(self, fname): if self._dir is None: with zipfile.ZipFile(self._path, mode="r") as zip: with zip.open(fname, mode="r") as fin: return bytes_to_array( fin.read()) else: f = (self._dir / fname).with_suffix(".npz") if f.exists(): # I return a regular np.Array (instead of a csr matrix as given by load_npz) return load_npz(f).toarray() f = (self._dir / fname).with_suffix(".tiff") if f.exists(): return np.flipud(imread( f)) f = (self._dir / fname).with_suffix(".npy") if f.exists(): return np.load(f) raise FileNotFoundError(self._dir / fname)
[docs] def _source_path(self): # For error rerporting and such. Not for looking for data. if self._dir is not None: return self._dir else: return self._path