[PYTHON] Calcul d'algorithme EM pour un problème de distribution gaussienne mixte

Bonjour. J'ai écrit le calcul de l'algorithme EM pour le problème de distribution gaussienne mixte en Python. Ceci est un exemple d'univarié et bivarié. Étant donné que les valeurs initiales, etc. sont générées aléatoirement, vous pouvez voir que la progression de la convergence change différemment si vous l'exécutez à plusieurs reprises.

$ ./em.py --univariate
nstep= 98  log(likelihood) = -404.36

$ ./em.py --bivariate
nstep= 39  log(likelihood) = -1534.51

figure_2.png figure_1.png

em.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import division
from __future__ import print_function
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.mlab as mlab
from matplotlib.patches import Ellipse

#Distribution gaussienne mixte par= (pi, mean, var): (Coefficient de mélange, moyenne, variance)
def gaussians(x, par):
    return [gaussian(x-mu, var) * pi for pi,mu,var in zip(*par)]

#Distribution gaussienne
def gaussian(x, var):
    nvar = n_variate(x)
    if not nvar:
        qf, detvar, nvar = x**2/var, var, 1
    else:
        qf, detvar = np.dot(np.linalg.solve(var, x), x), np.linalg.det(var)
    return np.exp(-qf/2) / np.sqrt(detvar*(2*np.pi)**nvar)

#Probabilité du journal
def loglikelihood(data, par):
    gam = [gaussians(x, par) for x in data]
    ll = sum([np.log(sum(g)) for g in gam])
    return ll, gam

#Étape E
def e_step(data, pars):
    ll, gam = loglikelihood(data, pars)
    gammas = transpose(map(normalize, gam))
    return gammas, ll

#M étape pars= (pis, means, vars)
def m_step(data, gammas):
    ws = map(sum, gammas)
    pis = normalize(ws)
    means = [np.dot(g, data)/w for g, w in zip(gammas, ws)]
    vars = [make_var(g, data, mu)/w for g, w, mu in zip(gammas, ws, means)]
    return pis, means, vars

#Co-distribué
def make_var(gammas, data, mean):
    return np.sum([g * make_cov(x-mean) for g, x in zip(gammas, data)], axis=0)

def make_cov(x):
    nvar = n_variate(x)
    if not nvar:
        return x**2
    m = np.matrix(x)
    return m.reshape(nvar, 1) * m.reshape(1, nvar)

#n-variable
def n_variate(x):
    if isinstance(x, (list, np.ndarray)):
        return len(x)
    return 0  # univariate

#Normalisation
def normalize(lst):
    s = sum(lst)
    return [x/s for x in lst]

#Translocation
def transpose(a):
    return zip(*a)

def flatten(lst):
    if isinstance(lst[0], np.ndarray):
        lst = map(list, lst)
    return sum(lst, [])

#ellipse
def ellipse(cov, mstd=1.0):
    vals, vecs = eigsorted(cov)
    radii = mstd * np.sqrt(vals)
    tilt = np.degrees(np.arctan2(*vecs[:,0][::-1]))
    return radii, tilt

def eigsorted(cov):
    vals, vecs = np.linalg.eigh(cov)
    order = vals.argsort()[::-1]
    return vals[order], vecs[:,order]

# color map
def cm(x, a=0.85):
    if x > a:
        return (1, 0, (1-x)/(1-a), 1)
    return (x/a, 1-x/a, 1, 1)

#Graphique des résultats
def plot_em(data, ls, par):
    nvar = n_variate(data[0])
    col = [cm((l-ls[0])/(ls[-1]-ls[0])) for l in ls]
    ax1 = plt.subplot2grid((4, 1), (0, 0), rowspan=3)  # subplot(211)
    ax2 = plt.subplot2grid((4, 1), (3, 0))  # subplot(212)
    if not nvar:
        subplot_hist(data, par, col, ax1)
    elif nvar == 2:
        subplot_bivariate(data, ls, par, col, ax1)
    subplot_loglikelihood(ls, col, ax2)
    plt.show()
    return 0

