[PYTHON] [Statistiques] Multitraitement de l'échantillonnage MCMC

Il s'agit d'un article qui tente d'accélérer l'implémentation scratch de MCMC en le rendant multitraitement. Dans l'article de l'autre jour, "[Statistics] Je vais expliquer l'échantillonnage par la méthode de Monte Carlo en chaîne de Markov (MCMC) avec animation.", j'ai implémenté la chaîne. Je ne l'avais pas, donc je n'avais qu'une chaîne, mais j'ai essayé de l'échantillonner avec plusieurs chaînes et de l'exécuter en tant que multi-processus. Étant donné que MCMC est indépendant pour chaque chaîne, il est acceptable de simplement séparer le processus, il était donc facile d'accélérer.

environnement

⇒ Puisqu'il a 2 cœurs, seuls jusqu'à 2 processus peuvent être efficacement accélérés ...

Code pour cet article

Le code est publié sur GitHub.  https://github.com/matsuken92/Qiita_Contents/blob/master/multiprocessing/parallel_MCMC.ipynb

Bases du multiprocessing

Tout d'abord, j'aimerais voir le mouvement de MultiProcessing avec un processus simple.

Le premier est l'importation de la bibliothèque. Nous utilisons une classe appelée Pool qui gère plusieurs processus de travail.

from multiprocessing import Pool

Pour le moment, cela semble être un processus lourd, alors ciblons un processus qui boucle beaucoup. Cela ne fait que s'additionner, mais il faut quelques secondes pour le tourner environ 100000000 fois.

def test_calc(num):
    """Traitement lourd"""
    _sum = 0
    for i in xrange(num):
        _sum += i
    return _sum

Mesurons la vitesse lorsque ce processus est exécuté deux fois dans l'ordre.

#Mesurer le temps où il est exécuté deux fois de manière séquentielle
start = time.time()
_sum = 0
for _ in xrange(2):
    _sum += test_calc(100000000)
end = time.time()
print _sum
print "time: {}".format(end-start)

Cela a pris moins de 12 secondes.

out


9999999900000000
time: 11.6906960011

Ensuite, effectuez le même traitement en parallèle pour deux processus et mesurez.

#Mesurer le temps d'exécution en 2 processus
n_worker = 2

pool = Pool(processes=n_worker)

#Liste des arguments à passer aux fonctions exécutées par les deux processus
args = [100000000] * n_worker

start = time.time() #la mesure
result = pool.map(test_calc, args)
end = time.time()   #la mesure

print  np.sum(result)
print "time: {}".format(end-start)
pool.close()

C'est un peu plus de 6 secondes, donc c'est presque la moitié du temps. J'ai pu accélérer par 2 processus: en riant:

out


9999999900000000
time: 6.28346395493

Application de MultiProcessing à l'échantillonnage MCMC

Maintenant, appliquons cela au traitement de chaque chaîne d'échantillonnage MCMC en parallèle. Comme toujours, la première chose à faire est d'importer la bibliothèque.

import numpy as np
import numpy.random as rd
import scipy.stats as st
import copy, time, os
from datetime import datetime as dt

from multiprocessing import Pool

%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style="whitegrid", palette="muted", color_codes=True)

La fonction P (・) est le noyau de distribution postérieure cible. Ici, nous utilisons un noyau de distribution normale bidimensionnelle.

#Fonction de probabilité moins constante de normalisation
def P(x1, x2, b):
    assert np.abs(b) < 1
    return np.exp(-0.5*(x1**2 - 2*b*x1*x2 + x2**2))

Les paramètres de la distribution proposée sont définis comme globaux. Il définit également une fonction «maintenant (・)» pour la mesure du temps. Une fonction qui affiche une chaîne de l'heure actuelle.

# global parameters
b = 0.5
delta = 1

def now():
    return  dt.strftime(dt.now(), '%H:%M:%S')

Voici la fonction qui effectue l'échantillonnage. Échantillonnage jusqu'à ce que le nombre d'échantillons spécifié soit atteint. C'est presque la même chose que Dernière fois sauf qu'il est fonctionnalisé et que le code de mesure du temps est ajouté. La clé est d'exécuter cette fonction en parallèle. Comme il est échantillonné pour chaque chaîne, il peut se déplacer indépendamment entre les processus, de sorte que la communication entre les processus n'est pas nécessaire et cela semble facile.

def exec_sampling(n_samples):
    global b, delta
    rd.seed(int(time.time())+os.getpid())
    pid = os.getpid()
    start = time.time()
    start_time = now()
    
    #initial state
    sampling_result = []
    current = np.array([5, 5])
    sampling_result.append(current)
    cnt = 1
    while cnt < n_samples:
        # rv from proposal distribution(Normal Dist: N(0, delta) )
        next = current + rd.normal(0, delta, size=2)
        r = P(next[0], next[1], b)/P(current[0], current[1], b)

        if r > 1 or r > rd.uniform(0, 1):
            # 0-Lorsque le nombre aléatoire uniforme de 1 est supérieur à r, l'état est mis à jour.
            current = copy.copy(next)
            sampling_result.append(current)
            cnt += 1
            
    end = time.time()    
    end_time = now()

    #Affichage du temps requis pour chaque chaîne
    print "PID:{}, exec time: {}, {}-{}".format(pid, end-start, start_time, end_time)
    return sampling_result

