Source code for wolfhece.tools2d_dll

from pathlib import Path
import numpy as np
import ctypes as ct
import logging

from .wolf_array import header_wolf, getkeyblock

try:
    import pefile
except ImportError:
    logging.warning("pefile module not found. Exported functions will not be listed.")


[docs] class Tools2DFortran: """ Fortran routines/functions available in "2d_cpu_tools.f90" in Wolf_OO Ref : https://docs.python.org/3/library/ctypes.html et https://gcc.gnu.org/onlinedocs/gfortran/Interoperability-with-C.html Ref : https://stackoverflow.com/questions/59330863/cant-import-dll-module-in-python """ def __init__(self, fn_simul:str | Path, debugmode:bool = False, path_to_dll:Path = None): if debugmode: if path_to_dll is None: self.dll_file = Path(__file__).parent / "libs" / "Wolf_tools_debug.dll" else: self.dll_file = path_to_dll / "Wolf_tools_debug.dll" else: if path_to_dll is None: self.dll_file = Path(__file__).parent / "libs" / "Wolf_tools.dll" else: self.dll_file = path_to_dll / "Wolf_tools.dll" self.dll_file = self.dll_file.absolute() if not Path(self.dll_file).exists(): logging.error(f"File {self.dll_file} does not exist.") return # Load the DLL try: self.lib = ct.CDLL(str(self.dll_file)) except OSError as e: logging.error(f"Could not load the DLL: {e}") return fn_simul = Path(fn_simul).absolute() self.fn_simul = str(fn_simul) if not Path(self.fn_simul).exists(): logging.error(f"File {self.fn_simul} does not exist.") return # Convert to ANSI encoding - this is important for Fortran on Windows and Latin-1 encoding self.fn_simul = self.fn_simul.encode('ansi') # res2D_init self.lib.r2D_init.restype = None self.lib.r2D_init.argtypes = [ct.c_char_p, ct.c_int] self.lib.r2D_nbblocks.restype = ct.c_int self.lib.r2D_nbblocks.argtypes = [] self.lib.r2D_header_block.restype = ct.c_int self.lib.r2D_header_block.argtypes = [ ct.POINTER(ct.c_int), # nbx ct.POINTER(ct.c_int), # nby ct.POINTER(ct.c_double), # ox ct.POINTER(ct.c_double), # oy ct.POINTER(ct.c_double), # tx ct.POINTER(ct.c_double), # ty ct.POINTER(ct.c_double), # dx ct.POINTER(ct.c_double), # dy ct.c_int # which_block ] self.lib.r2D_getsizes.restype = None self.lib.r2D_getsizes.argtypes = [ ct.POINTER(ct.c_int), # nbx ct.POINTER(ct.c_int), # nby ct.c_int # which_block ] self.lib.r2D_getnbresults.restype = ct.c_int self.lib.r2D_getnbresults.argtypes = [] self.lib.r2D_getresults.restype = None self.lib.r2D_getresults.argtypes = [ ct.c_int, # which ct.c_int, # nbx ct.c_int, # nby ct.POINTER(ct.c_float), # waterdepth ct.POINTER(ct.c_float), # qx ct.POINTER(ct.c_float), # qy ct.c_int # which_block ] self.lib.r2D_getturbresults.restype = None self.lib.r2D_getturbresults.argtypes = [ ct.c_int, # which ct.c_int, # nbx ct.c_int, # nby ct.POINTER(ct.c_float), # k ct.POINTER(ct.c_float), # e ct.c_int # which_block ] self.lib.r2D_get_times_steps.restype = None self.lib.r2D_get_times_steps.argtypes = [ ct.POINTER(ct.c_float), # times ct.POINTER(ct.c_int), # steps ct.c_int # nb ] self.lib.r2D_get_conv_border.restype = None self.lib.r2D_get_conv_border.argtypes = [ ct.c_char_p, # ptr_fnsim ct.c_char_p, # ptr_fnvec ct.c_int, # simul_type ct.c_char_p, # ptr_fnres ct.c_char_p, # ptr_fncut ct.c_int, # len_fnsim ct.c_int, # len_fnvec ct.c_int, # len_fnres ct.c_int # len_fncut ]
[docs] def _list_exported_functions(self): """ Fortran routines/functions available in """ pe = pefile.PE(self.dll_file) if not hasattr(pe, 'DIRECTORY_ENTRY_EXPORT'): print("No exported functions found.") return for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols: print(f"Function: {exp.name.decode('utf-8') if exp.name else 'None'}, Address: {hex(exp.address)}")
[docs] def _r2D_init(self): """ Alias for the Fortran function r2D_init. subroutine r2D_init(ptr_path, len_path) bind(C, name="r2d_init") !DEC$ ATTRIBUTES DLLEXPORT :: r2d_init type(c_ptr), value, intent(in) :: ptr_path integer(c_int), value, intent(in) :: len_path end subroutine """ self.lib.r2D_init(self.fn_simul, ct.c_int(len(self.fn_simul)))
[docs] def r2D_get_number_of_blocks(self): """ Alias for the Fortran function r2D_nbblocks. function r2D_nbblocks() result(nb_blocks) bind(C, name="r2D_nbblocks") !DEC$ ATTRIBUTES DLLEXPORT :: r2D_nbblocks integer(c_int) :: nb_blocks end function """ self._r2D_init() nb_blocks = self.lib.r2D_nbblocks() return nb_blocks
[docs] def r2D_get_header_one_block(self, which_block:int): """ Alias for the Fortran function r2D_header_block. function r2D_header_block(nbx, nby, dx,dy,ox,oy,tx,ty,which_block) result(ret) bind(C, name="r2D_header_block") !DEC$ ATTRIBUTES DLLEXPORT :: r2D_header_block !DEC$ attributes REFERENCE :: nbx, nby !DEC$ attributes REFERENCE :: dx, dy, ox, oy, tx, ty !DEC$ attributes VALUE :: which_block integer(c_int) :: nbx,nby, ret double precision :: ox,oy,tx,ty,dx,dy integer(c_int), intent(in) :: which_block end function """ self._r2D_init() nbx = ct.c_int() nby = ct.c_int() ox = ct.c_double() oy = ct.c_double() tx = ct.c_double() ty = ct.c_double() dx = ct.c_double() dy = ct.c_double() self.lib.r2D_header_block(ct.byref(nbx), ct.byref(nby), ct.byref(dx), ct.byref(dy), ct.byref(ox), ct.byref(oy), ct.byref(tx), ct.byref(ty), ct.c_int(which_block)) return (nbx.value, nby.value, dx.value, dy.value, ox.value, oy.value, tx.value, ty.value)
[docs] def _r2D_get_header_block_python(self, which_block:int) -> header_wolf: """ Return a header_wolf object with the header of the block number which_block.""" nbx, nby, dx, dy, ox, oy, tx, ty = self.r2D_get_header_one_block(which_block) newhead = header_wolf() newhead.nbx = nbx newhead.nby = nby newhead.dx = dx newhead.dy = dy newhead.origin = ox, oy newhead.translation = tx, ty return newhead
[docs] def r2D_get_header_allblocks(self): nbblocks = self.r2D_get_number_of_blocks() blocks = {} for i in range(1, nbblocks+1): key = getkeyblock(i, addone=False) blocks[key] = self._r2D_get_header_block_python(i) return blocks
[docs] def r2D_get_shape(self, which_block:int): """ Alias for the Fortran function r2D_getsizes. subroutine r2D_getsizes(nbx,nby,which_block) bind(C, name="r2D_getsizes") !DEC$ ATTRIBUTES DLLEXPORT :: r2D_getsizes integer(c_int), intent(out) :: nbx,nby integer(c_int), intent(in) :: which_block end subroutine """ self._r2D_init() nbx = ct.c_int() nby = ct.c_int() self.lib.r2D_getsizes(ct.byref(nbx), ct.byref(nby), ct.c_int(which_block)) return nbx.value, nby.value
[docs] def r2D_get_number_of_results(self): """ Alias for the Fortran function r2D_getnbresults. function r2D_getnbresults() result(nb) bind(C, name="r2D_getnbresults") !DEC$ ATTRIBUTES DLLEXPORT :: r2D_getnbresults integer(c_int) :: nb end function """ self._r2D_init() nb = self.lib.r2D_getnbresults() return nb
[docs] def r2D_get_one_result(self, which:int, which_block:int): """ Alias for the Fortran function r2D_getresults. subroutine r2D_getresults(which,nbx,nby,waterdepth,qx,qy,which_block) bind(C, name="r2D_getresults") !DEC$ ATTRIBUTES DLLEXPORT :: r2D_getresults integer(c_int), intent(in) :: nbx,nby,which integer(c_int), intent(in) :: which_block real, dimension(nbx,nby), intent(out) :: waterdepth,qx,qy end subroutine """ self._r2D_init() nbx, nby = self.r2D_get_shape(which_block) waterdepth = np.zeros((nbx, nby), dtype=np.float32, order='F') qx = np.zeros((nbx, nby), dtype=np.float32, order='F') qy = np.zeros((nbx, nby), dtype=np.float32, order='F') self.lib.r2D_getresults(ct.c_int(which), ct.c_int(nbx), ct.c_int(nby), waterdepth.ctypes.data_as(ct.POINTER(ct.c_float)), qx.ctypes.data_as(ct.POINTER(ct.c_float)), qy.ctypes.data_as(ct.POINTER(ct.c_float)), ct.c_int(which_block)) return waterdepth, qx, qy
[docs] def r2D_get_one_turbulent_result(self, which:int, which_block:int): """ Alias for the Fortran function r2D_getturbresults. subroutine r2D_getturbresults(which,nbx,nby,k,e,which_block) bind(C, name="r2D_getturbresults") !DEC$ ATTRIBUTES DLLEXPORT :: r2D_getturbresults integer(c_int), intent(in) :: nbx,nby,which integer(c_int), intent(in) :: which_block real, dimension(nbx,nby), intent(out) :: k,e end subroutine """ self._r2D_init() nbx, nby = self.r2D_get_shape(which_block) k = np.zeros((nbx, nby), dtype=np.float32, order='F') e = np.zeros((nbx, nby), dtype=np.float32, order='F') self.lib.r2D_getturbresults(ct.c_int(which), ct.c_int(nbx), ct.c_int(nby), k.ctypes.data_as(ct.POINTER(ct.c_float)), e.ctypes.data_as(ct.POINTER(ct.c_float)), ct.c_int(which_block)) return k, e
[docs] def r2D_get_times_steps(self): """ Alias for the Fortran function get_times_steps. subroutine get_times_steps(times, steps, nb) bind(C, name="get_times_steps") !DEC$ ATTRIBUTES DLLEXPORT :: r2D_gettimes_steps integer(c_int), intent(in) :: nb real, dimension(nb), intent(out) :: times integer, dimension(nb), intent(out) :: steps end subroutine """ self._r2D_init() nb = self.r2D_get_number_of_results() times = np.zeros((nb), dtype=np.float32, order='F') steps = np.zeros((nb), dtype=np.int32, order='F') self.lib.r2D_get_times_steps(times.ctypes.data_as(ct.POINTER(ct.c_float)), steps.ctypes.data_as(ct.POINTER(ct.c_int)), ct.c_int(nb)) return times, steps
[docs] def r2D_create_convergence_border(self, vec:str='', simtype:int=0, res:str='', cutcell:str=''): """ Alias for the Fortran function get_conv_border. subroutine get_conv_border(ptr_fnsim, ptr_fnvec, simul_type, ptr_fnres, ptr_fncut, len_fnsim, len_fnvec, len_fnres, len_fncut) bind(C, name="get_conv_border") !DEC$ ATTRIBUTES DLLEXPORT :: r2D_get_conv_border type(c_ptr), value, intent(in) :: ptr_fnsim, ptr_fnvec, ptr_fnres, ptr_fncut integer(c_int), value, intent(in) :: len_fnsim, len_fnvec, len_fnres, len_fncut end subroutine """ fn_vec = str(vec).encode('ansi') fn_res = str(res).encode('ansi') fn_cutcell = str(cutcell).encode('ansi') self._r2D_init() self.lib.r2D_get_conv_border(self.fn_simul, fn_vec, simtype, fn_res, fn_cutcell, ct.c_int(len(self.fn_simul)), ct.c_int(len(fn_vec)), ct.c_int(len(fn_res)), ct.c_int(len(fn_cutcell))) return 0