Il s'agit d'un article qui tente d'accélérer l'implémentation scratch de MCMC en le rendant multitraitement. Dans l'article de l'autre jour, "[Statistics] Je vais expliquer l'échantillonnage par la méthode de Monte Carlo en chaîne de Markov (MCMC) avec animation.", j'ai implémenté la chaîne. Je ne l'avais pas, donc je n'avais qu'une chaîne, mais j'ai essayé de l'échantillonner avec plusieurs chaînes et de l'exécuter en tant que multi-processus. Étant donné que MCMC est indépendant pour chaque chaîne, il est acceptable de simplement séparer le processus, il était donc facile d'accélérer.
⇒ Puisqu'il a 2 cœurs, seuls jusqu'à 2 processus peuvent être efficacement accélérés ...
Le code est publié sur GitHub. https://github.com/matsuken92/Qiita_Contents/blob/master/multiprocessing/parallel_MCMC.ipynb
Tout d'abord, j'aimerais voir le mouvement de MultiProcessing avec un processus simple.
Le premier est l'importation de la bibliothèque. Nous utilisons une classe appelée Pool qui gère plusieurs processus de travail.
from multiprocessing import Pool
Pour le moment, cela semble être un processus lourd, alors ciblons un processus qui boucle beaucoup. Cela ne fait que s'additionner, mais il faut quelques secondes pour le tourner environ 100000000 fois.
def test_calc(num):
"""Traitement lourd"""
_sum = 0
for i in xrange(num):
_sum += i
return _sum
Mesurons la vitesse lorsque ce processus est exécuté deux fois dans l'ordre.
#Mesurer le temps où il est exécuté deux fois de manière séquentielle
start = time.time()
_sum = 0
for _ in xrange(2):
_sum += test_calc(100000000)
end = time.time()
print _sum
print "time: {}".format(end-start)
Cela a pris moins de 12 secondes.
out
9999999900000000
time: 11.6906960011
Ensuite, effectuez le même traitement en parallèle pour deux processus et mesurez.
#Mesurer le temps d'exécution en 2 processus
n_worker = 2
pool = Pool(processes=n_worker)
#Liste des arguments à passer aux fonctions exécutées par les deux processus
args = [100000000] * n_worker
start = time.time() #la mesure
result = pool.map(test_calc, args)
end = time.time() #la mesure
print np.sum(result)
print "time: {}".format(end-start)
pool.close()
C'est un peu plus de 6 secondes, donc c'est presque la moitié du temps. J'ai pu accélérer par 2 processus: en riant:
out
9999999900000000
time: 6.28346395493
Maintenant, appliquons cela au traitement de chaque chaîne d'échantillonnage MCMC en parallèle. Comme toujours, la première chose à faire est d'importer la bibliothèque.
import numpy as np
import numpy.random as rd
import scipy.stats as st
import copy, time, os
from datetime import datetime as dt
from multiprocessing import Pool
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style="whitegrid", palette="muted", color_codes=True)
La fonction P (・)
est le noyau de distribution postérieure cible. Ici, nous utilisons un noyau de distribution normale bidimensionnelle.
#Fonction de probabilité moins constante de normalisation
def P(x1, x2, b):
assert np.abs(b) < 1
return np.exp(-0.5*(x1**2 - 2*b*x1*x2 + x2**2))
Les paramètres de la distribution proposée sont définis comme globaux. Il définit également une fonction «maintenant (・)» pour la mesure du temps. Une fonction qui affiche une chaîne de l'heure actuelle.
# global parameters
b = 0.5
delta = 1
def now():
return dt.strftime(dt.now(), '%H:%M:%S')
Voici la fonction qui effectue l'échantillonnage. Échantillonnage jusqu'à ce que le nombre d'échantillons spécifié soit atteint. C'est presque la même chose que Dernière fois sauf qu'il est fonctionnalisé et que le code de mesure du temps est ajouté. La clé est d'exécuter cette fonction en parallèle. Comme il est échantillonné pour chaque chaîne, il peut se déplacer indépendamment entre les processus, de sorte que la communication entre les processus n'est pas nécessaire et cela semble facile.
def exec_sampling(n_samples):
global b, delta
rd.seed(int(time.time())+os.getpid())
pid = os.getpid()
start = time.time()
start_time = now()
#initial state
sampling_result = []
current = np.array([5, 5])
sampling_result.append(current)
cnt = 1
while cnt < n_samples:
# rv from proposal distribution(Normal Dist: N(0, delta) )
next = current + rd.normal(0, delta, size=2)
r = P(next[0], next[1], b)/P(current[0], current[1], b)
if r > 1 or r > rd.uniform(0, 1):
# 0-Lorsque le nombre aléatoire uniforme de 1 est supérieur à r, l'état est mis à jour.
current = copy.copy(next)
sampling_result.append(current)
cnt += 1
end = time.time()
end_time = now()
#Affichage du temps requis pour chaque chaîne
print "PID:{}, exec time: {}, {}-{}".format(pid, end-start, start_time, end_time)
return sampling_result
Les trois fonctions suivantes draw_scatter ()
, draw_traceplot ()
et remove_burn_in_samples ()
sont des fonctions qui traitent le résultat de l'échantillonnage.
def draw_scatter(sample, alpha=0.3):
"""Dessinez un diagramme de dispersion des résultats d'échantillonnage"""
plt.figure(figsize=(9,9))
plt.scatter(sample[:,0], sample[:,1], alpha=alpha)
plt.title("Scatter plot of 2-dim normal random variable with MCMC. sample size:{}".format(len(sample)))
plt.show()
def draw_traceplot(sample):
"""Dessinez un tracé du résultat de l'échantillonnage"""
assert sample.shape[1] == 2
plt.figure(figsize=(15, 6))
for i in range(2):
plt.subplot(2, 1, i+1)
plt.xlim(0, len(sample[:,i]))
plt.plot(sample[:,i], lw=0.05)
if i == 0:
order = "1st"
else:
order = "2nd"
plt.title("Traceplot of {} parameter.".format(order))
plt.show()
def remove_burn_in_samples(total_sampling_result, burn_in_rate=0.2):
"""Burn-Excluez l'échantillon de la section spécifiée dans."""
adjust_burn_in_result = []
for i in xrange(len(total_sampling_result)):
idx = int(len(total_sampling_result[i])*burn_in_rate)
adjust_burn_in_result.extend(total_sampling_result[i][idx:])
return np.array(adjust_burn_in_result)
Vous trouverez ci-dessous les fonctions qui effectuent un traitement parallèle. Si vous regardez de près, vous pouvez voir que c'est pratiquement le même que le premier exemple simple.
def parallel_exec(n_samples, n_chain, burn_in_rate=0.2):
"""Exécution du traitement parallèle"""
#Calculer la taille de l'échantillon par chaîne
n_samples_per_chain = n_samples / float(n_chain)
print "Making {} samples per {} chain. Burn-in rate:{}".format(n_samples_per_chain, n_chain, burn_in_rate)
#Créer un objet Pool
pool = Pool(processes=n_chain)
#Génération d'arguments pour l'exécution
n_trials_per_process = [n_samples_per_chain] * n_chain
#Exécution du traitement parallèle
start = time.time() #la mesure
total_sampling_result = pool.map(exec_sampling, n_trials_per_process)
end = time.time() #la mesure
#Affichage du temps total requis
print "total exec time: {}".format(end-start)
# Drawing scatter plot
adjusted_samples = remove_burn_in_samples(total_sampling_result)
draw_scatter(adjusted_samples, alpha=0.01)
draw_traceplot(adjusted_samples)
pool.close()
Voyons maintenant l'effet réel. Le nombre d'échantillons est de 1 000 000 et le nombre de chaînes est mesuré lorsqu'il est de 2 et lorsqu'il est de 1.
#paramètre: n_samples = 1000000, n_chain = 2
parallel_exec(1000000, 2)
L'échantillonnage est effectué en moins de 19 secondes au total, environ 12 secondes par processus de travail.
out
Making 500000.0 samples per 2 chain. Burn-in rate:0.2
total exec time: 18.6980280876
PID:2374, exec time: 12.0037689209, 20:53:41-20:53:53
PID:2373, exec time: 11.9927477837, 20:53:41-20:53:53
#paramètre: n_samples = 1000000, n_chain = 1
parallel_exec(1000000, 1)
L'exécution dans un processus de travail a pris moins de 33 secondes. Donc, si vous l'exécutez en deux processus, vous pouvez voir qu'il s'exécute 1,7 fois plus vite: satisfait:
out
Making 1000000.0 samples per 1 chain. Burn-in rate:0.2
total exec time: 32.683218956
PID:2377, exec time: 24.7304420471, 20:54:07-20:54:31
Documentation Python (2.7ja1) 16.6. Multiprocessing - Interface de «traitement parallèle» basée sur les processus http://docs.python.jp/2.7/library/multiprocessing.html
Python haute performance (O'Reilly) https://www.oreilly.co.jp/books/9784873117409/