Source code for wolfhece.eva.mixture_models

"""
Author: HECE - University of Liege, Pierre Archambeau
Date: 2024

Copyright (c) 2024 University of Liege. All rights reserved.

This script and its content are protected by copyright law. Unauthorized
copying or distribution of this file, via any medium, is strictly prohibited.
"""

import numpy as np
from scipy import stats
from scipy.optimize import root_scalar, minimize
from autograd import jacobian
import matplotlib.pyplot as plt

"""
Refs:
    - Mixture Model in Python:
        - http://www.awebb.info/probability/2017/05/12/quantiles-of-mixture-distributions.html
    - Expectation-Maximization :
        - https://fr.wikipedia.org/wiki/Algorithme_esp%C3%A9rance-maximisation
        - https://wildart.github.io/post/mle-mm/
        - https://wjchen.net/post/en/gmm-em-en.html#3em-algorithm-for-univariate-gmm
        - https://medium.com/@prateek.shubham.94/expectation-maximization-algorithm-7a4d1b65ca55
        - https://towardsdatascience.com/implement-expectation-maximization-em-algorithm-in-python-from-scratch-f1278d1b9137
"""

class MixtureModel():
    """ Mixture model """

    def __init__(self, component_distributions, ps) -> None:
        """ List of distributions to mix and list of coefficients (summing to 1.) """

        self._components = component_distributions
        self.pond = ps

        # Return the function that is the pdf of the mixture distribution
        self.pdf = lambda x: sum(component_dist.pdf(x) * p for component_dist, p in zip(component_distributions, ps))
        # Internal function that is the cdf of the mixture distribution
        self.cdf = lambda x: sum(component_dist.cdf(x) * p for component_dist, p in zip(component_distributions, ps))
        # Return the function that is the nnlf of the mixture distribution
        self.nnlf = lambda x: -np.log(self.pdf(x))
    def ppf(self, p):
        """ Return the pth quantile of the mixture distribution given by the
        component distributions and their probabilities.

        Defined as a regular method so that it can be overloaded
        (e.g. SeasonMixtureModel).
        """

        # We can probably be a bit smarter about how we pick the limits
        lo = np.min([dist.ppf(.00001) for dist in self._components])
        hi = np.max([dist.ppf(.99999) for dist in self._components])

        # root_scalar requires opposite signs of the function at the bounds
        while np.sign(p - self.cdf(lo)) * np.sign(p - self.cdf(hi)) > 0:
            lo /= 2
            hi *= 2

        if np.isnan(lo):
            lo = -10000.
        if np.isnan(hi):
            hi = 10000.

        res = root_scalar(lambda x, p, f: p - f(x), args=(p, self.cdf),
                          method='brenth', bracket=[lo, hi], xtol=1e-5)

        if res.converged:
            return res.root
        else:
            print(res)
            raise ValueError('Bad convergence of the root scalar function - Please debug !')
    def plots(self, fig=None, axes=None, show=True):
        """ Plot of the cdf and the pdf """

        if axes is None:
            fig, axes = plt.subplots(2, 1)

        p = np.arange(.1, 1., .001)
        q_all = []

        ax = axes[0]
        for k, curdist in enumerate(self._components):
            q = curdist.ppf(p)
            q_all += list(q)
            ax.plot(q, p, label='component {}'.format(k+1))

        q = [self.ppf(curp) for curp in p]
        q_all += q
        ax.plot(q, p, label='mixture model')
        ax.legend()
        ax.set_title('Cumulative probability function')
        ax.set_xlabel('Quantile')
        ax.set_ylabel('Cumulative probability')

        q_all = np.asarray(sorted(q_all))

        ax = axes[1]
        for k, curdist in enumerate(self._components):
            p = curdist.pdf(q_all)
            ax.plot(q_all, p, label='component {}'.format(k+1))

        p = [self.pdf(curq) for curq in q_all]
        ax.plot(q_all, p, label='mixture model')
        ax.legend()
        ax.set_title('Probability density function')
        ax.set_xlabel('Quantile')
        ax.set_ylabel('Probability')

        fig.tight_layout()
        if show:
            fig.show()

        return fig, axes
class SeasonMixtureModel(MixtureModel):
    """ Extension of a MixtureModel to directly account for the adaptation of
    the probability based on 2 seasons in a year.

    The variables and the results of the cdf and ppf functions are expressed
    as annual probabilities.

    Relation between the annual return period (T) and the cumulative/
    non-exceedance probability (F):

    $ T = 1/(1-F^2) $
    """

    def __init__(self, component_distributions, ps) -> None:
        super().__init__(component_distributions, ps)

        self.power = 2.  # Number of events selected per year

        # Return the function that is the pdf of the mixture distribution
        self._pdf = lambda x: sum(component_dist.pdf(x) * p for component_dist, p in zip(component_distributions, ps))
        self.pdf = lambda x: pow(sum(component_dist.pdf(x) * p for component_dist, p in zip(component_distributions, ps)), self.power)
        # Return the function that is the cdf of the mixture distribution
        self.cdf = lambda x: pow(sum(component_dist.cdf(x) * p for component_dist, p in zip(component_distributions, ps)), self.power)
        # Return the function that is the nnlf of the mixture distribution
        self.nnlf = lambda x: -np.log(self.pdf(x))
    def plots(self, fig=None, axes=None, show=True):
        """ Plot of the cdf and the pdf """

        if axes is None:
            fig, axes = plt.subplots(2, 1)

        p = np.arange(.1, 1., .001)
        q_all = []

        ax = axes[0]
        for k, curdist in enumerate(self._components):
            q = curdist.ppf(p)
            q_all += list(q)
            ax.plot(q, p, label='component {}'.format(k+1))

        q = [self.ppf(curp) for curp in p]
        q_all += q
        ax.plot(q, p, label='mixture model')
        ax.legend()
        ax.set_title('Cumulative probability function')
        ax.set_xlabel('Quantile')
        ax.set_ylabel('Cumulative probability')

        q_all = np.asarray(sorted(q_all))

        ax = axes[1]
        for k, curdist in enumerate(self._components):
            p = curdist.pdf(q_all)
            ax.plot(q_all, p, label='component {}'.format(k+1))

        p = [self.pdf(curq) for curq in q_all]
        ax.plot(q_all, p, label='mixture model (with exponent)')
        p = [self._pdf(curq) for curq in q_all]
        ax.plot(q_all, p, label='mixture model (without exponent)')
        ax.legend()
        ax.set_title('Probability density function')
        ax.set_xlabel('Quantile')
        ax.set_ylabel('Probability')

        fig.tight_layout()
        if show:
            fig.show()

        return fig, axes
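
# A minimal usage sketch (not part of the original module): convert an annual
# return period T into the annual non-exceedance probability F = 1 - 1/T, then
# invert the model, consistent with the relation given in the class docstring.
# The component distributions and weights below are illustrative only.
def _example_season_return_period():
    dists = [stats.norm(), stats.expon()]          # one distribution per season
    model = SeasonMixtureModel(dists, [0.5, 0.5])
    T = 50.                                        # annual return period [years]
    F = 1. - 1. / T                                # annual non-exceedance probability
    return model.ppf(F)                            # quantile of the 50-year event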
def _nnlf_wgen(x0, data, weights):
    """ Weighted negative log-likelihood of a GEV (genextreme) distribution """
    shape, loc, scale = x0

    # pdf = stats.genextreme.pdf(data, shape, loc=loc, scale=scale)
    # func = np.inner(weights[pdf>0.], -np.log(pdf[pdf>0.]))

    lpdf = stats.genextreme.logpdf(data, shape, loc=loc, scale=scale)
    func = np.inner(weights, -lpdf)

    return func
def _nnlf_wgum(x0, data, weights):
    """ Weighted negative log-likelihood of a Gumbel distribution """
    loc, scale = x0

    lpdf = stats.gumbel_r.logpdf(data, loc=loc, scale=scale)
    func = np.inner(weights, -lpdf)

    return func
def _nnlf_gen(x0, data):
    """ Negative log-likelihood of a GEV (genextreme) distribution """
    shape, loc, scale = x0

    func = -np.sum(np.log(stats.genextreme.pdf(data, shape, loc=loc, scale=scale)))

    return func
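
# A quick sanity-check sketch (an assumption, not one of the original tests):
# with unit weights, the weighted negative log-likelihoods above should match
# scipy's built-in nnlf for the same parameters.
def _check_weighted_nnlf():
    data = stats.gumbel_r.rvs(loc=10., scale=3., size=200, random_state=42)
    w = np.ones(len(data))
    assert np.isclose(_nnlf_wgum((10., 3.), data, w),
                      stats.gumbel_r.nnlf((10., 3.), data))
    assert np.isclose(_nnlf_wgen((0.1, 10., 3.), data, w),
                      stats.genextreme.nnlf((0.1, 10., 3.), data))
    assert np.isclose(_nnlf_gen((0.1, 10., 3.), data),
                      stats.genextreme.nnlf((0.1, 10., 3.), data))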
def fit_wgev(data, weights, shape=None, loc=None, scale=None):
    """ Fit a GEV distribution with weights attached to the data """

    if shape is None:
        # No starting point provided -- initialize from a weighted Gumbel fit
        shape = 0.
        loc, scale = stats.gumbel_r.fit(data)

        res = minimize(_nnlf_wgum, (loc, scale), args=(data, weights))
        loc, scale = res.x

        if loc < 0:
            res = minimize(_nnlf_wgum, (1., 1.), args=(data, weights))
            loc, scale = res.x

    res = minimize(_nnlf_wgen, (shape, loc, scale), args=(data, weights)) #, method='BFGS')
    newshape, newloc, newscale = res.x

    # Keep the new parameters only if they are plausible;
    # otherwise fall back on the starting point
    if abs(newshape) < 1 and newloc > 0.:
        return newshape, newloc, newscale
    else:
        return shape, loc, scale
def fit_mixture_gev(data, fig, ax, sols):
    """ Fit of a mixture distribution based on a "simple"
    Expectation-Maximization algorithm """

    A_shape = 0.
    B_shape = 0.
    A_loc, A_scale = stats.gumbel_r.fit(data[:int(len(data)/2)])
    B_loc, B_scale = stats.gumbel_r.fit(data[int(len(data)/2):])

    diff = 1.
    i = 1
    while diff > 1.e-4:
        old = np.asarray([A_shape, A_loc, A_scale, B_shape, B_loc, B_scale])

        # For each value of X, compute the probability density
        # under hypotheses A and B
        p_A = stats.genextreme(A_shape, loc=A_loc, scale=A_scale).pdf(data)
        p_B = stats.genextreme(B_shape, loc=B_loc, scale=B_scale).pdf(data)

        # For each value of X, compute a weight corresponding to its
        # degree of membership in distribution A or B.
        p_total = p_A + p_B
        weight_A = p_A / p_total
        weight_B = p_B / p_total
        # weight_B = 1. - weight_A

        ax[1].clear()
        ax[1].plot(data, weight_A)
        ax[1].plot(data, weight_B)

        # Update the parameters
        A_shape, A_loc, A_scale = fit_wgev(data, weight_A, A_shape, A_loc, A_scale)
        B_shape, B_loc, B_scale = fit_wgev(data, weight_B, B_shape, B_loc, B_scale)

        ax[0].clear()
        ax[0].hist(data, bins=200, density=True)
        ax[0].plot(data, stats.genextreme(sols[0][0], loc=sols[0][1], scale=sols[0][2]).pdf(data), 'r--')
        ax[0].plot(data, stats.genextreme(sols[1][0], loc=sols[1][1], scale=sols[1][2]).pdf(data), 'b--')
        ax[0].plot(data, stats.genextreme(A_shape, loc=A_loc, scale=A_scale).pdf(data), 'r')
        ax[0].plot(data, stats.genextreme(B_shape, loc=B_loc, scale=B_scale).pdf(data), 'b')

        fig.canvas.draw()
        fig.canvas.flush_events()

        diff = np.sum(np.abs(old - np.asarray([A_shape, A_loc, A_scale, B_shape, B_loc, B_scale])))
        old = [A_shape, A_loc, A_scale, B_shape, B_loc, B_scale]

        print(i, diff)
        i += 1

    return (A_shape, A_loc, A_scale), (B_shape, B_loc, B_scale), (weight_A, weight_B)
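
# Recap of the EM updates implemented in fit_mixture_gev (summary comments only):
#   E-step : weight_A(x_i) = f_A(x_i) / (f_A(x_i) + f_B(x_i)),  weight_B = 1 - weight_A
#   M-step : (shape, loc, scale)_A = argmin sum_i weight_A(x_i) * (-log f_A(x_i)),
#            idem for B, i.e. the weighted GEV fits performed by fit_wgev.
# The loop stops once the L1-norm change of the parameter vector drops below 1e-4.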
def example_em():
    """ See https://dridk.me/expectation-maximisation.html """

    import seaborn as sns

    hommes = np.random.normal(190, 10, 1000)  # men   = [171, 171, 173, 180, 190, 159 ...]
    femmes = np.random.normal(160, 5, 1000)   # women = [145, 170, 145, 161, 139, 150 ...]

    # sns.distplot(femmes, label="Femmes")
    # sns.distplot(hommes, label="Hommes")

    X = np.sort(np.concatenate((femmes, hommes)))

    # sns.distplot(X, label="mixture", color="green")
    # plt.legend()

    # Distribution of the heights X (see above)
    # X = [159, 158, 159, 179, 189 ....]

    # Generate a random model A
    A_mean = np.random.randint(100, 300)
    A_sd = np.random.randint(10, 30)

    # Generate a random model B
    B_mean = np.random.randint(100, 300)
    B_sd = np.random.randint(10, 30)

    fig, ax = plt.subplots(1, 1)

    # Run 50 iterations... (that will be enough)
    for i in range(50):

        # For each value of X, compute the probability density
        # under hypotheses A and B
        p_A = stats.norm(loc=A_mean, scale=A_sd).pdf(X)
        p_B = stats.norm(loc=B_mean, scale=B_sd).pdf(X)

        # For each value of X, compute a weight corresponding to its
        # degree of membership in distribution A or B.
        p_total = p_A + p_B
        weight_A = p_A / p_total
        weight_B = p_B / p_total

        # Example: if the height 189 cm belongs to distribution B,
        # then weight_B(189) will be large and weight_A(189) small.

        # Update the parameters (mu_A, sigma_A) and (mu_B, sigma_B)
        # according to the weights.
        A_mean = np.sum(X * weight_A) / np.sum(weight_A)
        B_mean = np.sum(X * weight_B) / np.sum(weight_B)
        A_sd = np.sqrt(np.sum(weight_A * (X - A_mean)**2) / np.sum(weight_A))
        B_sd = np.sqrt(np.sum(weight_B * (X - B_mean)**2) / np.sum(weight_B))

        ax.clear()
        ax.step(X, weight_A)
        ax.step(X, weight_B)

    # Repeat until convergence. Not tested here; we simply stop after 50 iterations.

    res = stats.genlogistic.fit(weight_A)
    ax.plot(X, stats.genlogistic(res[0], loc=res[1], scale=res[2]).cdf(X))
def example_one_gev():
    """ Fit of a single GEV -- the weighted fit with unit weights must match
    the plain MLE fit """

    data = [9.4, 38.0, 12.5, 35.3, 17.6, 12.9, 12.4, 19.6, 15.0, 13.2, 12.3,
            16.9, 16.9, 29.4, 13.6, 11.1, 8.0, 16.6, 12.0, 13.1, 9.1, 9.7,
            21.0, 11.2, 14.4, 18.8, 14.0, 19.9, 12.4, 10.8, 21.6, 15.4, 17.4,
            14.8, 22.7, 11.5, 10.5, 11.8, 12.4, 16.6, 11.7, 12.9, 17.8]

    shape, loc, scale = stats.genextreme.fit(data)

    # mle  = -np.sum(stats.genextreme(shape, loc=loc, scale=scale).logpdf(data))
    # mle1 = stats.genextreme.nnlf((shape, loc, scale), data)
    # mle2 = _nnlf_gen((shape, loc, scale), data)
    # mle3 = _nnlf_wgen((shape, loc, scale), data, np.ones(len(data)))

    if not np.allclose([shape, loc, scale], [-0.21988720690114583, 12.749730029827154, 3.448963234019624]):
        raise Exception('Bad result -- Verify')

    shape, loc, scale = fit_wgev(data, np.ones(len(data)))

    if not np.allclose([shape, loc, scale], [-0.21989445389551832, 12.74974586825777, 3.4490271528260927]):
        raise Exception('Bad result -- Verify')
def example_mixture_gev():
    """ Fit of a two-component GEV mixture by Expectation-Maximization """

    A_sol = [-.15, 1., 1.]
    B_sol = [-.2, 2., 1.5]

    data1 = stats.genextreme.rvs(A_sol[0], loc=A_sol[1], scale=A_sol[2], size=100)
    data2 = stats.genextreme.rvs(B_sol[0], loc=B_sol[1], scale=B_sol[2], size=100)

    data1 = np.sort(data1)
    data2 = np.sort(data2)
    data_all = np.sort(np.hstack((data1, data2)))

    fig, ax = plt.subplots(2, 1)

    ax[0].hist(data_all, bins=200, density=True)
    ax[0].hist(data1, bins=100, density=True)
    ax[0].hist(data2, bins=100, density=True)
    ax[0].plot(data1, stats.genextreme(A_sol[0], loc=A_sol[1], scale=A_sol[2]).pdf(data1))
    ax[0].plot(data2, stats.genextreme(B_sol[0], loc=B_sol[1], scale=B_sol[2]).pdf(data2))

    plt.show(block=False)

    res = fit_mixture_gev(data_all, fig, ax, [A_sol, B_sol])

    print('A : ', res[0])
    print('B : ', res[1])

    # Fitted component densities
    ax[0].plot(data_all, stats.genextreme(res[0][0], loc=res[0][1], scale=res[0][2]).pdf(data_all))
    ax[0].plot(data_all, stats.genextreme(res[1][0], loc=res[1][1], scale=res[1][2]).pdf(data_all))

    plt.show()
def test_mixture():
    # The two component distributions: a normal and an exponential distribution
    component_dists = [stats.norm(), stats.expon()]
    # Chosen by fair coin flip
    ps = [0.5, 0.5]

    # We want the 75th percentile of the mixture
    p = 0.75

    mymixt = MixtureModel(component_dists, ps)

    quantile = mymixt.ppf(p)
    test_p = mymixt.cdf(quantile)

    if abs(quantile - 1.044491028438254) > 1e-13:
        raise Exception('Bad result -- Verify')
    if abs(test_p - p) > 1e-13:
        raise Exception('Bad result -- Verify')

    print("Computed quantile for p = 0.75: {}".format(quantile))
def test_season_mixture():
    # The two component distributions: a normal and an exponential distribution
    component_dists = [stats.norm(), stats.expon()]
    # Chosen by fair coin flip
    ps = [0.5, 0.5]

    # We want the 75th percentile of the mixture
    p = 0.75

    mymixt = SeasonMixtureModel(component_dists, ps)

    quantile = mymixt.ppf(p)
    test_p = mymixt.cdf(quantile)

    if abs(quantile - 2.1092509198855587) > 1e-13:
        raise Exception('Bad result -- Verify')
    if abs(test_p - p) > 1e-13:
        raise Exception('Bad result -- Verify')

    print("Computed quantile for p = 0.75: {}".format(quantile))
if __name__ == '__main__':

    test_mixture()
    test_season_mixture()

    # example_mixture_gev()
    # example_one_gev()
    # example_em()