Source code for wolfhece.radar.wolfradar

"""
Author: HECE - University of Liege, Pierre Archambeau, Christophe Dessers
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

import wradlib as wrl
import wx
from math import floor
from ..PyVertexvectors import *
from ..hydrology.read import *
from datetime import datetime as date
from datetime import timedelta as tdelta
from datetime import timezone
from os import path
from osgeo import gdal, osr
from ..drawing_obj import Element_To_Draw


[docs] class L08L72:
[docs] epsgL08:osr.SpatialReference
[docs] epsgL72:osr.SpatialReference
[docs] _conv_L08_2_L72:osr.CoordinateTransformation
[docs] _conv_L72_2_L08:osr.CoordinateTransformation
def __init__(self) -> None: self.epsgL08 = osr.SpatialReference() self.epsgL72 = osr.SpatialReference() self.epsgL08.ImportFromEPSG(3812) self.epsgL72.ImportFromEPSG(31370) self._conv_L08_2_L72 = osr.CoordinateTransformation(self.epsgL08,self.epsgL72) self._conv_L72_2_L08 = osr.CoordinateTransformation(self.epsgL72,self.epsgL08)
[docs] def L08_2_72(self, x:float, y:float): return self._conv_L08_2_L72.TransformPoint(x, y)[0:2]
[docs] def L72_2_08(self, x:float, y:float): return self._conv_L72_2_L08.TransformPoint(x, y)[0:2]
[docs] class RadarIRM(Element_To_Draw): def __init__(self, idx: str = '', plotted: bool = True, mapviewer=None, need_for_wx: bool = False) -> None: super().__init__(idx=idx, plotted=plotted, mapviewer=mapviewer, need_for_wx=need_for_wx) pass
[docs] def convert2rain(self, coordMin:tuple, coordMax:tuple, dateBegin:date, dateEnd:date, deltaT:tdelta, dirIn:str="", dirOut:str="", check_all_polygons:bool=False): from datetime import datetime as date # ================= # ALL VERIFICATIONS : # Checking the validity of the repository to read and write : isOk, dirIn = check_path(dirIn, applyCWD=True) if not isOk: logging.error("The directory chosen for IRM data does not exist. Please check your path! ") return isOk, dirOut = check_path(dirOut, applyCWD=True) if not isOk: logging.error("The directory chosen for IRM data does not exist. Please check your path! ") return # Checking the validity of the dates : dt = deltaT.total_seconds() nb_intervals = floor((dateEnd - dateBegin).total_seconds() / dt) time = [dateBegin + tdelta(seconds=dt*i) for i in range(nb_intervals + 1)] # Prefix for dates dt_str = "{:.0f}d".format(deltaT.days)*(deltaT.days>0) \ + "{:.0f}h".format(floor(deltaT.seconds)/3600)*(floor(deltaT.seconds/3600)>0) \ + "{:.0f}m".format(floor(deltaT.seconds%3600)/60)*(floor(deltaT.seconds%3600)/60>0) suffix = "".join([".radclim.accum", dt_str, ".hdf"]) all_files = [os.path.join(dirIn,"".join([t.strftime("%Y%m%d%H%M%S"),suffix])) for t in time] # are_present = np.all(np.array( \ # [os.path.exists(os.path.join(dirIn,"".join([t.strftime("%Y%m%d%H%M%S"),suffix]))) for t in time] \ # )) are_present = np.all(np.array( \ [os.path.exists(el) for el in all_files] \ )) if not are_present: logging.error("Rain files present in the selected directory are does not contain all the information between the desired interval.") return # Creating the direcory results # Directory of the shapefile shpDir = os.path.join(dirOut,'Grid') if not os.path.exists(shpDir): try: os.mkdir(shpDir) except OSError: print ("Creation of the directory %s failed" % shpDir) return else: print ("Successfully created the directory %s" % shpDir) shpFile = "Grid_radar.shp" fileOut = os.path.join(shpDir, shpFile) else: shpFile = "Grid_radar.shp" fileOut = os.path.join(shpDir, shpFile) # # Directory of the of the time series timeSeriesDir = os.path.join(dirOut,'IRM') if not os.path.exists(timeSeriesDir): try: os.mkdir(timeSeriesDir) except OSError: print ("Creation of the directory %s failed" % timeSeriesDir) else: print ("Successfully created the directory %s" % timeSeriesDir) # ================= # CORE PROCEDURE # After all verifications, the core procedure can now start : # extract all the points in all the .hdf files and their values -> check whether to crop during this process of after # Definition of the domaine zone limits = vector() limits.add_vertex(wolfvertex(coordMin[0],coordMin[1])) limits.add_vertex(wolfvertex(coordMax[0],coordMin[1])) limits.add_vertex(wolfvertex(coordMax[0],coordMax[1])) limits.add_vertex(wolfvertex(coordMin[0],coordMax[1])) # The shape file will be based on the first file read cur_file = all_files[0] hdf_ds = gdal.Open(cur_file, gdal.GA_ReadOnly) values, coord, proj = wrl.georef.raster.extract_raster_dataset(hdf_ds, mode="edge", nodata=0.0) # project the Lambert 2008 in Lambert 1972 coordinates -> let the possibility to crop either points or polygons coord72 = np.zeros_like(coord) proj72 = L08L72() nbI, nbJ, nbC = np.shape(coord) for i in range(nbI): for j in range(nbJ): coord72[i,j] = np.array(proj72.L08_2_72(coord[i,j,0],coord[i,j,1])) # Creation of a list of polygons containing a list of vertices grouped in tuples all_i, all_j = np.meshgrid(range(nbI-1),range(nbJ-1), indexing='ij') polygons_list = [ [tuple(coord72[i][j]), tuple(coord72[i+1][j]), tuple(coord72[i+1][j+1]), tuple(coord72[i][j+1]), tuple(coord72[i][j])] for i,j,v in zip(all_i.reshape(-1),all_j.reshape(-1),coord72[:-1, :-1].reshape(-1)) ] # create polygons out of the given points polygons = zone() zones_indices = [] i_zone = 0 for cur_poly in polygons_list: cur_vec = vector(name=" ".join(["Zone", str(polygons.nbvectors+1)])) is_inside = False for cur_point in cur_poly: cur_vec.add_vertex(wolfvertex(cur_point[0],cur_point[1])) if limits.isinside(cur_point[0], cur_point[1]): is_inside = True if is_inside: polygons.add_vector(cur_vec) zones_indices.append(i_zone) i_zone += 1 # save the polygons in .shp shapefile polygons.export_shape(fileOut) # Create a folder with the time serie for each polygone timeStps = [[str(t.day), str(t.month), str(t.year), str(t.hour), str(t.minute), str(t.second)] for t in time] all_values = np.zeros((len(all_files), polygons.nbvectors)) for i in range(len(all_files)): cur_file = all_files[i] try: hdf_ds = gdal.Open(cur_file, gdal.GA_ReadOnly) values, coord, proj = wrl.georef.raster.extract_raster_dataset(hdf_ds, mode="edge", nodata=0.0) vec_values = values.reshape(-1) all_values[i,:] = np.nan_to_num([vec_values[i] for i in zones_indices], copy=False, nan=0.0) # FIXME this following line -> to check !!!! -> Convert [mm/h] to accumulated rain [mm] at each time step all_values[i,:] = all_values[i,:]*(dt/3600.0) except: logging.error("".join(["Something bad happened while reading hdf file :", cur_file])) all_values[i,:] = 0.0 # Writing the file for iVec in range(polygons.nbvectors): with open(os.path.join(timeSeriesDir,"".join([str(iVec+1),".rain"])), 'w') as f: f.write("".join([str(iVec+1),"\n"])) f.write("".join([str(1),"\n"])) f.write("".join([str(7),"\n"])) f.write("".join([str(len(all_files)),"\n"])) for cur_t in range(len(timeStps)): f.write("\t".join(timeStps[cur_t] + [str(all_values[cur_t,iVec])]) + "\n") print(are_present)
# def plot(self): # pass
[docs] def _shapefile(fileName:str): pass
if __name__ == "__main__":
[docs] app = wx.App()
# Selection of the working directory idir=wx.DirDialog(None,"Please choose a IRM rain directory") if idir.ShowModal() == wx.ID_CANCEL: print("Operation cancelled!") idir.Destroy() readDir = idir.GetPath() idir.Destroy() idir=wx.DirDialog(None,"Please a directory to write results") if idir.ShowModal() == wx.ID_CANCEL: print("Operation cancelled!") idir.Destroy() writeDir = idir.GetPath() idir.Destroy() irm = RadarIRM() coord_min = (0.0, 0.0) coord_max = (0.0, 0.0) db = date(year=2021, month=7, day=1, tzinfo=timezone.utc) de = date(year=2021, month=7, day=31, hour=23, minute=55, tzinfo=timezone.utc) dt = tdelta(minutes=5) print(db) irm.convert2rain(coord_min, coord_max, dateBegin=db, dateEnd=de, deltaT=dt, dirIn=readDir, dirOut=writeDir) print("The End!")