import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.widgets import Slider
from numpy import random as rnd
import math
from pathlib import Path
import json
import timeit
import multiprocessing
import cProfile, pstats
import wx
from os.path import join
import threading
from sklearn.neighbors import KernelDensity
from scipy.ndimage import maximum_filter
from scipy.spatial.distance import cdist
from datetime import datetime, timedelta
try:
from ..PyParams import *
from ..drawing_obj import Element_To_Draw
from ..Results2DGPU import getkeyblock
from ..PyTranslate import _
from ..PyVertex import cloud_vertices
from ..wolf_array import WolfArray, header_wolf
from ..PandasGrid import PandasGrid
except:
from wolfhece.PyParams import *
from wolfhece.drawing_obj import Element_To_Draw
from wolfhece.Results2DGPU import getkeyblock
from wolfhece.PyTranslate import _
from wolfhece.PyVertex import cloud_vertices
from wolfhece.wolf_array import WolfArray
from wolfhece.PandasGrid import PandasGrid
try:
from .drowning_functions import *
except:
from wolfhece.drowning_victims.drowning_functions import *
# Column names (and order) of the ``Z_param`` dataframe holding the
# vertical-motion parameters, one row per simulated body.
# index:          0          1     2      3          4           5     6      7
COLUMN_Z_PARAM = ['vertical','U_z','z_0','mu_stat','Time_float','T_w','ADD','ADD_resurface']

# Column names (and order) of the ``Human`` dataframe describing the
# simulated bodies, one row per body.
COLUMNS_HUMAN = ['Age','BMI','BSA','CAM','CDA','CLA','Death','dm','eps','fp_x','fp_y','fp_z','gender','height','lungs_volume_FRC','lungs_volume_TLC','mass','rho','Volume','V_clothes_o','V_clothes_one','V_clothes_two','error_perc_fat','CSA']
[docs]
class Drowning_victim:
def __init__(self,Path_dir:str = None):
"""
Initialize the simulation parameters.
:param Path_loading:
Path of the simulation loaded.
Attributes:
Profile_this (bool): Binary parameter to activate the profiling of the code.
saving (bool): Binary parameter to save your results.
file_name (str): Name of the file to be saved.
Path_saving (str): Path where you want the file saved.
loading (bool): Binary parameter to load previous results and start from them.
Path_loading (str): Path of the simulation loaded.
Path_Wolf (str): Path of the WolfGPU simulation.
plot_pos (bool): Binary parameter to plot your results.
CFL (float): CFL number to calculate the time step of your simulation.
dt_min (float): Minimum time step for your variable time step.
dt_max (float): Maximum time step for your variable time step.
t_initial (float): Initial time of the simulation.
Days (int): Number of days of the simulation.
Hours (int): Number of hours of the simulation.
Minutes (int): Number of minutes of the simulation.
Seconds (int): Number of seconds of the simulation.
wanted_time (list): Array with all the times at which we want a save.
n_t (int): Length of wanted_time.
count_initial (int): Initial step of the simulation.
count_pre (int): Initial step of the simulation - 1.
n_b (int): Number of simulated bodies.
n_parallel (int): Number of times the process is parallelized (number of cores used).
random_IP (float): Radius of the uncertainty area of the drowning point (in cells).
T_water (float): Average water temperature in °C.
vertical (bool): Binary parameter to consider vertical motion.
DZ (float): Step size for vertical motion (used in simulation).
Z_param (pd.DataFrame): Dataframe holding the parameters for vertical motion simulation.
"""
self.Default_values()
self.from_attributes_to_dictionnary()
## Loads all the parameters from the parameters.param file
if Path_dir is not None:
self.Path_saving = Path(Path_dir)
self.from_dot_param_to_dictionnary(store_dir=self.Path_saving)
self.from_dictionnary_to_attributes()
# self.update_params(str(self.Path_saving))
[docs]
def Default_values(self):
    """
    Set the default value of every simulation parameter on the instance.

    Nothing is read from or written to disk here: the attributes are only
    initialised in memory. The victim-related defaults are delegated to
    ``self.victim()``.

    ``Z_param`` (one row per body, columns ``COLUMN_Z_PARAM``) is filled with
    its array-valued columns first and its scalar-valued columns
    (``vertical``, ``ADD``) afterwards. A scalar assigned while the frame has
    no rows yields an empty column, which turns into NaN as soon as an
    array-valued column creates the rows.
    """
    # Function-scope imports kept from the original implementation, so these
    # names do not depend on what the module-level star imports may rebind.
    import pandas as pd
    import math
    import numpy.random as rnd

    # Run options
    self.Profile_this = 0
    self.Redraw = 0  # 0 for no redraw
    current_dir = Path(__file__).resolve().parent
    self.current_dir = current_dir
    self.saving = 1
    self.file_name = 'Test'
    self.loading = 0
    self.Path_loading = None
    self.Path_saving = None
    self.Path_Wolf = None
    self.plot_pos = 0
    self.a_RK = 0.5
    self.image = 0

    # Time stepping
    self.CFL = 0.01
    self.dt_min = 0.01
    self.dt_max = 1  # s
    self.t_initial = 0*60*60*24
    self.i_initial = math.floor(self.t_initial/self.dt_max)+1

    # Simulated duration
    self.Days = int(0)     # days
    self.Hours = int(1)    # h
    self.Minutes = int(0)  # min
    self.Seconds = int(0)  # s
    time_goal = self.Days*24*60*60 + self.Hours*60*60 + self.Minutes*60 + self.Seconds  # s
    self.time_goal = time_goal

    # Initial drowning cell
    self.ind_pos_0_x = 0  # For L14: 3983, L_30: 4303
    self.ind_pos_0_y = 0  # For L14: 3780, L_30: 3902

    # Grid description
    self.origx = 215702
    self.origy = 130000
    self.dx = 5
    self.dy = 5
    self.nbx = 8460
    self.nby = 10416

    self.n_saved = 1
    self.n_b = 10000
    n_b = self.n_b
    self.n_parallel = 2  # Number of processes to be run in parallel

    # Vertical-motion parameters, one row per body
    Z_param = pd.DataFrame(data=None,columns=COLUMN_Z_PARAM,dtype=np.int32)
    Z_param['U_z'] = 0*np.ones((n_b))  # 0 = U constant on depth, 1 = U varies with the depth (log law)
    d_50 = 2*2*40 *10**-3  # to be confirmed
    Z_param['z_0'] = d_50/30*np.ones((n_b))  # experimental results of Nikuradse
    Z_param['mu_stat'] = 1 * np.ones((n_b))
    Z_param['Time_float'] = 0*np.ones((n_b))
    Z_param['T_w'] = 15*np.ones((n_b))
    Z_param['ADD_resurface'] = 5250/15 * rnd.beta(4,4,size=n_b)  # source: Heaton 2011 considering a TADS between 14 and 15 as maximum expansion
    # Scalars are broadcast over the existing rows, hence assigned last
    Z_param['vertical'] = 1  # 1 = consider the vertical motion, 0 = not considered
    Z_param['ADD'] = time_goal/60/60/24*15
    self.Z_param = Z_param

    # Default characteristics of the victim
    self.victim()
    self.path = Path(current_dir)
[docs]
def victim(self):
    """
    Set the default characteristics of the victim.

    gender : Gender of the victim, 1 for man, 2 for woman
    Age : Age of the victim in years
    height : Height of the victim in m
    mass : Mass of the victim in kg
    BMI : BMI of the victim in kg/m²
    clothes : Clothing type of the victim (0 for naked, 1 for summer clothes, 2 for spring clothes, 3 for winter clothes)
    T_w : Average water temperature in °C
    ini_drowning : Time at which the victim drowned in the day (24H format)
    m_b_add : Mass of added accessories
    """
    # -1 is the value used for "unknown"
    for unknown_characteristic in ("gender", "Age", "height", "mass", "BMI", "clothes"):
        setattr(self, unknown_characteristic, -1)

    self.T_w, self.ini_drowning, self.m_b_add = 15, 10, 0
[docs]
def from_attributes_to_dictionnary(self):
    """
    Build ``self.param_dict`` from the current attributes of the instance.

    The dictionary is organised as ``{section: {key: entry}}`` where every
    entry holds the fields ``value``, ``explicit name``, ``description``,
    ``type``, ``choices`` and ``mandatory``.
    """

    def entry(value, explicit_name, description, kind, mandatory, choices=None):
        # One parameter record, with the fields in their usual order
        return {
            "value": value,
            "explicit name": explicit_name,
            "description": description,
            "type": kind,
            "choices": choices,
            "mandatory": mandatory,
        }

    # Sections and parameters of the dictionary
    options = {
        "Profile": entry(self.Profile_this, "Profile", "Do you want to profile your code?", "Integer", False,
                         {"Don't profile the code":0, "Profile the code":1}),
        "Save": entry(self.saving, "Save", "Enable saving of results?", "Integer", True,
                      {"Don't save":0, "Save the results":1}),
        "Load": entry(self.loading, "Load", "Enable loading of previous results?", "Integer", True,
                      {"Don't load the results from a previous simulation":0, "Load":1}),
        "Plot": entry(self.plot_pos, "Plot", "Enable plotting of results?", "Integer", False,
                      {"Don't plot":0, "Plot the results when simulation is over":1}),
        "n_parallel": entry(self.n_parallel, "Number of parallel processes", "Number of parallel processes to use", "Integer", True),
        # Vertical motion is assumed to be always enabled
        "vertical": entry(1, "Vertical motion", "Consider vertical motion in the simulation?", "Integer", False,
                          {"No vertical motion allowed":0, "With vertical motion allowed":1}),
        "a_RK": entry(self.a_RK, "Runge-Kutta ponderation coefficient", "Coefficient for RK22 integration", "Float", False),
        "image": entry(self.image, "Progression bar", "Enable image generation", "Integer", False,
                       {"Plot progress with loading bar":0, "Plot progress with progress image":1}),
    }

    paths = {
        "File": entry(self.file_name, "File name", "Name of the file to save", "String", True),
        "Save": entry(self.Path_saving, "Save path", "Path where results will be saved", "Directory", True),
        "Load": entry(self.Path_loading, "Load path", "Path to load previous results", "Directory", False),
        "Wolf": entry(self.Path_Wolf, "Results of Wolf GPU simulation path", "Path to the WolfGPU simulation", "Directory", True),
    }

    time_step = {
        "CFL": entry(self.CFL, "CFL", "Courant number for time step calculation", "Float", False),
        "dt_min": entry(self.dt_min, "Minimum time step", "Minimum time step in seconds", "Float", False),
        "dt_max": entry(self.dt_max, "Maximum time step", "Maximum time step in seconds", "Float", False),
    }

    duration = {
        "t_d": entry(self.Days, "Days", "Number of days for the simulation", "Integer", True),
        "t_h": entry(self.Hours, "Hours", "Number of hours for the simulation", "Integer", True),
        "t_min": entry(self.Minutes, "Minutes", "Number of minutes for the simulation", "Integer", True),
        "t_s": entry(self.Seconds, "Seconds", "Number of seconds for the simulation", "Integer", True),
    }

    victim_section = {
        "n_b": entry(self.n_b, "Number of bodies", "Number of simulated bodies", "Integer", True),
        "gender": entry(self.gender, "Gender", "Gender of the victim (1 for male, 2 for female)", "Integer", True,
                        {"Unknown":-1, "Man":1, "Woman":2}),
        "age": entry(self.Age, "Age [years]", "Age of the victim in years", "Integer", True),
        "h_b": entry(self.height, "Height [m]", "Height of the victim in meters", "Float", True),
        "m_b": entry(self.mass, "Mass [kg]", "Mass of the victim in kilograms", "Float", True),
        "BMI": entry(self.BMI, "BMI [kg/m²]", "Body Mass Index of the victim, needed only if the mass is unknown", "Float", True),
        "clothes": entry(self.clothes, "Clothing type", "Clothing type of the victim", "Integer", True,
                         {"Unknown":-1,
                          "Naked":0,
                          "Short and T-shirt (Summer clothes)":1,
                          "Sweater and trousers (Spring/Fall)":2,
                          "Sweater, trousers and heavy warm jacket (Winter clothes)":3}),
        "T_w": entry(self.T_w, "Water temperature [°C]", "Average water temperature in °C", "Float", True),
        "ini_drowning": entry(self.ini_drowning, "Initial drowning time", "Time at which the victim drowned", "Integer", True),
        "m_b_add": entry(self.m_b_add, "Added mass of accessories [kg]", "Mass of added accessories, like a backpack or lifting weights", "Float", True),
    }

    drowning_point = {
        "x": entry(self.ind_pos_0_x, "X-cell", "X-coordinate of the initial drowning point", "Integer", True),
        "y": entry(self.ind_pos_0_y, "Y-cell", "Y-coordinate of the initial drowning point", "Integer", True),
    }

    grid = {
        "origx": entry(self.origx, "Origin X", "Origin of the matrix in X", "Float", True),
        "origy": entry(self.origy, "Origin Y", "Origin of the matrix in Y", "Float", True),
        "dx": entry(self.dx, "Delta X", "Spatial step of the matrix in X", "Float", True),
        "dy": entry(self.dy, "Delta Y", "Spatial step of the matrix in Y", "Float", True),
        "nbx": entry(self.nbx, "Nb X", "Number of steps of the matrix in X", "Integer", True),
        "nby": entry(self.nby, "Nb Y", "Number of steps of the matrix in Y", "Integer", True),
    }

    self.param_dict = {
        "Options": options,
        "Paths": paths,
        "DT": time_step,
        "Duration": duration,
        "Victim (-1 for unknown)": victim_section,
        "Initial_drowning_point": drowning_point,
        "Grid": grid,
    }
    return
