Source code for wolfhece.radar.wolfradar

"""
Author: HECE - University of Liege, Pierre Archambeau, Christophe Dessers
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

import wradlib as wrl
import wx
from math import floor
from ..PyVertexvectors import *
from ..hydrology.read import *
from datetime import datetime as date
from datetime import timedelta as tdelta
from datetime import timezone
from os import path
from osgeo import gdal, osr
from ..drawing_obj import Element_To_Draw
from tqdm import tqdm

[docs] RADQPE = 1
[docs] RADCLIM = 2
[docs] RADFLOOD = 3
[docs] FILL_NONE = 0
[docs] FILL_ZERO = 1
[docs] FILL_NAN = 2
[docs] FILL_INTERP = 3
[docs] class L08L72:
[docs] epsgL08:osr.SpatialReference
[docs] epsgL72:osr.SpatialReference
[docs] _conv_L08_2_L72:osr.CoordinateTransformation
[docs] _conv_L72_2_L08:osr.CoordinateTransformation
def __init__(self) -> None: self.epsgL08 = osr.SpatialReference() self.epsgL72 = osr.SpatialReference() self.epsgL08.ImportFromEPSG(3812) self.epsgL72.ImportFromEPSG(31370) self._conv_L08_2_L72 = osr.CoordinateTransformation(self.epsgL08,self.epsgL72) self._conv_L72_2_L08 = osr.CoordinateTransformation(self.epsgL72,self.epsgL08)
[docs] def L08_2_72(self, x:float, y:float): return self._conv_L08_2_L72.TransformPoint(x, y)[0:2]
[docs] def L72_2_08(self, x:float, y:float): return self._conv_L72_2_L08.TransformPoint(x, y)[0:2]
[docs] class RadarIRM(Element_To_Draw): def __init__(self, idx: str = '', plotted: bool = True, mapviewer=None, need_for_wx: bool = False) -> None: super().__init__(idx=idx, plotted=plotted, mapviewer=mapviewer, need_for_wx=need_for_wx) pass
[docs] def convert2rain(self, coordMin:tuple, coordMax:tuple, dateBegin:date, dateEnd:date, deltaT:tdelta, dirIn:str="", dirOut:str="", file_name_conv:str=".radclim.accum", type_rain:int=RADCLIM, recursive_dir="", check_all_polygons:bool=False, fill_data:int=FILL_NONE): # ================= # ALL VERIFICATIONS : logging.info("Starting the conversion of IRM data to rain data ...") logging.info("Verification ongoing ...") # Checking the validity of the repository to read and write : isOk, dirIn = check_path(dirIn, applyCWD=True) if not isOk: logging.error("The directory chosen for IRM data does not exist. Please check your path! ") return isOk, dirOut = check_path(dirOut, applyCWD=True) if not isOk: logging.error("The directory chosen for IRM data does not exist. Please check your path! ") return # Checking the validity of the dates : dt = deltaT.total_seconds() nb_intervals = floor((dateEnd - dateBegin).total_seconds() / dt) time = [dateBegin + tdelta(seconds=dt*i) for i in range(nb_intervals + 1)] if recursive_dir == "": # Prefix for dates -> only if it's not a recursive directory dt_str = "{:.0f}d".format(deltaT.days)*(deltaT.days>0) \ + "{:.0f}h".format(floor(deltaT.seconds)/3600)*(floor(deltaT.seconds/3600)>0) \ + "{:.0f}m".format(floor(deltaT.seconds%3600)/60)*(floor(deltaT.seconds%3600)/60>0) suffix = "".join([file_name_conv, dt_str, ".hdf"]) all_files = [os.path.join(dirIn,"".join([t.strftime("%Y%m%d%H%M%S"),suffix])) for t in time] else: suffix = "".join([file_name_conv, ".hdf"]) all_files = [os.path.join(dirIn, t.strftime("%Y"), t.strftime("%m"), t.strftime("%d"), recursive_dir, "".join([t.strftime("%Y%m%d%H%M%S"),suffix])) for t in time] # are_present = np.all(np.array( \ # [os.path.exists(os.path.join(dirIn,"".join([t.strftime("%Y%m%d%H%M%S"),suffix]))) for t in time] \ # )) are_present = np.all(np.array( \ [os.path.exists(el) for el in all_files] \ )) if not are_present: logging.error("Rain files present in the selected directory are does not contain all the information between the desired interval.") i_problem = np.where(np.array([os.path.exists(el) for el in all_files]) == False)[0] logging.error("The following files are missing :") for i in i_problem: logging.error(all_files[i]) if fill_data == FILL_NONE: logging.error("The process stops because there are missing files!") logging.error("To make an linear interpolation set to False 'no_missing_files' argument.") return # Creating the direcory results # Directory of the shapefile shpDir = os.path.join(dirOut,'Grid') if not os.path.exists(shpDir): try: os.mkdir(shpDir) except OSError: print ("Creation of the directory %s failed" % shpDir) return else: print ("Successfully created the directory %s" % shpDir) shpFile = "Grid_radar.shp" fileOut = os.path.join(shpDir, shpFile) else: shpFile = "Grid_radar.shp" fileOut = os.path.join(shpDir, shpFile) # # Directory of the of the time series timeSeriesDir = os.path.join(dirOut,'IRM') if not os.path.exists(timeSeriesDir): try: os.mkdir(timeSeriesDir) except OSError: print ("Creation of the directory %s failed" % timeSeriesDir) else: print ("Successfully created the directory %s" % timeSeriesDir) logging.info("Verification validated!") # ================= # CORE PROCEDURE # After all verifications, the core procedure can now start : # extract all the points in all the .hdf files and their values -> check whether to crop during this process of after logging.info("Creation of the shapefile ongoing ...") # Definition of the domaine zone limits = vector() limits.add_vertex(wolfvertex(coordMin[0],coordMin[1])) limits.add_vertex(wolfvertex(coordMax[0],coordMin[1])) limits.add_vertex(wolfvertex(coordMax[0],coordMax[1])) limits.add_vertex(wolfvertex(coordMin[0],coordMax[1])) # The shape file will be based on the first file read cur_file = all_files[0] hdf_ds = gdal.Open(cur_file, gdal.GA_ReadOnly) values, coord, proj = wrl.georef.raster.extract_raster_dataset(hdf_ds, mode="edge", nodata=0.0) # project the Lambert 2008 in Lambert 1972 coordinates -> let the possibility to crop either points or polygons coord72 = np.zeros_like(coord) proj72 = L08L72() nbI, nbJ, nbC = np.shape(coord) for i in range(nbI): for j in range(nbJ): coord72[i,j] = np.array(proj72.L08_2_72(coord[i,j,0],coord[i,j,1])) # Creation of a list of polygons containing a list of vertices grouped in tuples all_i, all_j = np.meshgrid(range(nbI-1),range(nbJ-1), indexing='ij') polygons_list = [ [tuple(coord72[i][j]), tuple(coord72[i+1][j]), tuple(coord72[i+1][j+1]), tuple(coord72[i][j+1]), tuple(coord72[i][j])] for i,j,v in zip(all_i.reshape(-1),all_j.reshape(-1),coord72[:-1, :-1].reshape(-1)) ] # create polygons out of the given points polygons = zone() zones_indices = [] i_zone = 0 for cur_poly in polygons_list: cur_vec = vector(name=" ".join(["Zone", str(polygons.nbvectors+1)])) is_inside = False for cur_point in cur_poly: cur_vec.add_vertex(wolfvertex(cur_point[0],cur_point[1])) if limits.isinside(cur_point[0], cur_point[1]): is_inside = True if is_inside: polygons.add_vector(cur_vec) zones_indices.append(i_zone) i_zone += 1 # save the polygons in .shp shapefile polygons.export_shape(fileOut) logging.info("Creation of the shapefile finished !") logging.info("Creation of polygon time series ongoing ...") # Create a folder with the time serie for each polygone timeStps = [[str(t.day), str(t.month), str(t.year), str(t.hour), str(t.minute), str(t.second)] for t in time] all_values = np.zeros((len(all_files), polygons.nbvectors)) for i in tqdm(range(len(all_files))): cur_file = all_files[i] exists = os.path.exists(cur_file) try: if exists: hdf_ds = gdal.Open(cur_file, gdal.GA_ReadOnly) values, coord, proj = wrl.georef.raster.extract_raster_dataset(hdf_ds, mode="edge", nodata=0.0) vec_values = values.reshape(-1) all_values[i,:] = np.nan_to_num([vec_values[i] for i in zones_indices], copy=False, nan=0.0) # FIXME this following line -> to check !!!! -> Convert [mm/h] to accumulated rain [mm] at each time step # radflood if type_rain == RADFLOOD or type_rain == RADCLIM: all_values[i,:] = all_values[i,:] elif type_rain == RADQPE: # radqpe # TODO : check the conversion for radclim all_values[i,:] = all_values[i,:]*(dt/3600.0) # all_values[i,:] = all_values[i,:]*(dt/3600.0) else: if fill_data == FILL_ZERO: all_values[i,:] = 0.0 else: all_values[i,:] = np.nan except: logging.error("".join(["Something bad happened while reading hdf file :", cur_file])) all_values[i,:] = 0.0 logging.info("Creation of polygon time series finished !") logging.info("Writing polygon time series ongoing ...") # Writing the file for iVec in tqdm(range(polygons.nbvectors)): # Clean the nan value with linear interpolation # FIXME : to check the interpolation if(fill_data == FILL_INTERP): logging.warning('Interpolating missing values in the time series -> Still to test!') valid = ~np.isnan(all_values[:,iVec]) invalid = np.isnan(all_values[:,iVec]) all_values[invalid,iVec] = np.interp(np.flatnonzero(invalid), np.flatnonzero(valid), all_values[valid,iVec]) # write in the file with open(os.path.join(timeSeriesDir,"".join([str(iVec+1),".rain"])), 'w') as f: f.write("".join([str(iVec+1),"\n"])) f.write("".join([str(1),"\n"])) f.write("".join([str(7),"\n"])) f.write("".join([str(len(all_files)),"\n"])) for cur_t in range(len(timeStps)): f.write("\t".join(timeStps[cur_t] + [str(all_values[cur_t,iVec])]) + "\n") logging.info("Writing polygon time series finished !") print(are_present)
# def plot(self): # pass
[docs] def _shapefile(fileName:str): pass
if __name__ == "__main__":
[docs] app = wx.App()
# Selection of the working directory idir=wx.DirDialog(None,"Please choose a IRM rain directory") if idir.ShowModal() == wx.ID_CANCEL: print("Operation cancelled!") idir.Destroy() readDir = idir.GetPath() idir.Destroy() idir=wx.DirDialog(None,"Please a directory to write results") if idir.ShowModal() == wx.ID_CANCEL: print("Operation cancelled!") idir.Destroy() writeDir = idir.GetPath() idir.Destroy() irm = RadarIRM() coord_min = (0.0, 0.0) coord_max = (0.0, 0.0) db = date(year=2021, month=7, day=1, tzinfo=timezone.utc) de = date(year=2021, month=7, day=31, hour=23, minute=55, tzinfo=timezone.utc) dt = tdelta(minutes=5) print(db) irm.convert2rain(coord_min, coord_max, dateBegin=db, dateEnd=de, deltaT=dt, dirIn=readDir, dirOut=writeDir) print("The End!")