#Affichage de l'histogramme (univarié)
def subplot_hist(data, pars, col, ax):
    xs = np.linspace(min(data), max(data), 200)
    ax.hist(data, bins=20, normed=True, alpha=0.1)
    for par, c in zip(pars, col):
        norm = [mlab.normpdf(xs, m, np.sqrt(var))*pi for pi,m,var in zip(*par)]
        ax.plot(xs, sum(norm), c=c, lw=1, alpha=0.8)
    ax.set_xlim(min(data), max(data))
    ax.set_xlabel("x")
    ax.set_ylabel("Probability")
    ax.grid()
    return 0

#Affichage de la distribution gaussienne bivariée
def subplot_bivariate(data, ls, par, cols, ax):
    x, y = zip(*data)
    ax.plot(x, y, 'g.', alpha=0.5)
    ax.grid()
    ax.set(aspect=1) # 'equal'
    nstep = 4
    mstd = 4.0
    for i in range(nstep):
        j = ((len(ls)-1)*i)//(nstep-1)
        (pi, mean, cov), col = par[j], cols[j]
        for m, c in zip(mean, cov):
            radii, tilt = ellipse(c, mstd)
            ax.add_artist(Ellipse(xy=m, width=radii[0], height=radii[1], angle=tilt, ec=col, fc='none', alpha=0.5))
    return 0

#Transition de la vraisemblance logarithmique
def subplot_loglikelihood(ls, col, ax):
    ax.scatter(range(len(ls)), ls, c=col, edgecolor='none')
    ax.set_xlim(-1, len(ls))
    ax.set_xlabel("steps")
    ax.set_ylabel("loglikelihood")
    ax.grid()
    return 0

#Données de distribution gaussienne mixte (K:Nombre de distributions gaussiennes mixtes)
def make_data(typ_nvariate):
    if typ_nvariate == 'univariate':  #Univarié
        par = [(2.0, 0.2, 100), (4.0, 0.4, 600), (6.0, 0.4, 300)]
        data = flatten([np.random.normal(mu,sig,n) for mu,sig,n in par])
        K = len(par)
        means = [np.random.choice(data) for _ in range(K)]
        vars = [np.var(data)]*K
    elif typ_nvariate == 'bivariate':  #Bivarié
        nvar, ndat, sig = 2, 250, 0.4
        centers = [[1, 1], [-1, -1], [1, -1]]
        K = len(centers)
        data = flatten([np.random.randn(ndat,nvar)*sig + np.array(c) for c in centers])
        means = np.random.rand(K, nvar)
        vars = [np.identity(nvar)]*K
    pis = [1.0/K]*K
    return data, (pis, means, vars)

#Algorithme EM (gammas: 'burden rates', or 'responsibilities')
def em(typ_nvariate='univariate'):
    delta_ls, max_step = 1e-5, 400
    lls, pars = [], []  #Enregistrez le résultat du calcul de chaque étape
    data, par = make_data(typ_nvariate)
    for _ in range(max_step):
        gammas, ll = e_step(data, par)
        par = m_step(data, gammas)
        pars.append(par)
        lls.append(ll)
        if len(lls) > 8 and lls[-1] - lls[-2] < delta_ls:
            break
    #Sortie de résultat
    print('nstep=%3d' % len(lls), " log(likelihood) =", lls[-1])
    plot_em(data, lls[1:], pars[1:])
    return 0

def main():
    """{f}: EM algorithm for a Gaussian mixture problem.

    usage: {f} [-h] [--univariate | --bivariate]
    
    options:
        -h, --help    show this help message and exit
        --univariate  calculates a univariate problem (default)
        --bivariate   calculates a bivariate problem
    """
    import docopt, textwrap
    args = docopt.docopt(textwrap.dedent(main.__doc__.format(f=__file__)))
    typ_nvariate = ["univariate", "bivariate"][args["--bivariate"]]
    em(typ_nvariate)

if __name__ == '__main__':
    main()

Recommended Posts

Calcul d'algorithme EM pour un problème de distribution gaussienne mixte
EM de distribution gaussienne mixte
Algorithme EM modèle mixte gaussien [apprentissage automatique statistique]
Distribution gaussienne mixte et logsumexp
Dérivation de l'algorithme EM et exemple de calcul pour le tirage au sort
PRML Chapitre 9 Implémentation Python de distribution gaussienne mixte
Estimation de la distribution gaussienne mixte par la méthode de Bayes
(Apprentissage automatique) J'ai essayé de comprendre attentivement l'algorithme EM dans la distribution gaussienne mixte avec l'implémentation.