Les trois fonctions suivantes draw_scatter (), draw_traceplot () et remove_burn_in_samples () sont des fonctions qui traitent le résultat de l'échantillonnage.

def draw_scatter(sample, alpha=0.3):
    """Dessinez un diagramme de dispersion des résultats d'échantillonnage"""
    plt.figure(figsize=(9,9))
    plt.scatter(sample[:,0], sample[:,1], alpha=alpha)
    plt.title("Scatter plot of 2-dim normal random variable with MCMC. sample size:{}".format(len(sample)))
    plt.show()
    
def draw_traceplot(sample):
    """Dessinez un tracé du résultat de l'échantillonnage"""
    assert sample.shape[1] == 2
    
    plt.figure(figsize=(15, 6))
    
    for i in range(2):
        plt.subplot(2, 1, i+1)
        plt.xlim(0, len(sample[:,i]))
        plt.plot(sample[:,i], lw=0.05)
        if i == 0:
            order = "1st"
        else:
            order = "2nd"
        plt.title("Traceplot of {} parameter.".format(order))
    
    plt.show()

def remove_burn_in_samples(total_sampling_result, burn_in_rate=0.2):
    """Burn-Excluez l'échantillon de la section spécifiée dans."""
    adjust_burn_in_result = []
    for i in xrange(len(total_sampling_result)):
        idx = int(len(total_sampling_result[i])*burn_in_rate)
        adjust_burn_in_result.extend(total_sampling_result[i][idx:])
    return np.array(adjust_burn_in_result)

Vous trouverez ci-dessous les fonctions qui effectuent un traitement parallèle. Si vous regardez de près, vous pouvez voir que c'est pratiquement le même que le premier exemple simple.

def parallel_exec(n_samples, n_chain, burn_in_rate=0.2):
    """Exécution du traitement parallèle"""

    #Calculer la taille de l'échantillon par chaîne
    n_samples_per_chain = n_samples / float(n_chain)
    print "Making {} samples per {} chain. Burn-in rate:{}".format(n_samples_per_chain, n_chain, burn_in_rate)

    #Créer un objet Pool
    pool = Pool(processes=n_chain)

    #Génération d'arguments pour l'exécution
    n_trials_per_process = [n_samples_per_chain] * n_chain

    #Exécution du traitement parallèle
    start = time.time() #la mesure
    total_sampling_result = pool.map(exec_sampling, n_trials_per_process)
    end = time.time()   #la mesure

    #Affichage du temps total requis
    print "total exec time: {}".format(end-start)

    # Drawing scatter plot
    adjusted_samples = remove_burn_in_samples(total_sampling_result)
    draw_scatter(adjusted_samples, alpha=0.01)
    draw_traceplot(adjusted_samples)
    pool.close()

Voyons maintenant l'effet réel. Le nombre d'échantillons est de 1 000 000 et le nombre de chaînes est mesuré lorsqu'il est de 2 et lorsqu'il est de 1.

Nombre d'échantillons: 1000000, Nombre de chaînes: 2

#paramètre: n_samples = 1000000, n_chain = 2
parallel_exec(1000000, 2)

L'échantillonnage est effectué en moins de 19 secondes au total, environ 12 secondes par processus de travail.

out


Making 500000.0 samples per 2 chain. Burn-in rate:0.2
total exec time: 18.6980280876
PID:2374, exec time: 12.0037689209, 20:53:41-20:53:53
PID:2373, exec time: 11.9927477837, 20:53:41-20:53:53

scatter_chain2.png

traceplot_chain2.png

Nombre d'échantillons: 1000000, Nombre de chaînes: 1

#paramètre: n_samples = 1000000, n_chain = 1
parallel_exec(1000000, 1)

L'exécution dans un processus de travail a pris moins de 33 secondes. Donc, si vous l'exécutez en deux processus, vous pouvez voir qu'il s'exécute 1,7 fois plus vite: satisfait:

out


Making 1000000.0 samples per 1 chain. Burn-in rate:0.2
total exec time: 32.683218956
PID:2377, exec time: 24.7304420471, 20:54:07-20:54:31

scatter_chain1.png

traceplot_chain1.png

référence

Documentation Python (2.7ja1) 16.6. Multiprocessing - Interface de «traitement parallèle» basée sur les processus  http://docs.python.jp/2.7/library/multiprocessing.html

Python haute performance (O'Reilly)  https://www.oreilly.co.jp/books/9784873117409/

Recommended Posts

[Statistiques] Multitraitement de l'échantillonnage MCMC
Pratiquer des méthodes typiques de statistiques (1)