[docs]
def from_dictionnary_to_attributes(self):
    """
    Copy the values of ``self.param_dict`` onto the attributes of the class
    and rebuild the quantities derived from them.

    Entries whose ``type`` is ``'Integer'`` are cast with ``int``. Entries are
    matched on the keys of ``self.param_dict`` (``t_d``, ``origx``, ...); the
    explicit names ``Days``, ``Origin X``, ... are accepted as aliases for the
    "Duration" and "Grid" sections.

    Derived attributes set afterwards:
        t_initial, i_initial : reset to 0.
        time_goal : simulated duration in seconds.
        wanted_time : times at which a save is wanted, starting with
            ``t_initial`` and terminated by a trailing 0.
        n_saved : number of save times (trailing 0 excluded).
        n_t, count_initial, count_pre, random_IP, DZ.
        Z_param : dataframe of the vertical-motion parameters, one row per
            body, including the ``vertical`` option read from the dictionary.

    Save times are every 10 s during the first minute, every minute up to
    10 min, every 5 min up to 30 min, every 10 min up to 1 h, every hour up
    to 24 h, every 2 h up to 48 h, and every multiple of 3 h.

    :raises ZeroDivisionError: when ``T_w`` is a plain Python number equal to 0.
    """
    # Key of self.param_dict -> name of the attribute receiving the value
    attribute_of = {
        "Options": {
            "Profile": "Profile_this",
            "Save": "saving",
            "Load": "loading",
            "Plot": "plot_pos",
            "n_parallel": "n_parallel",
            "a_RK": "a_RK",
            "image": "image",
        },
        "Paths": {
            "File": "file_name",
            "Save": "Path_saving",
            "Load": "Path_loading",
            "Wolf": "Path_Wolf",
        },
        "DT": {"CFL": "CFL", "dt_min": "dt_min", "dt_max": "dt_max"},
        "Duration": {
            "t_d": "Days", "t_h": "Hours", "t_min": "Minutes", "t_s": "Seconds",
            # aliases: explicit names
            "Days": "Days", "Hours": "Hours", "Minutes": "Minutes", "Seconds": "Seconds",
        },
        "Victim (-1 for unknown)": {
            "n_b": "n_b",
            "gender": "gender",
            "age": "Age",
            "h_b": "height",
            "m_b": "mass",
            "BMI": "BMI",
            "clothes": "clothes",
            "T_w": "T_w",
            "ini_drowning": "ini_drowning",
            "m_b_add": "m_b_add",
        },
        "Initial_drowning_point": {"x": "ind_pos_0_x", "y": "ind_pos_0_y"},
        "Grid": {
            "origx": "origx", "origy": "origy", "dx": "dx", "dy": "dy", "nbx": "nbx", "nby": "nby",
            # aliases: explicit names
            "Origin X": "origx", "Origin Y": "origy", "Delta X": "dx", "Delta Y": "dy",
            "Nb X": "nbx", "Nb Y": "nby",
        },
    }

    # The vertical option lives in Z_param, which is rebuilt below: remember
    # the requested value until the new frame exists (1 = vertical motion on).
    vertical = 1

    # Walk through the sections and parameters of param_dict
    for section, params in self.param_dict.items():
        targets = attribute_of.get(section, {})
        for key, param_data in params.items():
            value = param_data.get("value", None)
            if param_data.get("type", None) == 'Integer':
                value = int(value)
            if section == "Options" and key == "vertical":
                vertical = value
            elif key in targets:
                setattr(self, targets[key], value)

    # Initialise the parameters of the simulation from the values above
    self.t_initial = 0
    self.i_initial = 0
    self.time_goal = self.Days*24*60*60 + self.Hours*60*60 + self.Minutes*60 + self.Seconds  # s

    # Candidate save times every 10 s, thinned out as the simulation advances
    minute, hour = 60, 60*60
    candidates = np.arange(10, self.time_goal+10, 10)
    keep = (
        (candidates < minute)
        | ((candidates < 10*minute) & (candidates % minute == 0))
        | ((candidates < 30*minute) & (candidates % (5*minute) == 0))
        | ((candidates < hour) & (candidates % (10*minute) == 0))
        | ((candidates <= 24*hour) & (candidates % hour == 0))
        | ((candidates <= 2*24*hour) & (candidates % (2*hour) == 0))
        | (candidates % (3*hour) == 0)
    )
    # The trailing 0 is not counted in n_saved
    self.wanted_time = np.concatenate(([self.t_initial], candidates[keep], [0]))
    self.n_saved = 1 + int(np.count_nonzero(keep))

    self.n_t = math.floor(self.time_goal/self.dt_min)+1
    self.count_initial = 1
    self.count_pre = self.count_initial-1
    n_b = self.n_b
    self.random_IP = 1  # number of cells considered for the radius of random position

    ## Parameters of vertical motion (ADD, temp)
    self.DZ = 0.1

    # Rebuild the dataframe: array-valued columns first (they create the
    # rows), scalar-valued columns last (they are broadcast over the rows).
    Z_param = pd.DataFrame(data=None,columns=COLUMN_Z_PARAM,dtype=np.int32)
    Z_param['U_z'] = 0*np.ones((n_b))  # 0 = U constant on depth, 1 = U varies with the depth (log law)
    d_50 = 2*2*40 *10**-3  # to be confirmed
    Z_param['z_0'] = d_50/30*np.ones((n_b))  # experimental results of Nikuradse
    Z_param['mu_stat'] = 1 * np.ones((n_b))
    Z_param['Time_float'] = 0*np.ones((n_b))
    Z_param['T_w'] = self.T_w*np.ones((n_b))
    Z_param['ADD_resurface'] = 5250/self.T_w * rnd.beta(4,4,size=n_b)  # source: Heaton 2011 considering a TADS between 14 and 15 as maximum expansion
    Z_param['vertical'] = vertical
    Z_param['ADD'] = self.time_goal/60/60/24*self.T_w
    self.Z_param = Z_param
[docs]
def from_dot_param_to_dictionnary(self,store_dir: Path = None):
    """
    Update ``self.param_dict`` with the values found in ``parameters.param``.

    The file is read line by line. A line ending with ``:`` opens a section;
    a line containing a tab inside a section is split once into a name and a
    value. Values that can be parsed as numbers are stored as ``float``, the
    others are kept as strings. A value is written into the entry of the same
    section whose ``"explicit name"`` equals the name read from the file;
    unknown sections and names are ignored, as are tabbed lines placed before
    the first section.

    :param store_dir: directory where the file parameters.param is
    :raises FileNotFoundError: if ``parameters.param`` is not in ``store_dir``
    """
    # Existing dictionary, updated in place
    param_dict = self.param_dict

    text_file_path = Path(store_dir) / "parameters.param"
    if not text_file_path.exists():
        raise FileNotFoundError(f"Le fichier {text_file_path} est introuvable.")

    data = {}
    current_section = None  # no section opened yet
    with open(text_file_path, 'r', encoding='ISO-8859-1') as file:
        for line in file:
            line = line.strip()
            # Section header (for instance 'Options:' or 'Paths:')
            if line.endswith(":"):
                current_section = line[:-1]  # drop the ':' to get the section name
                data[current_section] = {}
            elif "\t" in line and current_section:
                key, value = line.split("\t", 1)
                # Convert the value to a number when possible
                try:
                    value = float(value)
                except ValueError:
                    pass  # not a number: keep the value as a string
                data[current_section][key] = value

    # Report the values read from the file into the dictionary
    for section, params in data.items():
        if section not in param_dict:
            continue
        # First entry found for each explicit name of this section
        entry_of = {}
        for param_data in param_dict[section].values():
            entry_of.setdefault(param_data.get("explicit name"), param_data)
        for key, value in params.items():
            if key in entry_of:
                entry_of[key]["value"] = value

    self.param_dict = param_dict
    return
