Source code for wolfhece.lagrangian.particle_system

"""
Author: HECE - University of Liege, Pierre Archambeau
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

import numpy as np
from OpenGL.GL  import *
import logging
import matplotlib.pyplot as plt
from matplotlib.figure import Figure
from matplotlib.axes import Axes
import json
from pathlib import Path
from typing import Union
from multiprocessing import Pool

# from wxasync import StartCoroutine
# import asyncio

from .advection import advection_xy

from .emitter import Emitter
from .particles import Particles
from .velocity_field import Velocity_Field
from ..PyVertexvectors import vector, wolfvertex


# async def _async_load_particle_file(x:tuple[float, int, dict, Path]) -> tuple[float, int, Particles]:
#     """
#     Asynchronous function to load the particles from file.
#     """
#     time, key, part, dir = x

#     mypart = Particles(dir=dir)
#     mypart.deserialize(part)

#     return time, key, mypart


[docs] def _load_particles_files(x:tuple[float, int, dict, Path]) -> tuple[float, int, Particles]: """ Parallel/Multiprocessor function to load the particles from file. """ time, key, part, dir = x mypart = Particles(dir=dir) mypart.deserialize(part) return time, key, mypart
[docs] class Particle_system(): def __init__(self, domain:np.ndarray = None, emitters:list[Emitter] = None, uv:list[Velocity_Field] = None, times:list[float] = None) -> None: if times is None: times = [0.] else: assert len(times) == len(uv), 'times and uv must have the same length.' return self._filename = None self._time = 0. self._particles:dict[float, dict[int, Particles]] = {} self.totaltime = 0. self.dt = 0. self.scheme = 'RK22' self._current = None self._previous = None self._current_step = 0. self._current_step_idx = 0 self._dir = None self.origx, self.origy = 0., 0. self.dx, self.dy = 1., 1. self.nbx, self.nby = 1, 1 self._emitters = emitters self._inside_array = domain == 1 if domain is not None else None self._velocity_fields:dict[float:Velocity_Field] = {} if uv is not None: onefield = uv[0] self.origx, self.origy, self.dx, self.dy = onefield.origx, onefield.origy, onefield.dx, onefield.dy self.nbx, self.nby = onefield.u.shape self._velocity_fields = { curtime:curuv for curtime, curuv in zip(times,uv) } self.sort_vf()
[docs] def reset(self) -> None: """ Reset the particles. """ self._time = 0. self._particles = {} self._current = None self._previous = None
[docs] def check(self) -> tuple[bool, str]: """ Check if the particle system is valid. """ check = True message = '' if len(self._velocity_fields) == 0 : check = False message += 'No velocity field found.\n' if self._emitters is None: check = False message += 'No emitter found.\n' if self._inside_array is None: check = False message += 'No domain found.\n' if len(self._velocity_fields) >0: for idx, (keytime, field) in enumerate(self._velocity_fields.items()): loccheck, msg = field.check() if not loccheck: check = False message += f'Velocity field {idx} is not valid.\n' + msg if self.domain.shape != self.u.shape: check = False message += 'Domain and velocity field must have the same shape.\n' if self.get_header() != (self.origx, self.origy, self.dx, self.dy, self.nbx, self.nby): check = False message += 'Header of the velocity field is not valid.\n' for curtime, curfield in self._velocity_fields.items(): if self.get_header() != curfield.get_header(): check = False message += 'Header of the velocity field is not valid.\n' if self._emitters is not None: for idx, emitter in enumerate(self._emitters): loccheck, msg = emitter.check() if not loccheck: check = False message += f'Emitter {idx+1} is not valid.\n'+ msg if self.totaltime <= 0.: check = False message += 'totaltime <= 0.\n' if self.dt <= 0.: check = False message += 'dt <= 0.\n' if self.dt > self.totaltime: check = False message += 'dt > totaltime\n' if self.scheme not in ['Euler_expl', 'RK22', 'RK4', 'RK45']: check = False message += 'scheme not in [Euler_expl, RK22, RK4, RK45]\n' return check, message
[docs] def append_velocity_field(self, uv:Velocity_Field, time:float) -> None: """ Append a velocity field to the list. """ self._velocity_fields[time] = uv self.sort_vf()
[docs] def load_domain_uv_from_npz(self, filename:str): """ Init the particle system from a npz file. the file must contain: - u: x velocity component - np.ndarray - float64 - v: y velocity component - np.ndarray - float64 - domain: array containing the domain - np.ndarray - int8 - (optionl) header: tuple containing the header of the file (origx, origy, dx, dy, nbx, nby) - tuple[float] If haeder is not found, the header is set to (0., 0., 1., 1., u.shape[0], u.shape[1]). All arrays must have the same shape. After that, you have to define the emitters before baking the particles. """ with np.load(filename) as data: if 'u' not in data.keys(): raise KeyError('u not found in the npz file.') return if 'v' not in data.keys(): raise KeyError('v not found in the npz file.') return if 'domain' not in data.keys(): raise KeyError('domain not found in the npz file.') return assert data['u'].shape == data['v'].shape, 'u and v must have the same shape.' assert data['u'].shape == data['domain'].shape, 'u and domain must have the same shape.' if 'header' in data.keys(): header = data['header'] self.origx, self.origy, self.dx, self.dy, self.nbx, self.nby = header else: self.origx, self.origy = 0., 0. self.dx, self.dy = 1., 1. self.nbx, self.nby = data['u'].shape u = data['u'] v = data['v'] domain = data['domain'] self.domain = domain self._velocity_fields = {0.: Velocity_Field(u, v, origx=self.origx, origy=self.origy, dx=self.dx, dy=self.dy)}
@property
[docs] def domain(self) -> np.ndarray: """ Return the domain. """ return self._inside_array
@domain.setter def domain(self, domain:np.ndarray) -> None: """ Set the domain. """ self._inside_array = domain.copy() @property
[docs] def u(self) -> np.ndarray: """ Return the u component of the velocity field at time 0. """ return self._velocity_fields[0.].u
@property
[docs] def v(self) -> np.ndarray: """ Return the v component of the velocity field at time 0. """ return self._velocity_fields[0.].v
@property
[docs] def blend(self) -> bool: """ Return the blend property. """ blending = np.asarray([part.blend for particles in self._particles.values() for part in particles.values()]) return blending.all()
@blend.setter def blend(self, value:bool) -> None: """ Set the blend property. """ for particles in self._particles.values(): for part in particles.values(): part.blend = value @property
[docs] def times_vf(self): """ Return the times of the velocity fields. """ return list(self._velocity_fields.keys())
@property
[docs] def times(self) -> np.ndarray: """ Return the times. """ return np.asarray(list(self._particles.keys()))
@property
[docs] def sorted_times_vf(self): """ Return the times of the velocity fields sorted. """ return sorted(self._velocity_fields.keys())
[docs] def find_uv1_uv2_t1_t2(self, curtime:float, first_idx:int=0) -> tuple[Velocity_Field, Velocity_Field, float, float, int]: """ Find the two closest velocity fields. """ times = self.times_vf if len(times) == 1: return self._velocity_fields[times[0]], self._velocity_fields[times[0]], times[0], -1., 0 else: idx = min(first_idx, len(times)) while curtime >= times[idx]: idx += 1 if idx == len(times): idx -= 1 break if idx == len(times) - 1: return self._velocity_fields[times[idx]], self._velocity_fields[times[idx]], times[idx], -1., idx else: return self._velocity_fields[times[idx-1]], self._velocity_fields[times[idx]], times[idx-1], times[idx], idx
[docs] def sort_vf(self): """ Sort the velocity fields and update times. """ self._velocity_fields = {curtime:curvf for curtime, curvf in sorted(self._velocity_fields.items(), key=lambda item: item[0])}
@property
[docs] def colors(self) -> list[tuple[float]]: """ Return the colors of the particles. """ return [(step, key, part.color) for step, particles in self._particles.items() for key,part in particles.items()]
@colors.setter def colors(self, colors:list[tuple[float, int, tuple[float]]]) -> None: """ Set the colors of the particles. """ for step, key, color in colors: try: self._particles[step][key].color = color except: logging.warning(f'Particles color not found at step {step} and key {key}.') @property
[docs] def nb_steps(self) -> int: """ Return the number of steps. """ return len(self._particles)
@property
[docs] def keys(self) -> list[float]: """ Return the keys of the particles. """ return list(self._particles.keys())
@property
[docs] def number_of_vf(self) -> int: """ Return the number of velocity fields. """ return len(self._velocity_fields)
@property
[docs] def number_of_emitters(self) -> int: """ Return the number of emitters. """ if self._emitters is None: return 0 return len(self._emitters)
@property
[docs] def current_step(self) -> float: """ Return the current step. """ return self._current_step
@property
[docs] def current_step_idx(self) -> int: """ Return the current step index. """ return self._current_step_idx
@current_step.setter def current_step(self, value:Union[float, int]) -> None: """ Set the current step. """ if isinstance(value, int): if value < 0: value = 0 if value >= len(self.keys): value = len(self.keys) - 1 key = self.keys[value] self._current_step_idx = value elif isinstance(value, float): assert value in self.keys, f'Key {value} not found.' key = value self._current_step_idx = self.keys.index(key) else: raise TypeError('value must be an int or a float.') self._current_step = key self._current = self._particles[key] @current_step_idx.setter def current_step_idx(self, value:int) -> None: """ Set the current step index. """ self.current_step = int(value) @property
[docs] def previous_step(self) -> float: """ Return the previous step. """ idx = self.current_step_idx idx -= 1 if idx < 0: idx = 0 return self.keys[idx]
[docs] def n_previous_step(self, n:int=1) -> list[float]: """ Return n previous steps. """ prev=[] if len(self.keys)==0: return prev idx = self.current_step_idx for i in range(n): idx -= 1 if idx < 0: idx = 0 prev.append(self.keys[idx]) return prev
@property
[docs] def dir(self) -> str: """ Return the directory. """ if dir is None: return '' else: return str(self._dir)
@dir.setter def dir(self, dir:Union[str,Path]) -> None: """ Set the directory. """ self._dir = Path(dir) @property
[docs] def bounds(self) -> tuple[float]: """ Return the bounds of the object. """ ox,oy,dx,dy,nbx,nby = self.get_header() return (ox, oy, ox+dx*float(nbx), oy+dy*float(nby))
@property
[docs] def path_emit(self) -> Path: """ Return the path of the emitters. """ return Path(self.dir) / 'emitters'
@property
[docs] def path_vf(self) -> Path: """ Return the path of the velocity field. """ return Path(self.dir) / 'velocity_fields'
@property
[docs] def path_particles(self) -> Path: """ Return the path of the particles. """ return Path(self.dir) / 'particles'
@property
[docs] def path_domain(self) -> Path: """ Return the path of the domain. """ return Path(self.dir) / 'domain.npz'
[docs] def serialize(self) -> dict: """ Serialize the object. """ np.savez_compressed(self.path_domain, domain=self._inside_array) self.path_emit.mkdir(parents=True, exist_ok=True) self.path_vf.mkdir(parents=True, exist_ok=True) return {#'dir':self.dir, 'origx':self.origx, 'origy':self.origy, 'dx':self.dx, 'dy':self.dy, 'nbx':self.nbx, 'nby':self.nby, 'totaltime':self.totaltime, 'dt':self.dt, 'scheme':self.scheme, 'nb_emitters':self.number_of_emitters, 'emitters':{idx:emitter.save(self.path_emit / 'emitter_{}'.format(idx)) for idx, emitter in enumerate(self._emitters)}, 'nb_vf':self.number_of_vf, 'velocity_fields':{idx:vf.save(self.path_vf / 'vf_{}'.format(idx)) for idx, vf in enumerate(self._velocity_fields.values())}, 'times_vf': list(self._velocity_fields.keys()), 'particles':{time: {key: part.serialize() for key, part in allparts.items()} for time, allparts in self._particles.items()} }
[docs] def deserialize(self, data:dict) -> None: """ Deserialize the object. """ self._particles = {} self._current = None self._previous = None # self.dir = data['dir'] self.origx = data['origx'] self.origy = data['origy'] self.dx = data['dx'] self.dy = data['dy'] self.nbx = data['nbx'] self.nby = data['nby'] try: self.totaltime = data['totaltime'] except: pass try: self.dt = data['dt'] except: pass try: self.scheme = data['scheme'] except: pass self._inside_array = np.load(self.path_domain)['domain'] self._emitters = [Emitter().load(self.path_emit / data['emitters'][str(idx)]) for idx in range(data['nb_emitters'])] times = data['times_vf'] uv = [Velocity_Field().load(self.path_vf / data['velocity_fields'][str(idx)]) for idx in range(data['nb_vf'])] self._velocity_fields = { curtime:curuv for curtime, curuv in zip(times,uv) } # for time, allparts in data['particles'].items(): # self._particles[float(time)] = {} # for key, part in allparts.items(): # self._particles[float(time)][int(key)] = Particles(dir=self.path_particles) # self._particles[float(time)][int(key)].deserialize(part) locdir = self.path_particles #FIXME: this is not working # # asynchoronous loading of the particles # self.tasks = [asyncio.create_task(_async_load_particle_file((time, key, part, locdir))) for time, allparts in data['particles'].items() for key, part in allparts.items()] # loop = asyncio.get_event_loop() # self.async_load = [await task for task in self.tasks] # multiprocess loading of the particles with Pool() as pool: all_parts = pool.map(_load_particles_files, [(time, key, part, locdir) for time, allparts in data['particles'].items() for key, part in allparts.items()]) self._particles = {float(time):{int(key):part} for time, key, part in all_parts} pass
[docs] def save(self, f:str='', save_particles:bool = True)-> None: """ Save the particles to file. """ if f == '' and self._filename is None: return if f == '': f = self._filename else: self._filename = f self.dir = Path(f).parent self.path_particles.mkdir(parents=True, exist_ok=True) f_only = Path(f).name if save_particles: # save the particles in multiple files for time, particles in self._particles.items(): for idx, particle in particles.items(): particle.save(self.path_particles / f'{f_only}_{time}_{idx}.npz') #serialize the object and save it serdict = self.serialize() f = Path(f).with_suffix('.json') json.dump(serdict, open(f'{f}', 'w'),indent=2)
[docs] def load(self, f:str) -> None: """ Load the particles from file. """ f = Path(f).with_suffix('.json') self.dir = f.parent # load data and deserialize it to the object serdict = json.load(open(f'{f}', 'r')) self.deserialize(serdict) self._filename = f
[docs] def get_header(self) -> tuple[float]: """ Return the header of the file. """ return (self.origx, self.origy, self.dx, self.dy, self.nbx, self.nby)
[docs] def bake(self, total_time:float, dt:float, scheme:str = None, callback = None) -> None: """ Bake the particles. """ # sort velocity fields self.sort_vf() self._time = 0. self.totaltime = total_time self.dt = dt if scheme is not None: assert isinstance(scheme, str), 'scheme must be a string in [Euler_expl, RK22, RK4, RK45].' self.scheme = scheme assert isinstance(self.scheme, str), 'scheme must be a string in [Euler_expl, RK22, RK4, RK45].' # first step self._current = self._particles[self._time] = {} # pointer to the previous step self._previous = self._current # emit the particles for idx, emitter in enumerate(self._emitters): self._current[idx] = Particles(*emitter.emit(self._time)) # iterate over the time time_reportin = total_time / 100. idx_time = 0 while self._time < total_time: if self._time % time_reportin < dt: logging.info(f'Time: {self._time} / {total_time}') if callback is not None: callback(self._time) uv1, uv2, t1, t2, idx_time = self.find_uv1_uv2_t1_t2(self._time, idx_time) # update the particles for particles in self._current.values(): particles:Particles # test if there are particles if len(particles.start_position[0]) > 0: particles.update((uv1, uv2, t1, t2), self._time, dt, self.scheme) # update the time self._time += dt # create particles for the next time step self._current = self._particles[self._time] = {} for idx, particles in enumerate(self._previous.values()): # copy the particles that are still alive x, y = particles.get_alive(self._inside_array, self.get_header()) # emit new particles if any xnew, ynew = self._emitters[idx].emit(self._time) # concatenate the arrays self._current[idx] = Particles(np.concatenate((x, xnew)), np.concatenate((y, ynew))) # update the pointers self._previous = self._current # copy the properties of the previous step to the particles of the current one for cur, prev in zip(self._current.values(), self._previous.values()): cur:Particles cur.copy_prop_from(prev)
[docs] def plot(self, time:float = -1., alpha:float=1.) -> None: """ Plot the particles. """ glPolygonMode(GL_FRONT_AND_BACK,GL_LINE) glLineWidth(float(1.)) if time == -1.: curdict = self._current else: if time not in self.keys: logging.warning('Time not found.') return curdict = self._particles[time] for particles in curdict.values(): particles:Particles particles.plot(alpha=alpha)
# glFlush()
[docs] def plot_mpl(self, time:float = -1.) -> tuple[Figure, Axes]: """ Plot the particles using Matplotlib. """ fig, ax = plt.subplots() if time == 'all': for curdict in self._particles.values(): for particles in curdict.values(): particles:Particles particles.plot_mpl(ax) return fig, ax elif time == -1.: curdict = self._current else: if time not in list(self._particles.keys()): logging.warning('Time not found.') return curdict = self._particles[time] for particles in curdict.values(): particles:Particles particles.plot_mpl(ax) ax.set_aspect('equal') fig.tight_layout() return fig, ax