Source code for wolfgpu.results_store

"""
This module contains classes to handle the results of a simulation: creating
them, loading them from disk, saving them.
"""

import io
import os
import csv
import sys
import json
import glob
import shutil
import zipfile
import logging
from datetime import datetime
from time import sleep
from pathlib import Path
from typing import Union

#import pandas as pd
import numpy as np
from scipy.sparse import load_npz, save_npz, csr_matrix
from tifffile import imwrite, imread

from enum import Enum
from .tile_packer import TilePacker, TilePackingMode

NB_RESULTS_FILE = "nb_results.txt"

class SimulationEventType(Enum):
    BathymetryUpdate = "BathymetryUpdate"

class ResultType(Enum):
    """Enumerate all the possible result types stored in the ``ResultsStore``.

    Some of these represent scalars, some are numpy arrays.
    """
    # The numbers are column indices in the CSV file (while we use CSV files).

    T = 0
    """ The simulation time (the time inside the simulation). """

    LAST_DELTA_T = 1
    """ The last Δt computed at the moment the record is taken. """

    DRYUP_NITER = 2

    NB_ACTIVE_TILES = 3
    """ The number of active tiles in the simulation. An active tile is a tile
    where there is some water. """

    STEP_NUM = 4
    """ The number of the simulation step. """

    CLOCK_T = 5
    """ The time spent in the simulation (wall clock time). """

    DELTA_EARLY_OUT = 6
    r""" The delta early out is an approximate measurement of the
    "stabilisation" of the simulation. It is computed like so:
    :math:`\Delta S = S_{i+1} - S_i` where
    :math:`S_i = \sum_{\text{m in all meshes}} |m_h| + |m_{q_x}| + |m_{q_y}|`.
    In other words, the rate of change between two successive recordings,
    computed over :math:`h, q_x, q_y` of each mesh individually
    (a small numerical sketch is given after this class).
    """

    NB_MOSTLY_DRY_MESHES = 7
    """ Number of mostly dry meshes. A mostly dry mesh is a mesh where the
    water height is not zero but very small (< 1e-5 m). """

    # These ones are not part of the CSV file, so their indices don't represent
    # column numbers.

    H = 106
    """ Water height. """

    QX = 107
    """ Water discharge along the X axis. """

    QY = 108
    """ Water discharge along the Y axis. """

    ALPHA = 109
    """ Information about dry ups (α's and masks). """

    DEBUG1 = 110
    """ Debug arrays. """

    @classmethod
    def _from_str(kls, s):
        """:meta private:"""
        return {
            "h": ResultType.H,
            "qx": ResultType.QX,
            "qy": ResultType.QY,
            "t": ResultType.T,
            "dt": ResultType.LAST_DELTA_T,
            "last_delta_dt": ResultType.LAST_DELTA_T,
            "clock_t": ResultType.CLOCK_T,
            "step_num": ResultType.STEP_NUM,
            "alpha": ResultType.ALPHA,
            "debug1": ResultType.DEBUG1,
            "dryup_niter": ResultType.DRYUP_NITER,
            "nb_active_tiles": ResultType.NB_ACTIVE_TILES,
            "delta_early_out": ResultType.DELTA_EARLY_OUT,
            "nb_mostly_dry_meshes": ResultType.NB_MOSTLY_DRY_MESHES,
        }[s]

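# A small numerical sketch (illustrative only, not part of the module) of the
# quantity behind DELTA_EARLY_OUT, assuming `h_prev`, `qx_prev`, ... and
# `h_next`, `qx_next`, ... are the numpy arrays of two successive recordings:
#
#   s_prev = np.sum(np.abs(h_prev) + np.abs(qx_prev) + np.abs(qy_prev))
#   s_next = np.sum(np.abs(h_next) + np.abs(qx_next) + np.abs(qy_next))
#   delta_early_out = s_next - s_prev
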
class SimulationEvent:
    def __init__(self, event_type: SimulationEventType, simulation_time: float,
                 simulation_step: int, data=None):
        """ Create a simulation event. A simulation event is an event that
        usually occurs at a specific moment, in some "unplanned" way.

        - simulation_time: simulation time expressed in seconds.
        - simulation_step: an integer
        """
        assert simulation_time >= 0
        assert simulation_step >= 0
        assert isinstance(event_type, SimulationEventType)

        self._event_type = event_type
        self._simulation_time = simulation_time
        self._simulation_step = simulation_step

        if self._event_type == SimulationEventType.BathymetryUpdate:
            assert isinstance(data, str) or isinstance(data, Path), \
                f"Bad data: {data}. I expect just a filename with no directory whatsoever"
            self._data = str(data)
        else:
            assert data is None
            self._data = None

        self._parent_store = None

    def file_content(self):
        """ Return the file content of the event. In the case of a
        BathymetryUpdate event, returns the numpy array used for the update.

        Note: we return the file content because we make no guarantee that the
        file can actually be read from the disk like a regular file (because we
        may zip it).
        """
        assert self._event_type == SimulationEventType.BathymetryUpdate, \
            "this method only makes sense for bathymetry updates"
        assert self._parent_store is not None, \
            "Parent store must be set for this to work (internal issue)"
        return self._parent_store._read_numpy(self._data)

    def _inject_parent_store(self, store):
        assert isinstance(store, ResultsStore)
        self._parent_store = store

    def __str__(self) -> str:
        return f"{self._event_type} {self.simulation_time} {self.simulation_step} {self._data}"

    @property
    def simulation_step(self):
        return self._simulation_step

    @property
    def simulation_time(self):
        return self._simulation_time

    def to_dict(self):
        d = {
            "event_type": self._event_type.value,
            "simulation_time": self._simulation_time,
            "simulation_step": self._simulation_step,
        }
        if self._event_type == SimulationEventType.BathymetryUpdate:
            d["file"] = str(self._data)
        return d

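# Illustrative sketch (not part of the module): creating a bathymetry-update
# event and serializing it; the file name is hypothetical.
#
#   ev = SimulationEvent(SimulationEventType.BathymetryUpdate,
#                        simulation_time=12.5, simulation_step=42,
#                        data="bathy_update.42.npy")
#   ev.to_dict()
#   # -> {'event_type': 'BathymetryUpdate', 'simulation_time': 12.5,
#   #     'simulation_step': 42, 'file': 'bathy_update.42.npy'}
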
def array_to_bytes(x: np.ndarray) -> bytes:
    np_bytes = io.BytesIO()
    np.save(np_bytes, x, allow_pickle=True)
    return np_bytes.getvalue()


def bytes_to_array(b: bytes) -> np.ndarray:
    np_bytes = io.BytesIO(b)
    return np.load(np_bytes, allow_pickle=True)

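# Round-trip sketch (illustrative only): these two helpers serialize a numpy
# array to raw bytes (the same format `np.save` writes) and back, which is how
# arrays are stored inside zip-based stores.
#
#   a = np.arange(6, dtype=np.float32).reshape(2, 3)
#   b = array_to_bytes(a)                       # bytes, e.g. for ZipFile.writestr
#   assert np.array_equal(bytes_to_array(b), a)
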
class ResultsStore:
    # We thought about HDF5 but it seems a tad too complex for our needs.

    _dir: Path

    def __init__(self, sim_path: Path, mode: str = "a", tile_packer: TilePacker = None):
        """
        * `sim_path` must include the beacon file name, like `d:\alpha\bravo\simul`.
        * `mode` is `r` (read), `w` ((over)write) or `a` (append).
        * `tile_packer`: the tile packer is given only when one opens the store
          in *write* mode and wants to store results packed with a tile packer.
          The tile packer won't be used to pack the data but only to remember
          its configuration alongside the data (to be able to unpack later on).
        """
        assert sim_path is not None, "I need a path to a result store"
        assert isinstance(sim_path, Path), \
            f"I only understand `Path` objects, you gave {type(sim_path)}"

        self._tile_packer = tile_packer
        self._dir = None
        self._path = None
        self._zip = None
        self._mode = mode
        self._sim_times = []
        self._sim_events = list[SimulationEvent]()
        self._metadata = dict()

        if sim_path.suffix == ".zip":
            self._path = sim_path
        else:
            self._dir = sim_path

        if mode in ('r', 'a'):
            assert tile_packer is None, \
                "TilePacker can be given when mode is 'w', not when reading (we'll read it from the store ourselves)"
            self._init_filesystem_for_reading(mode)
            self._load_current_data()
        elif mode == 'w':
            self._init_filesystem_for_writing()
            self._number_of_results = 0
            self._metadata["creation_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            # Record some information about what was run.
            # Useful when running lots of tests.
            self._metadata["python_cli"] = " ".join(sys.argv)
            self._metadata["python_wolfgpu"] = str(Path(__file__).parent.resolve())
            self._metadata["python_working_directory"] = os.getcwd()

            if tile_packer and tile_packer.mode() != TilePackingMode.TRANSPARENT:
                self._write_numpy("nap", tile_packer._original_nap)
                self._metadata["tile_pack"] = {
                    "nap_file": "nap",
                    "tile_size": tile_packer.tile_size()
                }
        else:
            raise Exception(f"Unrecognized mode : {mode}")

    @property
    def creation_time(self):
        if "creation_time" in self._metadata:
            return datetime.strptime(self._metadata["creation_time"], "%Y-%m-%d %H:%M:%S")
        else:
            return None

    @property
    def nb_results(self):
        """ The number of results currently stored in the result store.
        The last result's index is this value. """
        return self._number_of_results

    @property
    def path(self) -> Path:
        """ Returns the path where the result store is located.
        It is either a directory or a path to a zip file. """
        if self._path is not None:
            return self._path
        else:
            return self._dir

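    # Usage sketch (illustrative; the path is hypothetical): opening an existing
    # store for reading and fetching the last water height.
    #
    #   store = ResultsStore(Path("simul_gpu_results"), mode="r")
    #   h = store.get_last_named_result("h")
    #   store.close()
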
    def _load_current_data(self):
        self._sim_times = []
        self._number_of_results = int(self._read(NB_RESULTS_FILE))

        attempts_at_reading = 0
        while True:
            # Start from a clean slate on each attempt.
            self._sim_times = []
            s = self._read("sim_times.csv").decode()
            csv_reader = csv.reader(io.StringIO(s))
            header = next(csv_reader)  # Skip header
            self._supported_result_types = [x for x in ResultType if x.value < len(header)]
            for line in csv_reader:
                l = list(map(float, line))
                l[ResultType.STEP_NUM.value] = int(l[ResultType.STEP_NUM.value])  # step num
                self._sim_times.append(l)

            if len(self._sim_times) == self.nb_results:
                # The file structure is correct, so we conclude our data can be read.
                break
            else:
                # This is an *attempt*: there's no guarantee that this
                # procedure will work. We just hope it'll reduce the probability
                # of a problem.
                attempts_at_reading += 1
                if attempts_at_reading <= 3:
                    sleep(0.1)
                else:
                    raise Exception(f"ResultStore state is incoherent!? Number of records actually "
                                    f"in the database ({len(self._sim_times)}) != announced number of records "
                                    f"({self.nb_results}). Are you reading while updating?")

        metadata = json.loads(self._read("metadata.json"))
        for d in metadata["events"]:
            se = SimulationEvent(SimulationEventType(d["event_type"]),
                                 d["simulation_time"], d["simulation_step"], Path(d["file"]))
            se._inject_parent_store(self)
            self._sim_events.append(se)

        if "tile_pack" in metadata["metadata"]:
            md = metadata["metadata"]["tile_pack"]
            self._tile_packer = TilePacker(self._read_numpy(md["nap_file"]), md["tile_size"])

        # FIXME There's metadata inside metadata, not exactly fine...
        self._metadata = metadata["metadata"]

    def reload(self, only_if_changed: bool = True):
        """ Reload this result store.

        If `only_if_changed` is `True` (the default), the store is reloaded only
        if its number of results has changed (compared to what it was when it was
        first opened). If `False`, the store is reloaded unconditionally.

        Reloading only makes sense in 'r' (read) mode.

        This method can be used to track the growth of a result store while it
        is being populated by the simulator.
        """
        assert self._mode in ('r',)

        # This one should be quick so that one can reload on a frequent basis.
        do_reload = not only_if_changed or int(self._read(NB_RESULTS_FILE)) != self._number_of_results

        if do_reload:
            self._init_filesystem_for_reading(self._mode)
            self._load_current_data()

    def truncate_at_report(self, n: int):
        """ Remove records from n+1 (included) up to the last one.

        `n` is one-based!
        """
        if self._dir is None:
            raise Exception("Truncating is not supported on zip based result stores")

        logging.debug(f"Truncating result store at record {n} out of {self.nb_results}")
        assert n <= self.nb_results, \
            f"When truncating result store: record number to start from is not right: {n} " \
            f"is greater than the number of records we have ({self.nb_results})"

        # if n <= 1:
        #     if self.nb_results > 1:
        #         raise Exception(f"Restarting at record {n} out of {self.nb_results} is not supported yet. Please restart at record 2 or more.")
        #     else:
        #         raise Exception(f"Restarting at record {n} is not supported yet.")

        assert self._mode in ('a',), \
            "When truncating result store: truncation is only authorized when the result store is opened in 'a' mode!"

        if n == self.nb_results:
            return

        # Clear regular files
        for i in range(n + 1, self.nb_results + 1):
            name_num = f"{i:07}"
            for fname in glob.glob(str(self._dir / f"*_{name_num}.npz")):
                self._delete_file(Path(os.path.basename(fname)))
            for fname in glob.glob(str(self._dir / f"*_{name_num}.tiff")):
                self._delete_file(Path(os.path.basename(fname)))

        self._number_of_results = n
        self._sim_times = self._sim_times[0:self._number_of_results]

        # Clear files related to events.
        last_t = self._sim_times[-1][0]
        # print(f"last_t {last_t}")
        to_delete = map(lambda e: e._data,
                        filter(lambda e: e.simulation_time > last_t, self._sim_events))
        for fname in to_delete:
            self._delete_file(Path(fname))
        self._sim_events = list(filter(lambda e: e.simulation_time <= last_t, self._sim_events))

        self._update_track_files()

    def _update_track_files(self):
        # FIXME suboptimal (should append instead of re-writing)
        f = io.StringIO()
        csv_writer = csv.writer(f)
        csv_writer.writerow(["t", "last_delta_t", "dryup_niter", "nb_active_tiles",
                             "step_num", "clock_t", "delta_early_out", "mostly_dry_meshes"])

        def cleaner(tupl):
            return [v if v is not None else -1 for v in tupl]

        csv_writer.writerows(map(cleaner, self._sim_times))
        self._writestr("sim_times.csv", f.getvalue())

        self._writestr(
            "metadata.json",
            json.dumps({
                "metadata": self._metadata,
                "events": [e.to_dict() for e in self._sim_events]
            }, indent=2))

        # Do this here so that the CSV file has the same number of records or more.
        self._writestr(NB_RESULTS_FILE, str(self._number_of_results))

    def close(self):
        if self._dir is None and self._zip is not None:
            self._zip.close()
            self._zip = None
        else:
            pass

    def __del__(self):
        # Important for write operations (see zipfile documentation)
        self.close()
        if self._zip is not None:
            self._zip.close()

    # def append_results_multi(self, arrays: dict, values: dict):
    #     assert self._mode in ('w','a')
    #     n = f"{self._number_of_results+1:07}"
    #     assert "t" in values and "dt" in values, "You must be backward compatible"
    #     assert "h" in arrays and "qx" in arrays and "qy" in arrays, "You must be backward compatible"
    #     v = []
    #     for k, value in values.items():
    #         v.append(v)
    #     self._sim_times.append( v )
    #     for k, array in arrays.items():
    #         self._write_numpy(f"{k}_{n}", array)
    #     self._number_of_results += 1
    #     # Do this lastly because tracking files are here to interpret the
    #     # content of the results.
    #     self._update_track_files()

    def append_result(self, step_num: int, sim_time: float, dt: float, niter: int, niter_rk: int,
                      h: np.ndarray, qx: np.ndarray, qy: np.ndarray,
                      dbg1: np.ndarray = None, dbg2: np.ndarray = None,
                      clock_time: float = 0, delta_early_out: float = 0,
                      nb_mostly_dry_meshes: int = 0):
        assert self._mode in ('w', 'a')

        # FIXME int(step_num) is a dirty fix. Somehow I get step numbers which
        # are floats down here.
        self._sim_times.append((sim_time, dt, niter, niter_rk, int(step_num), clock_time,
                                delta_early_out, nb_mostly_dry_meshes))

        n = f"{self._number_of_results+1:07}"
        self._write_numpy(f"h_{n}", h)
        self._write_numpy(f"qx_{n}", qx)
        self._write_numpy(f"qy_{n}", qy)

        if dbg1 is not None:
            # FIXME Remove that, put it in named results.
            self._write_numpy(f"dbg1_{n}", dbg1)
        if dbg2 is not None:
            self._write_numpy(f"dbg2_{n}", dbg2)

        self._number_of_results += 1

        # Do this lastly because the tracking files are here to interpret the
        # content of the results.
        self._update_track_files()

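    # Write sketch (illustrative; assumes `store` was opened with mode="w" and
    # that h, qx, qy are 2D numpy arrays covering the simulation domain):
    #
    #   store.append_result(step_num=1, sim_time=0.5, dt=0.01, niter=0, niter_rk=0,
    #                       h=h, qx=qx, qy=qy)
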
    def append_additional_result(self, name, data: np.ndarray):
        """ One can store additional results which have their own names (so they
        don't belong to `ResultType`). For now this is limited to numpy arrays.

        This function is a hack to help debugging. Don't use it!
        """
        assert isinstance(name, str), "I only accept str names because ResultType is for pre-defined data."
        n = f"{self._number_of_results:07}"
        self._write_numpy(f"{name}_{n}", data)

    def add_event(self, event_type: SimulationEventType, simulation_time: float, simulation_step: int, data=None):
        """ Record a simulation event.

        - simulation_time is expressed in seconds
        - simulation_step is of course an unsigned integer

        For a BathymetryUpdate event you must pass a Path to the bathymetry
        update file (which will be copied into the result store).
        """
        logging.debug(f"Recording event {event_type} sim_time={simulation_time:.3f}s step={simulation_step}")

        if event_type == SimulationEventType.BathymetryUpdate:
            # In case of a bathymetry update, the data is the name of the
            # bathymetry update file. We'll take care of making a copy of it here.
            assert isinstance(data, Path), f"Bad data: {data}"
            dest = data.with_suffix('.{}.npy'.format(simulation_step)).name
            self._write_file_copy(data, dest)
            data = dest

        se = SimulationEvent(event_type, simulation_time, simulation_step, data)
        se._inject_parent_store(self)
        self._sim_events.append(se)
        self._update_track_files()

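    # Event sketch (illustrative; the file name is hypothetical): recording a
    # bathymetry update while results are being written. The file is copied
    # into the store.
    #
    #   store.add_event(SimulationEventType.BathymetryUpdate,
    #                   simulation_time=120.0, simulation_step=350,
    #                   data=Path("new_bathymetry.npy"))
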
    def get_events_between_steps(self, first_step: int, last_step: int) -> list[SimulationEvent]:
        """ Query the events that sit between the given first/last simulation steps.

        first_step, last_step: inclusive.
        """
        assert first_step >= 1
        assert last_step >= 1
        assert first_step <= self._number_of_results
        assert last_step <= self._number_of_results
        assert first_step <= last_step

        r = []
        for event in self._sim_events:
            if first_step <= event.simulation_step <= last_step:
                r.append(event)
        return r

    def get_last_result_index(self) -> int:
        """ Get the index of the last result in this result store.
        The index can be used to query data in the store.
        """
        assert self._number_of_results >= 1, "This file has no results?"
        return self.nb_results

    def get_last_result(self, ndx=0) -> tuple[float, float, int, int, np.ndarray, np.ndarray, np.ndarray]:
        """ Returns [t, dt, n_iter_dry_up_euler, n_iter_dry_up_rk, h, qx, qy]. """
        assert ndx <= 0, "-0 == last, -1 == one before last, -2 = ..."
        assert self._mode in ["r", "a"], "Only makes sense in read or update modes"
        # ndx is <= 0, so this counts backwards from the last (one-based) index.
        return self.get_result(self.get_last_result_index() + ndx)

    def get_result(self, ndx: int) -> tuple[float, float, int, int, np.ndarray, np.ndarray, np.ndarray]:
        """ Returns [t, dt, n_iter_dry_up_euler, n_iter_dry_up_rk, h, qx, qy]. """
        assert ndx >= 1, "We're one based"
        assert ndx <= self.nb_results, f"You're past the last result: ndx={ndx} > {self.nb_results}"
        assert self._mode in ["r", "a"], "Only makes sense in read or update modes"

        # Built this way so that one can look at results
        # without preventing write operations.
        arrays = list(self._sim_times[ndx-1])[0:4]  # Slice for backward compatibility
        arrays.extend(self.get_named_result([ResultType.H, ResultType.QX, ResultType.QY], ndx))
        return tuple(arrays)

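    # Read sketch (illustrative): unpacking one record. The tuple layout follows
    # the docstring above.
    #
    #   t, dt, n_euler, n_rk, h, qx, qy = store.get_result(5)
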
    def get_last_named_result(self, name: Union[str, list[str]], delta_ndx: int = 0) -> Union[np.ndarray, list[np.ndarray]]:
        """ Get the last result by name(s). The names are those accepted by
        `get_named_result`.

        `delta_ndx` is an offset added to the last index so that you can count
        backwards from the last index (so it is always <= 0). For example a
        value of -2 means you take the third result starting from the last one.
        A value of 0 means you take the last one.

        You can call this like this:

        - get_last_named_result("qy")
        - get_last_named_result("h", "qy")
        - get_last_named_result(["h", "qy"])
        - get_last_named_result(["h", "qy", ResultType.QX])
        """
        # Parameter fiddling to allow the user to pass two names as two parameters.
        if type(delta_ndx) == str:
            if type(name) == list:
                name.append(delta_ndx)
            else:
                assert type(name) == str
                name = [name, delta_ndx]
            delta_ndx = 0  # back to the default value

        assert delta_ndx <= 0, "You can only go backwards"
        assert delta_ndx + self.get_last_result_index() >= 0, "You went backwards too much!"
        return self.get_named_result(name, self.get_last_result_index() + delta_ndx)

    # def get_named_results_chronology(self, name: Union[ResultType, list[ResultType]]):
    #     assert self._mode in ["a","r"], "Only makes sense in read or update mode"
    #     if name in (ResultType.T, ResultType.LAST_DELTA_T, ResultType.DRYUP_NITER, ResultType.NB_ACTIVE_TILES,
    #                 ResultType.STEP_NUM, ResultType.CLOCK_T):
    #         return self._sim_times[ndx-1][0]

    def _is_result_type_in_database(self, rt: ResultType):
        return self._mode != "r" or rt in self._supported_result_types

    def get_named_series(self, name: Union[str, ResultType]) -> list:
        """ Returns the series of all recordings of the value identified by the
        `name` parameter. Currently this only works for scalar values (i.e. not
        matrices).
        """
        if isinstance(name, str):
            typed_name = ResultType._from_str(name)
        else:
            typed_name = name

        if typed_name in (ResultType.T, ResultType.LAST_DELTA_T, ResultType.DRYUP_NITER,
                          ResultType.NB_ACTIVE_TILES, ResultType.STEP_NUM, ResultType.CLOCK_T,
                          ResultType.DELTA_EARLY_OUT, ResultType.NB_MOSTLY_DRY_MESHES):
            return [line[typed_name.value] for line in self._sim_times]
        else:
            raise NotImplementedError(f"The result type you requested ({name}) is not supported yet")

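    # Series sketch (illustrative): retrieving whole chronologies of scalar values.
    #
    #   times = store.get_named_series(ResultType.T)
    #   dts   = store.get_named_series("dt")
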
    def get_additional_named_result(self, name: str, ndx: int) -> np.ndarray:
        n = f"{ndx:07}"
        ret = self._read_numpy(f"{name}_{n}")
        if self._tile_packer and name != "active_cells_packed":
            # print(f"Unpacked {name} {n}")
            return self._tile_packer.unpack_and_deshuffle_array(ret)
        else:
            return ret

    def get_named_result(self, name: Union[str, list[str], ResultType, list[ResultType]], ndx: int) -> Union[np.ndarray, list[np.ndarray]]:
        """ Look up results by name.

        Supported names are: t, last_delta_t, step_num, h, qx, qy.

        ndx: result number, 1-based.
        """
        assert ndx >= 1, "We're one based"
        assert ndx <= self.nb_results, f"Your index {ndx} is too far away, max is: {self.nb_results}"
        assert self._mode in ["a", "r"], "Only makes sense in read or update mode"

        if isinstance(name, str):
            name_typed = ResultType._from_str(name)
        else:
            name_typed = name

        if isinstance(name_typed, ResultType):
            if name_typed in (ResultType.T, ResultType.LAST_DELTA_T, ResultType.DRYUP_NITER,
                              ResultType.NB_ACTIVE_TILES, ResultType.STEP_NUM, ResultType.CLOCK_T,
                              ResultType.DELTA_EARLY_OUT, ResultType.NB_MOSTLY_DRY_MESHES):
                line = self._sim_times[ndx-1]
                if self._is_result_type_in_database(name_typed):
                    return line[name_typed.value]
                else:
                    raise KeyError(f"You're requesting a value not present in the "
                                   f"results ({name}). Maybe the results were generated "
                                   "with a previous version of the program?")

            elif name_typed in (ResultType.H, ResultType.QX, ResultType.QY, ResultType.DEBUG1):
                n = f"{ndx:07}"  # FIXME We should be one based in our files too :-)
                prefix = name_typed.name.lower()  # Win32 is case insensitive but linux and co aren't!

                # Built this way so that one can look at results
                # without preventing write operations.
                try:
                    ret = self._read_numpy(f"{prefix}_{n}")
                    if name_typed in (ResultType.H, ResultType.QX, ResultType.QY):
                        if self._tile_packer:
                            return self._tile_packer.unpack_and_deshuffle_array(ret)
                        else:
                            return ret
                    else:
                        return ret
                except Exception as ex:
                    raise Exception(f"Unable to load result named '{prefix}_{n}' (data comes from {self._source_path()})") from ex
            else:
                raise KeyError(f"I don't know any values by the name of '{name}'")
        else:
            # Reuse this method recursively.
            return [self.get_named_result(n, ndx) for n in name]

    def get_result_index_at_t(self, t):
        """ Get the index of the result closest to time `t`. """
        assert t >= 0, f"Time is in the past (t={t})"

        SIM_TIME_NDX = 0
        if t < self._sim_times[0][SIM_TIME_NDX]:
            raise Exception(f"Your t={t} is before the first recorded result time (={self._sim_times[0][SIM_TIME_NDX]})")
        elif t == self._sim_times[-1][SIM_TIME_NDX]:
            return self.nb_results
        elif t > self._sim_times[-1][SIM_TIME_NDX]:
            raise Exception(f"Your t={t} is after the last recorded result time (={self._sim_times[-1][SIM_TIME_NDX]})")
        else:
            for i in range(len(self._sim_times) - 1):
                if self._sim_times[i][SIM_TIME_NDX] <= t < self._sim_times[i+1][SIM_TIME_NDX]:
                    # Are we closer to the beginning of the interval or closer to its end?
                    if t - self._sim_times[i][SIM_TIME_NDX] < self._sim_times[i+1][SIM_TIME_NDX] - t:
                        return i + 1
                    else:
                        return i + 2

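    # Time-lookup sketch (illustrative): find the record closest to t = 3600 s
    # and read its water height.
    #
    #   ndx = store.get_result_index_at_t(3600.0)
    #   h = store.get_named_result(ResultType.H, ndx)
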
    def _init_filesystem_for_reading(self, mode):
        if self._dir is None:
            if not self._path.exists():
                raise FileNotFoundError(f"The file {self._path} doesn't exist")
            if not self._path.is_file():
                raise Exception(f"The path {self._path} is not a file")

            self._zip = zipfile.ZipFile(self._path, mode=mode)
            if NB_RESULTS_FILE not in self._zip.namelist():
                raise Exception(f"The results store seems broken because it is missing {NB_RESULTS_FILE}")
        else:
            if not self._dir.exists():
                raise FileNotFoundError(f"The directory {self._dir} doesn't exist")
            if not self._dir.is_dir():
                raise NotADirectoryError(f"The directory {self._dir} is not a directory")
            if not (self._dir / NB_RESULTS_FILE).exists():
                raise Exception(f"The results store seems broken because it is missing {NB_RESULTS_FILE}")

    def _init_filesystem_for_writing(self):
        if self._dir is None:
            if self._path.exists():
                assert self._path.is_file()
                self._path.unlink()
            self._zip = zipfile.ZipFile(self._path, mode="x")  # x = exclusive create
        else:
            if self._dir.exists():
                assert self._dir.is_dir(), \
                    f"If you want to write to {self._dir} then it must be a directory (it's not)"
                for f in os.listdir(self._dir):
                    target = self._dir / f
                    if target.is_file():
                        target.unlink()
                    else:
                        shutil.rmtree(target)
            else:
                os.makedirs(self._dir)

    def _writestr(self, fname, s):
        if self._dir is None:
            assert type(s) == str
            self._zip.writestr(fname, s)
        else:
            with open(self._dir / fname, "wb") as fout:
                if type(s) == bytes:
                    fout.write(s)
                elif type(s) == str:
                    fout.write(s.encode("utf-8"))
                else:
                    raise Exception(f"Unrecognized data type {type(s)}")

    def _delete_file(self, fname: Path):
        assert isinstance(fname, Path)
        if self._dir is None:
            # See https://stackoverflow.com/questions/513788/delete-file-from-zipfile-with-the-zipfile-module
            # I make it a warning because we happen to delete files when truncating a ResultStore.
            logging.warning("Deleting file is not supported for the zip results format. Read comments in the code.")
        else:
            # Delete is used while truncating, which means stuff to delete
            # must exist.
            assert not fname.is_absolute(), "The path must be relative to the store directory"
            p = self._dir / fname
            p.unlink()

    # def _delete_numpy(self, fname: str):
    #     return self._delete_file( Path(fname).with_suffix(".tiff") )

    def _write_file_copy(self, fname_src: Path, fname_dest: str):
        if self._dir is None:
            self._zip.write(fname_src, fname_dest)
        else:
            # print(self._dir / fname_dest)
            shutil.copy(fname_src, self._dir / fname_dest)

    def _write_numpy(self, fname, a):
        assert not Path(fname).suffix, \
            f"Please don't give suffix to filename, I'll handle that myself (you gave {fname})."

        if self._dir is None:
            assert type(a) == np.ndarray, f"Got {type(a)} ?"
            self._zip.writestr(fname, array_to_bytes(a))
        else:
            # np.save((self._dir / fname).with_suffix(".npy"), a)
            if len(a.shape) == 3 and a.shape[2] == 3:
                # Sparse matrices can be used ONLY with 2D matrices.
                # FIXME Should we deprecate this ? No because w
                imwrite((self._dir / fname).with_suffix(".tiff"), np.flipud(a),
                        compression='zlib', compressionargs={'level': 8})
            elif len(a.shape) == 3:
                np.save(self._dir / fname, a)
            elif len(a.shape) == 2:
                # npz is cool and everything but it only works with 2D arrays :-(
                save_npz(self._dir / fname, csr_matrix(a))
            else:
                raise ValueError(f"The array you provided has a shape we don't support: {a.shape}")

    def _read(self, fname):
        if self._dir is None:
            with zipfile.ZipFile(self._path, mode="r") as zip:
                with zip.open(fname, mode="r") as fin:
                    return fin.read()
        else:
            with open(self._dir / fname, "rb") as fin:
                return fin.read()

    def _read_numpy(self, fname):
        if self._dir is None:
            with zipfile.ZipFile(self._path, mode="r") as zip:
                with zip.open(fname, mode="r") as fin:
                    return bytes_to_array(fin.read())
        else:
            f = (self._dir / fname).with_suffix(".npz")
            if f.exists():
                # I return a regular np.array (instead of the csr matrix given by load_npz)
                return load_npz(f).toarray()

            f = (self._dir / fname).with_suffix(".tiff")
            if f.exists():
                return np.flipud(imread(f))

            f = (self._dir / fname).with_suffix(".npy")
            if f.exists():
                return np.load(f)

            raise FileNotFoundError(self._dir / fname)

    def _source_path(self):
        # For error reporting and such. Not for looking for data.
        if self._dir is not None:
            return self._dir
        else:
            return self._path
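
# End-to-end reading sketch (illustrative; the path is hypothetical):
#
#   store = ResultsStore(Path("simul_gpu_results"), mode="r")
#   for ndx in range(1, store.nb_results + 1):
#       t = store.get_named_result(ResultType.T, ndx)
#       h = store.get_named_result(ResultType.H, ndx)
#       print(f"record {ndx}: t={t:.2f}s, max h={h.max():.3f} m")
#   store.close()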