[docs]
def Human_generation(self):
"""
Generate the bodies of the simulation and store them in ``self.Human``.

The method returns nothing; the resulting dataframe is assigned to
``self.Human``. It has the columns ``COLUMNS_HUMAN`` and is filled for
``self.n_b`` bodies. The victim attributes read are ``gender``, ``Age``,
``height``, ``mass``, ``BMI``, ``clothes`` and ``m_b_add``.

``known_1`` and ``Skinfold`` are not defined in this file; they are expected
to come from the ``drowning_functions`` star import.

Columns of Human:
gender : Gender of the victim, 1 for man, 2 for woman
Age : Age of the victim in years
height : Height of the victim in m
mass : Mass of the victim in kg
BMI : BMI of the victim in kg/m²
CAM : Added mass coefficient of the body
CDA : Drag area of the body (drag coefficient * a reference area)
CLA : Lift area of the body
CSA : Side area of the body
fp_x : Projection coefficient along the x-axis to go from the BSA to the frontal area
fp_y : Projection coefficient along the y-axis to go from the BSA to the frontal area
fp_z : Projection coefficient along the z-axis to go from the BSA to the frontal area
lungs_volume_TLC : Lungs volume at Total Lung Capacity
lungs_volume_FRC : Lungs volume at Functional Residual Capacity (at rest, after normally expiring)
dm : Amount of swallowed water, to which ``m_b_add`` is added
BSA : Body surface area (i.e. surface of the skin)
Death : Type of death
eps : Width of the body
V_clothes_o : Initial volume of clothes (according to Barwood et al., 2011)
V_clothes_one : Volume of clothes after 20min at rest (according to Barwood et al., 2011)
V_clothes_two : Volume of clothes after 20min of swimming (according to Barwood et al., 2011)
error_perc_fat : Deviation to the average on the percentage body fat of the body calculated from the equation of Siri et al. 1960

The clothing type ``clothes`` (0 for naked, 1 for summer clothes, 2 for spring
clothes, 3 for winter clothes) is not a column: it selects the aerodynamic
coefficients and the column of the clothing tables.
"""
Human = pd.DataFrame(data=None,columns=COLUMNS_HUMAN,dtype=np.int32)
n_b = self.n_b
## Gender
# Unknown gender (-1): the first half of the bodies is set to 2 and the second half to 1.
# NOTE(review): the slice assignments go through the Series returned by Human.gender;
# confirm that they still write into the dataframe with pandas Copy-on-Write enabled.
Human.gender = self.gender * np.ones((n_b))
if self.gender == -1:
Human.gender = np.zeros(n_b)
Human.gender[:n_b // 2] = 2
Human.gender[n_b // 2:] = 1
# Row positions of the men (gender 1) and of the women (gender 2)
ind_m = np.where(Human.gender==1)[0]
ind_w = np.where(Human.gender==2)[0]
## Age
# Unknown age (-1): integer ages drawn in [18, 90] (upper bound of randint is exclusive)
Human.Age = self.Age * np.ones((n_b))
if self.Age==-1:
age_min = 18
age_max = 90 + 1
Human.Age = rnd.randint(age_min,age_max,size=(n_b))
## Height
# Index 0 of h_max/h_min is used for men, index 1 for women (values in m).
# The two values returned by known_1 are used as the shape parameters of the beta
# draw, which is then rescaled to [h_min, h_max].
h_av = self.height
h_max = np.array([205, 190]) /100
h_min = np.array([150, 140]) /100
[ah_w,bh_w] = known_1(2,h_min[1],h_max[1],h_av-0.025,h_av+0.025,0.1,0.9)
[ah_m,bh_m] = known_1(1,h_min[0],h_max[0],h_av-0.025,h_av+0.025,0.1,0.9)
Human.loc[ind_m,'height'] = rnd.beta(ah_m,bh_m,size=(len(ind_m)))*(h_max[0]-h_min[0])+h_min[0] #men
Human.loc[ind_w,'height'] = rnd.beta(ah_w,bh_w,size=(len(ind_w)))*(h_max[1]-h_min[1])+h_min[1] #women
# Unknown height (-1): the heights are drawn again with fixed shape parameters
if h_av == -1:
Human.loc[ind_m,'height'] = rnd.beta(5.8697,6.075,size=(len(ind_m)))*(h_max[0]-h_min[0])+h_min[0] #men
Human.loc[ind_w,'height'] = rnd.beta(3.976,5.965,size=(len(ind_w)))*(h_max[1]-h_min[1])+h_min[1] #women
## Mass or BMI
# Same scheme as the height: beta draw rescaled to [m_min, m_max] (index 0 men, index 1 women)
m_av = self.mass
m_max = np.array([135, 130])
m_min = np.array([35, 35])
[am_w,bm_w] = known_1(2+3,m_min[1],m_max[1],m_av-2.5,m_av+2.5,0.1,0.9)
[am_m,bm_m] = known_1(1+3,m_min[0],m_max[0],m_av-2.5,m_av+2.5,0.1,0.9)
Human.loc[ind_m,'mass'] = rnd.beta(am_m,bm_m,size=((len(ind_m))))*(m_max[0] - m_min[0]) + m_min[0]
Human.loc[ind_w,'mass'] = rnd.beta(am_w,bm_w,size=((len(ind_w))))*(m_max[1] - m_min[1]) + m_min[1]
Human.BMI = Human.mass / Human.height**2
# `known` is passed to Skinfold below; it starts at 1 and is set to 0 further down
# (NOTE(review): presumably only when neither the mass nor the BMI is provided - confirm the nesting)
known = 1
# Unknown mass (negative): the BMI is drawn instead, around the given BMI when it is
# positive, otherwise between the 25 % and 75 % values of the tables below
if m_av < 0:
BMI = self.BMI
BMI_min = 16
BMI_max = 40
BMI_25 = [20, 21.3, 22.5, 23.3, 22.9, 23.7, 23.1]
BMI_50 = [21.7, 23.4, 24.8, 25.7, 25.9, 26.3, 25.3]
BMI_75 = [24.3, 26.4, 28, 29, 29.1, 29.7, 28]
# Table index from the mean age of the bodies: floor(mean age / 10) - 1, capped at 6
ind_Age = np.minimum(math.floor(np.mean(Human.Age.to_numpy())/10)-1,6)
if BMI > 0:
BMI_down = BMI-1
BMI_up = BMI+1
[abmi,bbmi] = known_1(3,BMI_min,BMI_max,BMI_down,BMI_up,0.1,0.9)
Human.BMI = rnd.beta(abmi,bbmi,size=((n_b)))*(BMI_max-BMI_min)+BMI_min
else:
[abmi,bbmi] = known_1(3,BMI_min,BMI_max,BMI_25[ind_Age],BMI_75[ind_Age],0.25,0.75)
Human.BMI = rnd.beta(abmi,bbmi,size=((n_b)))*(BMI_max-BMI_min)+BMI_min
known = 0
# The mass is recomputed from the BMI and the height
Human.mass = Human.BMI*Human.height**2
## Clothes
# -1 if unknown, 0 for naked or in underwear, 1 for summer clothes, 2 for autumn/spring clothes, 3 for winter clothes
clothes = self.clothes
# Unknown clothing is treated as type 2
if clothes==-1:
clothes=2
Human,error_perc_fat = Skinfold(n_b,known,Human)
# In the gender-weighted expressions below, (2-gender) selects the term of gender 1
# and (gender-1) the term of gender 2
Human.CAM = ((2-Human.gender)*0.268+(Human.gender-1)*0.236) *np.ones((n_b)) #according Caspersen et al., 2010: Added mass in human swimmers
#CD, CL and CS fitted to the results of tests realised in the wind tunnel on 28/02/2023, tight clothing = no clothes or summer clothes
Human.CDA = 0.4181*np.ones(n_b)*(clothes<2) + 0.5172*np.ones(n_b)*(clothes>=2)
Human.CLA = 0.07019*np.ones(n_b)*(clothes<2) + 0.08387*np.ones(n_b)*(clothes>=2)
Human.CSA = 0.03554*np.ones(n_b)*(clothes<2) + 0.04047*np.ones(n_b)*(clothes>=2)
# Human.CDA = rnd.normal(0.4181,0.03434,n_b)*(clothes<2) + rnd.normal(0.5172,0.0406,n_b)*(clothes>=2)
# Human.CLA = rnd.normal(0.07019,0.05035,n_b)*(clothes<2) + rnd.normal(0.08387,0.08138,n_b)*(clothes>=2)
# Human.CSA = rnd.normal(0.03554,0.02545,n_b)*(clothes<2) + rnd.normal(0.04047,0.04311,n_b)*(clothes>=2)
#Mandatory for the structure of the numpy variable Human generated for the calculations
# fp_x is drawn uniformly in [0.16, 0.36] (beta(1,1)); fp_y mirrors it so that fp_x + fp_y = 0.52
Human.fp_x = rnd.beta(1,1,size=(n_b))*(0.36-0.16)+0.16
Human.fp_y = 0.36-Human.fp_x+0.16
Human.fp_z = np.ones((n_b)) * 0.2
Human.lungs_volume_TLC = 10**-3 * ((7.99*Human.height-7.08) * (2-Human.gender) + (6.6*Human.height-5.79) * (Human.gender-1)) #Formulas ERC valid for men between 1.55 and 1.95 m and women between 1.45 and 1.8 m
Human.lungs_volume_FRC = 10**-3 * ((2.34*Human.height+0.01*Human.Age-1.09) * (2-Human.gender) + (2.24*Human.height+0.001*Human.Age-1) * (Human.gender-1)) #Formulas ERC valid for men between 1.55 and 1.95 m and women between 1.45 and 1.8 m
# Correction of both lung volumes with a quadratic function of the BMI, expressed in percent
Human.lungs_volume_TLC = Human.lungs_volume_TLC * (0.0403*Human.BMI**2 - 3.1049*Human.BMI + 149.58)/100 #Digitalization of TOTAL part of figure 4 of doi:10.1136/bmjresp-2017-000231
Human.lungs_volume_FRC = Human.lungs_volume_FRC * (0.102*Human.BMI**2 - 7.4504*Human.BMI + 229.61)/100 #Digitalization of TOTAL part of figure 4 of doi:10.1136/bmjresp-2017-000231
Human.dm = Human.mass * (0.1 * rnd.beta(1.5,1.5,size=((n_b))) + 0.0) #Between 0 and 10% of the body mass (usually around 10 according to test on dogs)
# The mass of the accessories is added to dm
Human.dm += self.m_b_add
Human.BSA = ((128.1 * Human.mass**0.44 * (Human.height*100)**0.6) * (2-Human.gender) + (147.4 * Human.mass**0.47 * (Human.height*100)**0.55) * (Human.gender-1))*10**-4
Human.Death = np.ones(n_b)
Human.eps = np.ones(n_b)*0.2
# Clothing tables: one row per clothes volume (V_clothes_o, V_clothes_one, V_clothes_two),
# one column per clothing type (0 to 3); suffix _m is used with gender 1, _w with gender 2
clothes_alpha_m = np.array([[1,0.1771,0.0192,0.0082],[1,5.7342e-6,0.0253,0.8511],[1,5.9719e-6,0.333,4.981e-6]])
clothes_alpha_w = np.array([[1,0.7676,4.4037,0.8523],[1,0.5375,0.3333,3.5128],[1,0.5375,0.7082,2.333]])
clothes_beta_m = np.array([[1,0.3542,0.0385,0.0095],[1,9.9565e-6,0.0505,1.7022],[1,1.1522e-5,0.667,8.7348e-6]])
clothes_beta_w = np.array([[1,1.5353,8.8072,1.7046],[1,1.0749,0.6667,7.0255],[1,1.0749,1.4165,4.6659]])
clothes_mean_m = np.array([[0.5,0.6127,2.7573,4.3912],[0.5,0.204,0.715,1.021],[0.5,0.408,1.1233,0.613]])*10**-3
clothes_mean_w = np.array([[0.5,1.123,2.655,4.493],[0.5,0.9191,1.1233,1.94],[0.5,0.9191,1.2254,2.0424]])*10**-3
clothes_std_m = np.array([[0.5,0.7148,0.817,0.6127],[0.5,0.9191,1.1233,0.817],[0.5,0.817,1.1233,2.451]])*10**-3
clothes_std_w = np.array([[0.5,0.9191,1.6339,1.6339],[0.5,0.817,1.1233,1.225],[0.5,0.817,1.0212,0.817]])*10**-3
# Bounds of the draws: mean +/- 2 standard deviations
clothes_max_m = clothes_mean_m + 2*clothes_std_m
clothes_min_m = clothes_mean_m - 2*clothes_std_m
clothes_max_w = clothes_mean_w + 2*clothes_std_w
clothes_min_w = clothes_mean_w - 2*clothes_std_w
# Beta draws rescaled to [min, max]; the factor (clothes!=0) sets the volumes to 0 for naked bodies
Human.V_clothes_o = (clothes!=0)* ((rnd.beta(clothes_alpha_m[0,clothes],clothes_beta_m[0,clothes],size=(n_b))*(clothes_max_m[0,clothes]-clothes_min_m[0,clothes])+clothes_min_m[0,clothes])*(2-Human.gender) + (rnd.beta(clothes_alpha_w[0,clothes],clothes_beta_w[0,clothes],size=(n_b))*(clothes_max_w[0,clothes]-clothes_min_w[0,clothes])+clothes_min_w[0,clothes])*(Human.gender-1))
Human.V_clothes_one = (clothes!=0)* ((rnd.beta(clothes_alpha_m[1,clothes],clothes_beta_m[1,clothes],size=(n_b))*(clothes_max_m[1,clothes]-clothes_min_m[1,clothes])+clothes_min_m[1,clothes])*(2-Human.gender) + (rnd.beta(clothes_alpha_w[1,clothes],clothes_beta_w[1,clothes],size=(n_b))*(clothes_max_w[1,clothes]-clothes_min_w[1,clothes])+clothes_min_w[1,clothes])*(Human.gender-1))
Human.V_clothes_two = (clothes!=0)* ((rnd.beta(clothes_alpha_m[2,clothes],clothes_beta_m[2,clothes],size=(n_b))*(clothes_max_m[2,clothes]-clothes_min_m[2,clothes])+clothes_min_m[2,clothes])*(2-Human.gender) + (rnd.beta(clothes_alpha_w[2,clothes],clothes_beta_w[2,clothes],size=(n_b))*(clothes_max_w[2,clothes]-clothes_min_w[2,clothes])+clothes_min_w[2,clothes])*(Human.gender-1))
Human.error_perc_fat = error_perc_fat
self.Human = Human
[docs]
def Initialisation_arrays(self):
    """
    Initialise the working and saving arrays of the simulation.

    The bodies are generated first (``Human_generation``), then the metadata
    of the WOLF simulation are read with ``Read_Wolf_GPU_metadata`` and the
    arrays of position, velocity and time are created. With ``loading == 0``
    the bodies start around the initial drowning cell with a random offset;
    otherwise the state returned by ``Loading`` is used.

    Attributes set:
        BC_cells, DT_WOLF, NbX, NbY : Values returned by ``Read_Wolf_GPU_metadata``
        count_Wolf : ``ini_drowning - 1``
        wanted_Wolf : Times from 0 to the total WOLF time, every ``DT_WOLF``
        Delta : Array ``[DX, DY, 1, dt_max, sqrt(DX**2 + DY**2)]``
        Pos : Working array of the 3D positions with shape (n_b,3,2)
        Pos_b : Saving array of the 3D positions with shape (n_b,3,n_saved)
        U : Working array of the 3D velocities with shape (n_b,3,2)
        U_b : Saving array of the 3D velocities with shape (n_b,3,n_saved)
        time : Working array of the time of each body with shape (n_b,)
        time_b : Saving array of the times with shape (n_b,n_saved)
        resurface : Array of zeros with shape (n_b,2)
        sinking : Array of zeros with shape (n_b,2)
        count : Counter initialised with ``count_initial``
        sp : Set to 1
    """
    self.Human_generation()
    n_b, n_saved = self.n_b, self.n_saved

    (self.BC_cells, self.DT_WOLF, DX, DY, H_mat,
     self.NbX, self.NbY, t_tot_Wolf) = Read_Wolf_GPU_metadata(self.Path_Wolf)

    # Coordinates of the cell centres
    X = DX/2 + np.arange(0, DX*self.NbX, DX)
    Y = DY/2 + np.arange(0, DY*self.NbY, DY)

    self.count_Wolf = self.ini_drowning - 1
    self.wanted_Wolf = np.arange(0, t_tot_Wolf + self.DT_WOLF, self.DT_WOLF)

    # Spatial and time steps: DX, DY, DZ (fixed to 1), dt_max and the cell diagonal
    Delta = np.zeros((5))
    for slot, step in enumerate((DX, DY, 1, self.dt_max, np.sqrt(DX**2 + DY**2))):
        Delta[slot] = step
    self.Delta = Delta

    column_0 = self.ind_pos_0_x
    row_0 = self.ind_pos_0_y
    NbZ = H_mat[row_0, column_0] / Delta[2]
    ind_pos_0_z = NbZ.astype(int)  # not used afterwards

    # Saving arrays: number of the body, (xyz), save index
    saved_shape = (n_b, 3, n_saved)
    index_b = np.zeros(saved_shape).astype(int)
    Pos_b = np.zeros(saved_shape)
    U_b = np.zeros(saved_shape)
    self.time_b = np.zeros((n_b, n_saved))

    index_b[:, 0, 0] = column_0
    index_b[:, 1, 0] = row_0
    index_b[:, 2, 0] = 0
    Pos_b[:, 0, 0] = X[column_0]
    Pos_b[:, 1, 0] = Y[row_0]

    ## Working arrays of the horizontal and vertical body motion
    Pos = np.zeros((n_b, 3, 2))
    U = np.zeros((n_b, 3, 2))

    if self.loading == 0:
        # Uncertainty on the drowning point: random magnitude and random sign
        def random_offset(cell_size):
            magnitude = rnd.uniform(size=(n_b))
            direction = np.sign(rnd.uniform(size=(n_b)) - 1/2)
            return cell_size * magnitude * direction * self.random_IP

        rand_x = random_offset(DX)
        rand_y = random_offset(DY)
        column, row = int(index_b[0, 0, 0]), int(index_b[0, 1, 0])
        start_position = (X[column] + rand_x, Y[row] + rand_y, H_mat[row, column])
        for axis, coordinate in enumerate(start_position):
            Pos[:, axis, 0] = coordinate
            Pos[:, axis, 1] = coordinate
        Pos_b[:, :, 0] = Pos[:, :, 0]
    else:
        (self.count_initial, self.Human, n_loaded, Pos_b,
         self.time_b, U_b, self.Z_param) = Loading(self.Path_loading, Pos_b, self.time_b, U_b)
        # Both working slots start from the loaded state
        for slot in range(2):
            for axis in range(3):
                Pos[:, axis, slot] = Pos_b[:, axis, n_loaded]
        for slot in range(2):
            for axis in range(3):
                U[:, axis, slot] = U_b[:, axis, n_loaded]

    self.Pos, self.Pos_b = Pos, Pos_b
    self.U, self.U_b = U, U_b
    self.time = self.t_initial*np.ones((n_b))
    self.resurface = np.zeros((n_b, 2))
    self.sinking = np.zeros((n_b, 2))
    self.count = self.count_initial
    self.sp = 1
[docs]
def start(self):
    """
    Run the drowning simulation and save its results.

    The arrays are initialised with ``Initialisation_arrays``; the bodies are
    then advanced either in ``n_parallel`` worker processes (``n_parallel > 1``,
    with a wx progress window) or in the current process. The resulting arrays
    are stored on the instance and written with ``np.savez``.
    """
    start = timeit.default_timer()
    self.Initialisation_arrays()

    profiler = None
    if self.Profile_this == 1:
        profiler = cProfile.Profile()
        profiler.enable()

    # DataFrames are converted to plain numpy arrays before being handed to
    # the solver (and, in parallel mode, to the worker processes).
    Human_np = self.Human.to_numpy()
    Z_param_np = self.Z_param.to_numpy()

    if self.n_parallel > 1:
        # Multi-process run
        with multiprocessing.Manager() as manager:
            # Queue used by the workers to report their progress
            progress_queue = manager.Queue()

            # Reuse the running wx application when there is one (e.g. when
            # started from the viewer); only create one for a standalone run.
            # The reference is kept so the application outlives the frame.
            app = wx.GetApp()
            if app is None:
                app = wx.App(False)

            # Exactly one progress window is created (no placeholder frame).
            if self.image == 1:
                frame = ProgressImage(self.n_parallel, self.time_goal, None)
            else:
                frame = ProgressBar(None, n_processes=self.n_parallel, total=self.time_goal)
            frame.Show()

            tasks = Preparation_parallelisation(progress_queue,self.a_RK,self.BC_cells,self.count,self.count_Wolf,self.CFL,self.Delta,Human_np,self.i_initial,self.n_b,self.n_saved,self.n_parallel,self.n_t,self.NbX,self.NbY,self.Path_saving,self.Path_Wolf,self.Pos,self.Pos_b,self.resurface,self.sinking,self.time,self.time_b,self.time_goal,self.U,self.U_b,self.wanted_time,self.wanted_Wolf,Z_param_np)

            # Thread following the progress reported through the queue.
            # It is a daemon thread, so it never keeps the program alive.
            time_viewer = 1
            monitor_thread = threading.Thread(target=state_of_run, args=(progress_queue, frame, time_viewer))
            monitor_thread.daemon = True
            monitor_thread.start()

            with multiprocessing.Pool(processes=self.n_parallel) as pool:
                result_async = pool.map_async(Parallel_loop, tasks)
                while not result_async.ready():
                    # Let the GUI refresh, then sleep briefly on the result
                    # instead of spinning a CPU core at 100 %.
                    wx.Yield()
                    result_async.wait(0.05)
                results = result_async.get()

            # Destroy() is used instead of binding a handler that calls
            # frame.Close() from inside EVT_CLOSE, which re-enters itself.
            frame.Destroy()

            # Each worker returns (Pos_b, resurface, sinking, time_b, U_b)
            # for its share of the bodies; stitch them back along axis 0.
            self.Pos_b = np.concatenate([result[0] for result in results], axis=0)
            self.resurface = np.concatenate([result[1] for result in results], axis=0)
            self.sinking = np.concatenate([result[2] for result in results], axis=0)
            self.time_b = np.concatenate([result[3] for result in results], axis=0)
            self.U_b = np.concatenate([result[4] for result in results], axis=0)
    else:
        # Single-process run
        self.Pos_b,self.resurface,self.sinking,self.time_b,self.U_b = Loop_management(-1,-1,self.a_RK,self.BC_cells,self.count,self.count_Wolf,self.CFL,self.Delta,Human_np,self.i_initial,self.n_b,self.n_saved,self.n_t,self.NbX,self.NbY,self.Path_saving,self.Path_Wolf,self.Pos,self.Pos_b,self.resurface,self.sinking,self.time,self.time_b,self.time_goal,self.U,self.U_b,self.wanted_time,self.wanted_Wolf,Z_param_np)

    stop = timeit.default_timer()
    execution_time = stop - start
    n_b = self.n_b
    time_goal = self.time_goal

    # Save of the results (np.savez appends ".npz" to the given name)
    # NOTE(review): makedirs creates a *directory* named "Results" next to the
    # "Results.npz" file - confirm this directory is needed.
    Path_save = os.path.join(self.Path_saving,'Results')
    os.makedirs(Path_save,exist_ok=True)
    np.savez(Path_save,Pos_b=self.Pos_b,U_b=self.U_b,Human=self.Human,Z_param=self.Z_param,wanted_time=self.wanted_time,time_b=self.time_b)
    logging.info(f"Program executed in "+str(round(execution_time/60,1))+" min, for "+str(n_b)+" bodies and "+str(np.floor(time_goal/(60*60*24)))+" days "+str(int((time_goal/60/60-24*int(time_goal/60/60/24))))+" h "+str((np.floor((time_goal/60)%60)))+" min "+str(time_goal%60)+ " s")

    if profiler is not None:
        profiler.disable()
        pstats.Stats(profiler).sort_stats('tottime').print_stats()

    if self.saving == 1:
        np.savez(self.Path_saving,Pos_b=self.Pos_b,U_b=self.U_b,Human=self.Human,Z_param=self.Z_param,wanted_time=self.wanted_time,time_b=self.time_b)
[docs]
def Parallel_loop(args):
    """
    Adapter for the process pool: a pool maps a function over single items,
    so the argument tuple of one task is unpacked here and forwarded to
    ``Loop_management``.

    :param args: Positional arguments of ``Loop_management`` for one task.
    :return: Whatever ``Loop_management`` returns for that task.
    """
    return Loop_management(*args)
[docs]
class Drowning_victim_Viewer(Element_To_Draw):
def __init__(self, idx = '', plotted = True, mapviewer = None, need_for_wx = False,filedir = None):
    """
    Build the viewer element and the underlying drowning simulation.

    :param idx: Identifier forwarded to ``Element_To_Draw``.
    :param plotted: Plot flag forwarded to ``Element_To_Draw``.
    :param mapviewer: Viewer forwarded to ``Element_To_Draw``.
    :param need_for_wx: Flag forwarded to ``Element_To_Draw``.
    :param filedir: Directory of the simulation, passed to ``Drowning_victim``.
    """
    super().__init__(idx, plotted, mapviewer, need_for_wx)

    # Other methods of this class read self.filedir and self.filename;
    # make sure both exist without overriding a value the base class
    # may already have provided.
    if getattr(self, "filedir", None) is None:
        self.filedir = filedir
    if not hasattr(self, "filename"):
        self.filename = None

    self.file_drowning = None
    self.init_plot()
    self.newdrowning = Drowning_victim(Path_dir=filedir)
    self.from_dictionnary_to_wp()
[docs]
def selection_drowning_point(self,event):
    """
    Choose the reference WOLF 2D simulation and prepare the selection of the
    drowning point.

    If the viewer holds no 2D result, the user is asked for the folder of a
    WOLF 2D GPU simulation, which is then added to the viewer. Otherwise the
    user picks one of the results already loaded. The grid parameters are then
    copied into the parameter window and a "Drowning point" button is added.

    :param event: wx event of the button (unused).
    """
    # Same relative/absolute fallback as the imports at the top of the file
    try:
        from ..PyDraw import draw_type
    except (ImportError, ValueError):
        from wolfhece.PyDraw import draw_type

    liste = self.mapviewer.get_list_keys(draw_type.RES2D,checked_state=None)
    if not liste:
        dialog = wx.DirDialog(None, "Folder containing the wanted simulation WOLF 2D GPU", style=wx.DD_DEFAULT_STYLE)
        try:
            # Show the dialog and wait for the user
            if dialog.ShowModal() != wx.ID_OK:
                # The user cancelled the dialog
                logging.info(_('No folder selected for the WOLF 2D GPU simulation.'))
                return
            self.newdrowning.Path_Wolf = dialog.GetPath()
        finally:
            # Always release the dialog, even if something above raised
            dialog.Destroy()

        self.wp.change_param('Paths','Results of Wolf GPU simulation path',self.newdrowning.Path_Wolf)
        # Add the simulation found in the selected folder to the viewer
        self.mapviewer.add_object(which='res2d_gpu', ToCheck=True, filename=join(self.newdrowning.Path_Wolf, 'Results'))
        self.mapviewer.menu_wolf2d()
        self.mapviewer.menu_2dgpu()
        self.mapviewer.Autoscale()
    else:
        # The returned key is not needed here; the call is kept for the choice
        # it offers to the user.
        self.mapviewer.single_choice_key(draw_type.RES2D,checked_state=None)

    active = self.mapviewer.active_res2d
    self.wp.change_param('Grid','Origin X',active.origx)
    self.wp.change_param('Grid','Origin Y',active.origy)

    with open(join(self.newdrowning.Path_Wolf,'parameters.json'), 'r', encoding='utf-8') as file:
        data = json.load(file)
    parameters = data['parameters']
    self.wp.change_param('Grid','Delta X',parameters['dx'])
    self.wp.change_param('Grid','Delta Y',parameters['dy'])
    self.wp.change_param('Grid','Nb X',parameters['nbx'])
    self.wp.change_param('Grid','Nb Y',parameters['nby'])

    # Create the "Drowning point" button only once, so that clicking the
    # "Wolf2D simulation" button again does not stack duplicates.
    if getattr(self, 'button_selection_progress', None) is None:
        self.button_selection_progress = wx.Button(self.wp,label='Drowning point')
        self.button_selection_progress.Bind(wx.EVT_BUTTON,self.selection_progress)
        self.button_selection_progress.SetToolTip('Check if you have exactly one drowning point selected')
        self.wp.sizerbut.Insert(4,self.button_selection_progress,1,wx.EXPAND)

    self.wp.sizer.Fit(self.wp)
    self.wp.SetSize(0,0,self.w,800)
    self.wp.Show(True)
[docs]
def selection_progress(self,event):
    """
    Validate the selection made in the active 2D result and, when exactly one
    point is selected, store it as the drowning point.

    :param event: wx event of the button (unused).
    """
    res2d = self.mapviewer.active_res2d
    nb_selected = res2d.SelectionData.nb

    if nb_selected == 0:
        wx.MessageBox(_("No point selected, please select a drowning point"), "Error", wx.OK | wx.ICON_ERROR)
        return
    if nb_selected > 1:
        wx.MessageBox(_("More than one point selected, please select only one drowning point"), "Error", wx.OK | wx.ICON_ERROR)
        return
    if nb_selected != 1:
        return

    # Exactly one point: read its coordinates from the first block
    selection = self.mapviewer.active_res2d.myblocks[getkeyblock(0)].SelectionData.myselection
    x, y = selection[0]
    i, j = self.mapviewer.active_res2d.get_ij_from_xy(x=x,y=y)
    self.newdrowning.ind_pos_0_x, self.newdrowning.ind_pos_0_y = i, j
    self.update_drowning_pos()
    self.mapviewer.active_res2d.SelectionData.reset_all()
    # Green button = a drowning point has been registered
    self.button_selection_progress.SetBackgroundColour(wx.Colour(50, 190, 50))
    self.file_drowning = 1
    return
[docs]
def update_drowning_pos(self):
    """
    Copy the cell indices of the drowning point into the "X-cell" and "Y-cell"
    entries of the "Initial_drowning_point" group of the parameter window.
    """
    cells = (
        ('X-cell', self.newdrowning.ind_pos_0_x),
        ('Y-cell', self.newdrowning.ind_pos_0_y),
    )
    for name, index in cells:
        self.wp.change_param("Initial_drowning_point", name, int(index))
[docs]
def create_exe_file(self,event):
    """
    Write ``exe_drowning.py`` in the simulation directory; this script runs the
    drowning simulation outside of the viewer.

    File-system errors are logged instead of being raised.

    :param event: wx event of the button (unused).
    """
    filedir = getattr(self, "filedir", None)
    if filedir is None:
        wx.MessageBox(_("No directory selected for the simulation. \nPlease, save your drowning in a directory."), "Error", wx.OK | wx.ICON_ERROR)
        return

    try:
        self.filedir = Path(filedir)
        # Create the simulation directory if it does not exist
        self.filedir.mkdir(parents=True, exist_ok=True)
        # Path of the generated script
        self.exe_file = self.filedir / "exe_drowning.py"
        project_root = Path(__file__).resolve().parents[2]

        # Content of the generated script
        # NOTE(review): the generated import targets "Drowning_victims.Class"
        # while this file imports from "wolfhece.drowning_victims" - confirm
        # the module path is still valid.
        script_content = f"""
import sys
import os
from pathlib import Path
directory = r"{project_root}"
os.chdir(directory)
_root_dir = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0,os.path.join(directory,'./wolfhece'))
try:
    from .Drowning_victims.Class import Drowning_victim
except:
    from Drowning_victims.Class import Drowning_victim
if __name__ == "__main__":
    # Définir le chemin de sauvegarde
    Path_saving = r"{self.filedir}"
    # Exécuter la simulation
    newdrowning = Drowning_victim(Path_dir=Path_saving)
    newdrowning.start()
"""
        # Create and write the script
        with open(self.exe_file, "w", encoding="utf-8") as file:
            file.write(script_content)
        logging.info(f"Drowning simulation file created at: {self.exe_file}")
    except OSError as e:
        # Directory creation and file writing fail with OSError; no
        # subprocess is started here, so CalledProcessError cannot occur.
        logging.error(f"Error while creating the drowning simulation file: {e}")
[docs]
def run_code(self,event):
    """
    Run the drowning simulation in the current process.

    Nothing is started unless a drowning point has been selected
    (``self.file_drowning`` is set). A failure of the simulation is logged
    with its traceback instead of propagating into the GUI event loop.

    :param event: wx event of the button (unused).
    """
    if self.file_drowning is None:
        logging.error(('No drowning point selected, select one before starting the simulation'))
        return

    try:
        self.newdrowning.start()
    except Exception as e:
        # The simulation runs in-process (no subprocess is involved), so any
        # exception type can surface here; this handler is the GUI boundary.
        logging.exception(f"Error while running the drowning simulation file: {e}")
    else:
        logging.info(_("Drowning simulation done."))
[docs]
def show_properties(self):
    """
    Show the parameter window, extended with the three buttons of the
    drowning tool (reference simulation, creation of the run script, run),
    then push the window values into the simulation object.
    """
    self.w = 800
    self.wp._set_gui(title='Parameters for the drowning simulation', toShow=True, w=self.w)
    self.wp.hide_selected_buttons([Buttons.Reload,Buttons.Save])

    # (wx id, label, tooltip, handler) of each extra button
    specs = (
        (10, "Wolf2D simulation", _("Select reference Wolf2D simulation to choose your drowning point"), self.selection_drowning_point),
        (11, "Create exe file", _("Create the executable file to run the drowning"), self.create_exe_file),
        (12, "Run", _("Run the drowning simulation \nNot recommended here"), self.run_code),
    )
    buttons = []
    for wx_id, label, tooltip, handler in specs:
        button = wx.Button(self.wp, id=wx_id, label=label)
        button.SetToolTip(tooltip)
        button.Bind(wx.EVT_BUTTON, handler)
        buttons.append(button)

    # The run button is tinted red
    buttons[-1].SetBackgroundColour(wx.Colour(240,160,160))

    for button, proportion in zip(buttons, (2, 2, 1)):
        self.wp.sizerbut.Add(button, proportion, wx.EXPAND)

    self.wp.SetSizer(self.wp.sizer)
    self.wp.SetAutoLayout(1)
    self.wp.sizer.Fit(self.wp)
    self.wp.SetSize(0,0,self.w,800)
    self.wp.Show(True)

    self.from_wp_to_dictionnary()
    self.newdrowning.from_dictionnary_to_attributes()
[docs]
def from_dictionnary_to_wp(self):
    """
    Build ``self.wp``, a ``Wolf_Param`` filled from the parameter dictionary of
    the simulation, and compute the list of times at which results are saved
    (``wanted_time``) together with their number (``n_saved``).
    """
    wp = Wolf_Param(
        parent=None,
        title="Drift of a drowning victim",
        to_read=False,
        withbuttons=True,
        toShow=False,
        init_GUI=False,
        force_even_if_same_default=True,
        filename=self.filename
    )

    # One Wolf_Param entry per parameter of the dictionary
    for section, entries in self.newdrowning.param_dict.items():
        for entry in entries.values():
            wp.add_param(
                groupname=section,
                name=entry["explicit name"],
                value=entry["value"],
                type=entry["type"],
                whichdict="All" if entry["mandatory"] else "Default",
                jsonstr={"Values": entry["choices"]} if entry["choices"] else None,
                comment=entry["description"]
            )
    self.wp = wp

    drowning = self.newdrowning
    drowning.time_goal = drowning.Days*24*60*60 + drowning.Hours*60*60 + drowning.Minutes*60 + drowning.Seconds #s

    # Saving times get sparser as the simulated time grows: candidates are
    # scanned every 10 s and kept according to the rules below.
    wanted_time = np.array([drowning.t_initial])
    drowning.n_saved = 1
    for t in np.arange(10, drowning.time_goal+10, 10):
        keep = (
            t < 60
            or (t < 10*60 and t % 60 == 0)
            or (t < 30*60 and t % (5*60) == 0)
            or (t < 60*60 and t % (10*60) == 0)
            or (t <= 24*60*60 and t % (60*60) == 0)
            or (t <= 2*24*60*60 and t % (2*60*60) == 0)
            or t % (3*60*60) == 0
        )
        if keep:
            wanted_time = np.append(wanted_time, t)
            drowning.n_saved += 1

    # A trailing 0 is appended after the last saving time
    drowning.wanted_time = np.append(wanted_time, 0)
[docs]
def from_wp_to_dictionnary(self):
    """
    Copy the values of the parameter window back into
    ``self.newdrowning.param_dict``.

    Entries are matched through the "explicit name" stored in the dictionary;
    a value is overwritten only when it differs from the one in ``self.wp``.
    """
    param_dict = self.newdrowning.param_dict

    for section, wp_entries in self.wp.myparams.items():
        if section not in param_dict:
            continue
        section_params = param_dict[section]
        for explicit_name, wp_param in wp_entries.items():
            # First dictionary entry carrying this explicit name, if any
            target = next(
                (data for data in section_params.values() if data["explicit name"] == explicit_name),
                None,
            )
            if target is None:
                continue
            if target["value"] != wp_param[key_Param.VALUE]:
                target["value"] = wp_param[key_Param.VALUE]

    self.newdrowning.param_dict = param_dict
[docs]
def hide_properties(self):
    """
    Destroy the parameter window, if there is one, and forget it.
    """
    window = self.wp
    if window is None:
        return
    window.Destroy()
    self.wp = None
[docs]
def save(self):
    """
    Save the parameters to ``self.filename``; when no file name is known yet,
    delegate to ``saveas``.
    """
    if self.filename is not None:
        self.wp.Save(self.filename)
        return
    self.saveas()
[docs]
def saveas(self):
    """
    Ask for a directory and save the parameters there, in a file named
    ``parameters.param``. Nothing is saved when the dialog is cancelled.
    """
    # wx.FD_SAVE is a wx.FileDialog flag; a directory dialog takes the
    # wx.DD_* styles.
    fdlg = wx.DirDialog(None, "Where should the parameters be stored? File automatically named parameters", style=wx.DD_DEFAULT_STYLE)
    try:
        if fdlg.ShowModal() != wx.ID_OK:
            return
        self.filedir = fdlg.GetPath()
    finally:
        # Release the dialog whatever happens
        fdlg.Destroy()

    self.filename = self.filedir + "/parameters.param"
    self.Path_saving = self.filedir
    self.wp.change_param("Paths","Save path",self.filedir)
    self.save()
[docs]
def load_results(self):
    """
    Load ``Results.npz`` from ``self.filedir`` and expose its arrays as
    attributes (Human, Pos_b, U_b, Z_param, wanted_time, time_b).

    The x and y components of ``Pos_b`` are shifted by the grid origin of the
    simulation. An error is logged and nothing is loaded when the file is
    missing.
    """
    results_file = join(self.filedir, 'Results.npz')
    if not os.path.exists(results_file):
        logging.error(f"Le fichier {results_file} est introuvable.")
        return

    array_names = ('Human', 'Pos_b', 'U_b', 'Z_param', 'wanted_time', 'time_b')
    # NOTE(review): allow_pickle=True unpickles the file content - only load
    # result files from a trusted source.
    with np.load(results_file, allow_pickle=True) as data:
        for array_name in array_names:
            setattr(self, array_name, data[array_name])

    origin = (self.newdrowning.origx, self.newdrowning.origy)
    for axis, offset in enumerate(origin):
        self.Pos_b[:,axis,:] += offset
[docs]
def init_plot(self):
    """
    Reset every plotting attribute (arrays, vertex clouds and the flags of the
    three representations) to ``None``.
    """
    plot_attributes = (
        'bottom_cells', 'bottom_kde', 'vertex_bottom_run',
        'surface_cells', 'surface_kde', 'vertex_surface_run',
        'plot_runs', 'plot_cells', 'plot_KDE',
    )
    for attribute in plot_attributes:
        setattr(self, attribute, None)
[docs]
def read_oneresult(self,idx):
    """
    Select the saved time step ``idx`` and rebuild every representation that
    is currently active (runs, cells, KDE). When none is active, the runs
    representation is built.

    :param idx: Index of the saved time step to display.
    """
    self.time_id = idx

    representations = (
        ('plot_runs', 'prepare_plot_runs_positions'),
        ('plot_cells', 'prepare_plot_cells_positions'),
        ('plot_KDE', 'prepare_plot_kde'),
    )
    refreshed = False
    for flag_name, builder_name in representations:
        if getattr(self, flag_name) is not None:
            getattr(self, builder_name)()
            refreshed = True

    if not refreshed:
        self.prepare_plot_runs_positions()
    return
[docs]
def read_last_result(self):
    """
    Display the last saved time step (index -1).
    """
    last_index = -1
    self.time_id = last_index
    self.read_oneresult(idx=last_index)
    return
[docs]
def find_minmax(self, update=False):
    """
    Compute the spatial extent (xmin, xmax, ymin, ymax) covered by the arrays
    and vertex clouds currently held by the viewer element.

    The extent starts from +/-999999 sentinels and is widened by every object
    that is not ``None``.

    :param update: Forwarded to ``find_minmax`` of the vertex clouds.
    """
    self.xmin = 999999.   # lower left corner X
    self.ymin = 999999.   # lower left corner Y
    self.xmax = -999999.  # upper right corner X
    self.ymax = -999999.  # upper right corner Y

    def widen(xmin, xmax, ymin, ymax):
        self.xmin = min(self.xmin, xmin)
        self.xmax = max(self.xmax, xmax)
        self.ymin = min(self.ymin, ymin)
        self.ymax = max(self.ymax, ymax)

    # Arrays report their extent through get_bounds()
    for layer in (self.bottom_kde, self.surface_kde, self.bottom_cells, self.surface_cells):
        if layer is not None:
            [xmin, xmax], [ymin, ymax] = layer.get_bounds()
            widen(xmin, xmax, ymin, ymax)

    # Vertex clouds compute their own extent first, then expose it
    for cloud in (self.vertex_bottom_run, self.vertex_surface_run):
        if cloud is not None:
            cloud.find_minmax(update)
            widen(cloud.xmin, cloud.xmax, cloud.ymin, cloud.ymax)
[docs]
def plot(self, sx=None, sy=None, xmin=None, ymin=None, xmax=None, ymax=None, size=None):
    """
    Plot every available representation in the OpenGL context.

    Arrays (KDE and cells) are plotted first, then the vertex clouds; nothing
    is done when the element is not flagged as plotted. The arguments are
    forwarded unchanged to the ``plot`` method of the arrays.
    """
    if not self.plotted:
        return

    self.plotting = True

    for layer in (self.bottom_kde, self.surface_kde, self.bottom_cells, self.surface_cells):
        if layer is not None:
            layer.check_plot()
            layer.plot(sx, sy, xmin, ymin, xmax, ymax, size)

    for cloud in (self.vertex_bottom_run, self.vertex_surface_run):
        if cloud is not None:
            cloud.plot()

    self.plotting = False
[docs]
def sort_positions_bodies(self):
    """
    Split the bodies in two groups at the current time step, according to the
    third component of their position: above 0.2 (surface) or not (bottom).

    An empty group is replaced by ``[0]`` so that both groups always hold at
    least one index.

    :return: (indices of the bottom group, indices of the surface group,
        index of the current time step)
    """
    time_id = self.time_id
    vertical = self.Pos_b[:,2,time_id]

    ind_surface = np.where(vertical > 0.2)[0]
    ind_bottom = np.where(vertical <= 0.2)[0]

    if not len(ind_surface):
        ind_surface = [0]
    if not len(ind_bottom):
        ind_bottom = [0]
    return ind_bottom,ind_surface,time_id
[docs]
def prepare_plot_runs_positions(self):
    """
    Build the two vertex clouds showing the position of every run at the
    current time step: bottom group in blue, surface group in red, both half
    transparent. The spatial extent is updated afterwards.
    """
    self.plot_runs = 1
    ind_bottom, ind_surface, time_id = self.sort_positions_bodies()

    clouds = []
    for body_ids, rgb in ((ind_bottom, [40,50,250]), (ind_surface, [250,100,80])):
        cloud = cloud_vertices(mapviewer=self.mapviewer)
        cloud.init_from_nparray(self.Pos_b[body_ids,:,time_id])
        cloud.myprop.color = rgb
        cloud.myprop.alpha = 0.5
        clouds.append(cloud)
    self.vertex_bottom_run, self.vertex_surface_run = clouds

    self.find_minmax(True)
    return
[docs]
def reset_plot_runs_positions(self):
    """
    Drop the vertex clouds of the runs and deactivate this representation.
    """
    self.vertex_bottom_run = self.vertex_surface_run = None
    self.plot_runs = None
[docs]
def kde_on_grid(self,points, bandwidth, xmin, xmax, ymin, ymax, grid_size):
    """
    Evaluate a kernel density estimate of a point cloud on a regular grid.
    Use a large grid size to identify peaks and a small one to refine.

    :param points: Point cloud the estimator is fitted on, one (x, y) row per point.
    :param bandwidth: Bandwidth given to ``KernelDensity``.
    :param xmin: Lower x bound of the grid.
    :param xmax: Upper x bound of the grid.
    :param ymin: Lower y bound of the grid.
    :param ymax: Upper y bound of the grid.
    :param grid_size: (number of nodes along x, number of nodes along y).
    :return: ``(Z, x_grid, y_grid)`` where ``Z[iy, ix]`` is the density at
        ``(x_grid[ix], y_grid[iy])``.
    """
    x_grid = np.linspace(xmin, xmax, grid_size[0])
    y_grid = np.linspace(ymin, ymax, grid_size[1])
    # Default 'xy' indexing: X and Y have shape (len(y_grid), len(x_grid))
    X, Y = np.meshgrid(x_grid, y_grid)
    sample_grid = np.vstack([X.ravel(), Y.ravel()]).T

    kde = KernelDensity(bandwidth=bandwidth)
    kde.fit(points)

    # The scores follow the raveled layout of X/Y, so they must be folded back
    # with that same shape; reshaping to (nx, ny) scrambles the values as soon
    # as nx != ny. The flat order of Z is unchanged.
    Z = np.exp(kde.score_samples(sample_grid)).reshape(X.shape)
    return Z, x_grid, y_grid
[docs]
def detect_peaks(self,x,y,radius,num_peaks=2):
    """
    Find the most populated square cells of a point cloud.

    The points are binned in cells of side ``radius`` measured from the grid
    origin of the simulation; the ``num_peaks`` cells holding the most points
    are returned through the coordinates of their centre. The input arrays
    are left untouched.

    :param x: X coordinates of the point cloud.
    :param y: Y coordinates of the point cloud.
    :param radius: Side of the cells used to bin the points.
    :param num_peaks: Number of peaks to detect.
    :return: Array of shape (number of peaks found, 2) with the (x, y) centre
        of each selected cell, most populated first.
    """
    origx = self.newdrowning.origx
    origy = self.newdrowning.origy

    # Work on shifted copies: an in-place shift would modify the arrays of the
    # caller (and fail on integer arrays).
    x_local = np.asarray(x) - origx
    y_local = np.asarray(y) - origy

    ij = np.array([np.int32(x_local/radius), np.int32(y_local/radius)]).T
    unique_positions, counts = np.unique(ij,axis=0, return_counts=True)

    ind_peaks = []
    excluded = np.zeros(len(counts), dtype=bool)  # cells already used
    while True:
        candidates = np.where(~excluded)[0]
        if len(candidates) == 0:
            break
        # Most populated cell among the remaining ones
        idx_max = candidates[np.argmax(counts[candidates])]
        ind_peaks.append(idx_max)
        # Exclude the selected cell from the next searches
        pos_max = unique_positions[idx_max]
        same_cell = (unique_positions[:, 0] == pos_max[0]) & (unique_positions[:, 1] == pos_max[1])
        excluded |= same_cell
        if len(ind_peaks) >= num_peaks:
            break

    # Back to absolute coordinates, at the centre of each selected cell
    selected_peaks = np.zeros((len(ind_peaks),2))
    selected_peaks[:,0] = (unique_positions[ind_peaks,0]*radius) + radius/2 + origx
    selected_peaks[:,1] = (unique_positions[ind_peaks,1]*radius) + radius/2 + origy
    return selected_peaks
[docs]
def kde_refined_based_coarse(self, points, wolfarray, bandwidth=50,
                             coarse_grid_size=50, fine_grid_size=5,
                             window_size=200, radius=150, n_peaks=3):
    """
    Evaluate a kernel density estimate around the most populated cells of a
    point cloud.

    The points are binned with ``wolfarray.get_ij_from_xy_array``; the most
    populated cells are selected one after the other, every cell within
    ``radius/self.newdrowning.dx`` cells of a selected one being discarded
    from the following searches. A KDE of all the points is then evaluated on
    a window around each selected cell (see ``kde_on_grid``).

    :param points: Point cloud, one (x, y) row per point.
    :param wolfarray: Array providing ``array`` (only its shape is used) and
        ``get_ij_from_xy_array``.
    :param bandwidth: Bandwidth forwarded to ``kde_on_grid``.
    :param coarse_grid_size: Not used in the body.
    :param fine_grid_size: Not used in the body.
    :param window_size: Not used in the body.
    :param radius: Exclusion distance and half-size of the windows, divided by
        ``self.newdrowning.dx`` before use.
    :param n_peaks: Number of cells to select.
    :return: ``(coords_array, values_array)``: integer rows ``(i, j)`` (``i``
        computed from y, ``j`` from x) and the KDE value attached to each row.
    """
    array = wolfarray.array[:,:]
    x_min, y_min = points.min(axis=0)
    x_max, y_max = points.max(axis=0)
    # 1. dx, dy deduced from the array shape
    # NOTE(review): dx, dy are the extent of `points` divided by the array
    # shape, not the resolution of `wolfarray` - confirm this is intended.
    ny, nx = array.shape
    dx = (x_max - x_min) / nx
    dy = (y_max - y_min) / ny
    # 3. Containers of the results
    all_indices = []
    all_values = []
    ij = wolfarray.get_ij_from_xy_array(points)
    unique_positions, counts = np.unique(ij,axis=0, return_counts=True)
    # Exclusion distance expressed in number of cells
    delta = radius/self.newdrowning.dx
    ind_peaks = []
    selected_mask = np.zeros(len(counts), dtype=bool) # marks the indices already excluded
    while True:
        # Keep only the indices that are not excluded yet
        valid_indices = np.where(~selected_mask)[0]
        if len(valid_indices) == 0:
            break
        # Most populated cell among the valid ones
        idx_max = valid_indices[np.argmax(counts[valid_indices])]
        ind_peaks.append(idx_max)
        # Mark the cells that are too close so they are excluded afterwards
        pos_max = unique_positions[idx_max]
        i_diff = np.abs(unique_positions[:, 0] - pos_max[0])
        j_diff = np.abs(unique_positions[:, 1] - pos_max[1])
        too_close = (i_diff <= delta) & (j_diff <= delta)
        selected_mask |= too_close # update the exclusion mask
        if len(ind_peaks) >= n_peaks:
            break
    for ind_peak in ind_peaks:
        # Window around the selected cell
        # NOTE(review): the cell indices come from `wolfarray` while dx, dy,
        # x_min, y_min come from the extent of `points` - verify the units.
        x0 = int((unique_positions[ind_peak,0]-delta)*dx+x_min)
        x1 = int((unique_positions[ind_peak,0]+delta)*dx+x_min)
        y0 = int((unique_positions[ind_peak,1]-delta)*dy+y_min)
        y1 = int((unique_positions[ind_peak,1]+delta)*dy+y_min)
        grid_nx = max(2, int((x1 - x0) / dx))
        grid_ny = max(2, int((y1 - y0) / dy))
        Z_fine, xg_fine, yg_fine = self.kde_on_grid(
            points, bandwidth, x0, x1, y0, y1, grid_size=(grid_nx, grid_ny)
        )
        # Z_fine, xg_fine, yg_fine = self.kde_on_grid(
        # local_points, bandwidth, x0, x1, y0, y1, grid_size=grid
        # )
        # Physical coordinates -> indices in `array`
        Y, X = np.meshgrid(yg_fine, xg_fine, indexing='ij')
        x_flat = X.ravel()
        y_flat = Y.ravel()
        i = np.floor((y_flat - y_min) / dy).astype(int)
        j = np.floor((x_flat - x_min) / dx).astype(int)
        # Filtering: keep the indices that fall inside `array`
        valid = (i >= 0) & (i < ny) & (j >= 0) & (j < nx)
        indices = np.stack((i[valid], j[valid]), axis=1) # shape (n, 2)
        values = Z_fine.ravel()[valid]
        all_indices.append(indices)
        all_values.append(values)
    # Final stacking
    coords_array = np.vstack(all_indices) if all_indices else np.empty((0, 2), dtype=int)
    values_array = np.concatenate(all_values) if all_values else np.array([])
    return coords_array,values_array
[docs]
def prepare_plot_kde(self):
    """
    Build the two arrays holding the kernel density estimation of the
    positions at the current time step: bottom group with the blue palette,
    surface group with the red one.
    """
    self.plot_KDE = 1
    self.n_peaks = 2
    ind_bottom, ind_surface, time_id = self.sort_positions_bodies()

    # Header shared by both arrays: grid of the simulation, no translation
    drowning = self.newdrowning
    head = header_wolf()
    head.set_origin(drowning.origx, drowning.origy)
    head.set_resolution(drowning.dx, drowning.dy)
    head.nbx, head.nby = drowning.nbx, drowning.nby
    head.set_translation(0., 0.)

    self.bottom_kde = WolfArray(mapviewer=self.mapviewer, srcheader= head, nullvalue= 0.)
    self.surface_kde = WolfArray(mapviewer=self.mapviewer, srcheader= head, nullvalue= 0.)

    groups = ((self.bottom_kde, ind_bottom), (self.surface_kde, ind_surface))
    for kde_array, body_ids in groups:
        xy = self.Pos_b[body_ids,:2,time_id]
        coords, values = self.kde_refined_based_coarse(xy,kde_array,bandwidth=50,coarse_grid_size=200,fine_grid_size=5,window_size=50,radius=25,n_peaks=self.n_peaks)
        kde_array.array[:,:] = 0.
        kde_array.array[coords[:,1],coords[:,0]] = values
        # Cells left at 0 are masked
        kde_array.mask_data(0.)

    self.bottom_kde.mypal.defaultblue_minmax(self.bottom_kde.array)
    self.surface_kde.mypal.defaultred_minmax(self.surface_kde.array)
    for kde_array in (self.bottom_kde, self.surface_kde):
        kde_array.reset_plot()
    return
[docs]
def reset_plot_kde(self):
    """
    Drop the KDE arrays and deactivate this representation.
    """
    self.bottom_kde = self.surface_kde = None
    self.plot_KDE = None
[docs]
def prepare_plot_cells_positions(self):
    """
    Build the two arrays giving, for each cell of the simulation grid, the
    percentage of bodies located in it at the current time step (one array
    for the bottom group, one for the surface group), each with a two-colour
    palette spanning the min and max of the array.
    """
    self.plot_cells = 1
    ind_bottom, ind_surface, time_id = self.sort_positions_bodies()

    # Header shared by both arrays: grid of the simulation, no translation
    drowning = self.newdrowning
    head = header_wolf()
    head.set_origin(drowning.origx, drowning.origy)
    head.set_resolution(drowning.dx, drowning.dy)
    head.nbx, head.nby = drowning.nbx, drowning.nby
    head.set_translation(0., 0.)

    self.bottom_cells = WolfArray(mapviewer=self.mapviewer, srcheader= head, nullvalue= 0.)
    self.surface_cells = WolfArray(mapviewer=self.mapviewer, srcheader= head, nullvalue= 0.)

    groups = ((self.bottom_cells, ind_bottom), (self.surface_cells, ind_surface))
    for cell_array, body_ids in groups:
        ij = cell_array.get_ij_from_xy_array(self.Pos_b[body_ids,:2,time_id])
        occupied, counts = np.unique(ij,axis=0, return_counts=True)
        cell_array.array[:,:] = 0.
        # Share of all the bodies found in each occupied cell, in percent
        cell_array.array[occupied[:,0],occupied[:,1]] = counts/self.newdrowning.n_b*100
        cell_array.mask_data(0.)

    # (array, colours, float colours) of each palette
    # NOTE(review): both palettes end on the same colour [0, 0, 255, 255] -
    # confirm this is intended for the surface array.
    palettes = (
        (self.bottom_cells, [[175, 200, 255, 255], [0, 0, 255, 255]], [[0., 0., 0., 1.], [1., 1., 1., 1.]]),
        (self.surface_cells, [[255, 200, 175, 255], [0, 0, 255, 255]], [[0., 0.2, 0.6, 1.], [1., 1., 1., 1.]]),
    )
    for cell_array, colors, colorsflt in palettes:
        array = cell_array.array
        palette = cell_array.mypal
        palette.nb = 2
        palette.values = np.asarray([np.min(array), np.max(array)], dtype=np.float64)
        palette.colors = np.asarray(colors, dtype=np.int32)
        palette.colorsflt = np.asarray(colorsflt, dtype=np.float64)
        palette.fill_segmentdata()

    self.bottom_cells.reset_plot()
    self.surface_cells.reset_plot()
    return
[docs]
def reset_plot_cells_positions(self):
    """
    Drop the cell arrays and deactivate this representation.
    """
    self.bottom_cells = self.surface_cells = None
    self.plot_cells = None
[docs]
def zoom_on_hotspots(self,memory_view):
    """
    Register one view per hotspot of the bottom and surface groups.

    The hotspots are found with ``detect_peaks`` (cells of side 100) and each
    view spans 150 on both sides of a hotspot. Views are named per group, so
    the names stay correct when a group yields fewer peaks than requested.

    :param memory_view: Object receiving the views through ``add_view``.
    """
    delta = 150
    # n_peaks is only defined once the KDE has been prepared; fall back to the
    # value used there.
    n_peaks = getattr(self, 'n_peaks', 2)
    ranks = ['Highest peak', 'Second peak']

    ind_bottom, ind_surface, time_id = self.sort_positions_bodies()
    for body_ids, location in ((ind_bottom, 'bottom'), (ind_surface, 'surface')):
        xy_peaks = self.detect_peaks(self.Pos_b[body_ids,0,time_id], self.Pos_b[body_ids,1,time_id], radius=100, num_peaks=n_peaks)
        for rank, locxy in enumerate(xy_peaks):
            label = ranks[rank] if rank < len(ranks) else f'Peak {rank + 1}'
            locname = f'{label} at the {location}'
            xmin = locxy[0] - delta
            xmax = locxy[0] + delta
            ymin = locxy[1] - delta
            ymax = locxy[1] + delta
            memory_view.add_view(locname, self.mapviewer.canvaswidth, self.mapviewer.canvasheight, xmin, xmax, ymin, ymax)
    return
[docs]
def get_bodies_characteristics(self):
    """
    Show the characteristics of every run ("Human" data) in a modal grid.

    Data that is not already a DataFrame is wrapped in one, using
    ``COLUMNS_HUMAN`` as column names.
    """
    bodies = self.Human
    if not isinstance(bodies, pd.DataFrame):
        bodies = pd.DataFrame(bodies, columns=COLUMNS_HUMAN)

    self.grid = PandasGrid(parent=self.mapviewer, id = self.idx, df=bodies)
    self.grid.ShowModal()
    self.grid.Destroy()
    return
[docs]
def get_vertical_position_proportion(self):
    """
    Show a pie chart of the proportion of bodies at the surface and at the
    bottom, with a slider selecting the saved time step.
    """
    fig, ax = plt.subplots()

    def draw_pie(time_idx):
        """Draw the surface/bottom shares of the given time step on the axes."""
        z = self.Pos_b[:,2,time_idx]
        surface = np.where(z > 0.2)[0]
        bottom = np.where(z <= 0.2)[0]
        counts = [len(surface)/self.newdrowning.n_b*100, len(bottom)/self.newdrowning.n_b*100]
        ax.pie(counts, labels=['Surface', 'Bottom'], colors=[[1,0.7,0.6,1], [0.6,0.7,1,1]], autopct='%1.1f%%')

    def update_pie(time_idx):
        """Redraw the pie for the time step selected with the slider."""
        time_idx = int(time_idx)  # the slider may hand over a float
        time_value = self.wanted_time[time_idx]
        ax.clear()
        # Title reflecting the selected time
        days, remainder = time_value // (24*3600), time_value % (24*3600)
        hours = remainder // 3600
        minutes = (time_value % (3600)) // 60
        seconds = time_value % 60
        ax.set_title(f"Proportion of bodies at the bottom and surface after \n{days} days, {hours} hours, {minutes} minutes, {seconds} seconds")
        draw_pie(time_idx)
        fig.canvas.draw_idle()

    # Initial chart: first saved time step
    time_idx_initial = 0
    draw_pie(time_idx_initial)

    # Slider over the saved time steps (the last entry of wanted_time is left out)
    ax_slider = plt.axes([0.1, 0.1, 0.8, 0.05], facecolor='lightgoldenrodyellow')
    time_slider = Slider(ax_slider, 'Time', 0, len(self.wanted_time) - 2, valinit=time_idx_initial, valstep=1)
    time_slider.on_changed(update_pie)
    plt.show()
    return
[docs]
class ProgressBar(wx.Frame):
    """
    Creates and manages the progress frame

    A single gauge (0-100) and a text label show the progress of the slowest
    process, expressed as a percentage of ``total``.
    """
    def __init__(self,parent, n_processes,total):
        """
        :param parent: parent window passed to ``wx.Frame``
        :param n_processes: number of processes being monitored
        :param total: progress value corresponding to 100 %
        """
        super(ProgressBar, self).__init__(parent)

        self.n_processes = n_processes
        # Needed by update_progress to convert raw progress into a percentage
        self.total = total

        # Set up the main panel and sizer
        panel = wx.Panel(self)
        sizer = wx.BoxSizer(wx.VERTICAL)

        progress_bar = wx.Gauge(panel, range=100)
        sizer.Add(progress_bar, flag=wx.EXPAND | wx.ALL, border=5)
        self.progress_bars = progress_bar

        self.progress_text = wx.StaticText(panel, label="0%")
        sizer.Add(self.progress_text, flag=wx.ALIGN_CENTER | wx.TOP, border=5)

        panel.SetSizer(sizer)
        self.SetTitle(_("Drowning Progress"))
        self.SetSize((300, 90))

    def update_progress(self, progress_dict):
        """
        Update the progress bars based on the values in `progress_dict`.

        The displayed percentage is driven by the smallest non-``None`` value
        found in ``progress_dict``. Nothing is changed when no process has
        reported a value yet or when ``self.total`` is zero/unset.

        :param progress_dict: mapping of process id to raw progress value (or ``None``)
        """
        reported = [progress for progress in progress_dict.values() if progress is not None]
        if not reported or not self.total:
            # Nothing to display yet (and avoids None / total or x / 0)
            return

        progress_percent = int(min(reported)/self.total*100)
        # Keep the value inside the gauge range declared in __init__
        progress_percent = max(0, min(100, progress_percent))

        self.progress_bars.SetValue(progress_percent)
        self.progress_text.SetLabel(f"{progress_percent}%")
        self.SetTitle(f"Drowning Progress - {progress_percent}%")
[docs]
class ProgressImage(wx.Frame):
    """
    Creates and manages the progress frame with a fractioned image appearance

    The file ``image.png`` located next to this module is cut into a grid of
    ``10 * n_processes`` rows by ``n_processes`` columns. Every cell starts
    hidden; cells of column ``process_id`` are revealed from the top as the
    corresponding process advances.
    """
    def __init__(self, n_processes, total, *args, **kw):
        """
        :param n_processes: number of processes, i.e. number of image columns
        :param total: progress value corresponding to a finished process
        :param args: positional arguments forwarded to ``wx.Frame``
        :param kw: keyword arguments forwarded to ``wx.Frame``
        """
        super(ProgressImage, self).__init__(*args, **kw)

        self.n_processes = n_processes
        # Needed by update_progress to convert raw progress into a fraction
        self.total = total
        total_segments = 10*n_processes
        self.total_segments = total_segments

        # Set up the main panel and sizer
        panel = wx.Panel(self)
        grid_sizer = wx.GridSizer(rows=total_segments, cols=n_processes, gap=(5, 5))

        # The image is expected next to this source file
        current_dir = Path(__file__).resolve().parent
        self.image_path = str(current_dir / "image.png")

        # Load the image
        image = wx.Image(str(self.image_path), wx.BITMAP_TYPE_PNG)
        img_width, img_height = image.GetSize()

        # Calculate segment dimensions
        segment_height = img_height // total_segments
        segment_width = img_width // n_processes

        # 2D list of bitmaps, indexed as image_segments[row][process column]
        self.image_segments = []

        for row in range(total_segments):
            row_segments = []
            for col in range(n_processes):
                # Extract the sub-image matching this grid cell
                x = col * segment_width
                y = row * segment_height
                segment = image.GetSubImage((x, y, segment_width, segment_height))

                # Create a bitmap for the segment and add it to the grid, initially hidden
                segment_bitmap = wx.StaticBitmap(panel, bitmap=wx.Bitmap(segment))
                segment_bitmap.Hide()  # Hide initially; show as progress advances
                grid_sizer.Add(segment_bitmap, flag=wx.ALIGN_CENTER)
                row_segments.append(segment_bitmap)
            self.image_segments.append(row_segments)

        panel.SetSizer(grid_sizer)
        self.SetTitle(_("Drowning Progress"))
        self.SetSize((img_width + 50, img_height + 50))

    def update_progress(self, progress_dict):
        """
        Update the visibility of image segments based on the progress.

        Entries whose value is ``None`` are ignored. Nothing is changed when
        ``self.total`` is zero/unset.

        :param progress_dict: mapping of process id (used as column index) to raw progress value (or ``None``)
        """
        if not self.total:
            # Avoids a division by zero; nothing meaningful to display
            return

        # NOTE(review): total_segments // n_processes equals 10, so at most the
        # first 10 of the 10 * n_processes rows are revealed per column -
        # confirm whether self.total_segments was intended here.
        rows_at_completion = self.total_segments // self.n_processes
        n_rows = len(self.image_segments)

        for process_id, progress in progress_dict.items():
            if progress is None:
                continue
            # Number of rows to reveal for this process, bounded by the grid
            # so that progress > total cannot index past the last row
            num_segments_to_show = min(int((progress / self.total) * rows_at_completion), n_rows)
            for segment_id in range(num_segments_to_show):
                self.image_segments[segment_id][process_id].Show()

        # Refresh the layout after updating visibility
        self.Layout()
        self.Refresh()