Bei der Suche mit Python-Parallelverarbeitung wird als Erstes Multiprocessing oder Joblib angezeigt. ist.
Zu beiden gibt es verschiedene Kommentarartikel, aber Joblib ist besser als Multiprocessing.
Da es viele nützliche Funktionen wie gibt, habe ich das normalerweise verwendet. Ich hatte jedoch aus irgendeinem Grund die Möglichkeit, Multiprocessing zu verwenden, und als ich die Geschwindigkeiten der beiden verglich, verstand ich den Grund nicht, aber die Parallelisierung mit Multiprocessing war schneller, sodass ich ein Memorandum hinterlassen werde.
Joblib
Die Installation kann mit pip erfolgen.
Um kurz zu zeigen, wie man es benutzt,
def process(i):
return [{'id': j, 'sum': sum(range(i*j))} for j in range(100)]
Wenn es eine Funktion wie gibt
from joblib import Parallel, delayed
result = Parallel(n_jobs=-1)([delayed(process)(n) for n in range(1000)])
Sie können die Funktion "Prozess" parallel ausführen, indem Sie den Parallelisierungsprozess wie folgt schreiben. Sie können die Anzahl der zu verwendenden Kerne ändern, indem Sie den Inhalt von n_jobs
ändern. Es wird mit der maximalen Anzahl von Kernen ausgeführt, die mit -1 verwendet werden können, und wenn es 1 ist, ist es die gleiche Situation wie bei einer einzelnen Ausführung. Es ist einfach und gut.
Selbst wenn Sie in n_jobs mehr als die Anzahl der verfügbaren Kerne angeben, wird diese nicht gelöscht, und es scheint, dass die Verarbeitung entsprechend auf die verfügbaren Kerne verteilt wird.
Zusätzlich zu n_jobs können Sie ein ausführliches Argument verwenden. Wenn Sie einen Wert zwischen 0 und 10 angeben, wird der Fortschritt entsprechend der angegebenen Häufigkeit ausgegeben.
multiprocessing
Da Multiprocessing eine Standard-Python-Bibliothek ist, können Sie sie verwenden, ohne sie zu installieren. Es gibt viele Funktionen, aber die einfachste Möglichkeit, sie zu verwenden, ist
from multiprocessing import Pool
import multiprocessing as multi
p = Pool(multi.cpu_count())
p.map(process, list(range(1000)))
p.close()
Es sieht aus wie. Geben Sie die Anzahl der Parallelen zu Pool
an und führen Sie die Funktion parallel zu map
aus. Sie können die maximale Anzahl ausführbarer Kerne mit "multi.cpu_count ()" ermitteln. Bitte beachten Sie, dass wenn Sie den erstellten Pool nicht mit close ()
schließen, dies Speicherplatz verbraucht und eine große Sache ist.
Der größte Vorteil von joblib gegenüber Multiprocessing besteht darin, dass Sie alles andere als ein Array als Argument für die zu parallelisierende Funktion verwenden können.
Zum Beispiel
def process(i, s):
return [{'id': i, 'str': s, 'sum': sum(range(i*j))} for j in range(100)]
Angenommen, es gibt eine Funktion, die mehrere Argumente akzeptiert, wie z In joblib
strs = ['a', 'b', 'c']
result = Parallel(n_jobs=job)([delayed(process)(i,s) for i,s in enumerate(strs * 1000)])
Sie können es so machen, aber selbst wenn Sie versuchen, es mit Multiprocessing auf die gleiche Weise auszuführen,
p = Pool(multi.cpu_count())
strs = ['a', 'b', 'c']
p.map(process, enumerate(strs * 1000))
p.close()
Ich ärgere mich über das Argument und bekomme einen Fehler.
TypeError: process() missing 1 required positional argument:
In diesem Fall die Ausführungsfunktion
def process(v):
return [{'id': v[0], 'str': v[1], 'sum': sum(range(v[0]*j))} for j in range(100)]
Es muss geändert werden, damit ein Array als Argument verwendet wird.
Nun zum Geschwindigkeitsvergleich des Hauptthemas: Für die Ausführungsfunktion
def process(n):
return sum([i*n for i in range(100000)])
Versuchen Sie jeweils eine parallele Ausführung und messen Sie die Geschwindigkeit.
def usejoblib(job, num):
Parallel(n_jobs=job)([delayed(process)(n) for n in range(num)])
def usemulti(job, num):
p = Pool(multi.cpu_count() if job < 0 else job)
p.map(process, list(range(num)))
p.close()
Hier ist das Ergebnis einer 10-maligen Messung mit unterschiedlicher Anzahl von Schleifen und einer Mittelwertbildung (8 Jobs, Einheit ist Sek.)
loop_n | normal | Joblib | multiprocessing |
---|---|---|---|
10 | 0.0441 | 0.113 | 0.0217 |
100 | 0.414 | 0.211 | 0.139 |
1000 | 4.16 | 1.32 | 1.238 |
10000 | 41.1 | 12.5 | 12.2 |
100000 | 430 | 123 | 119 |
Beide sind viel schneller als ohne Parallelisierung. Es scheint, dass es keinen großen Geschwindigkeitsunterschied zwischen den beiden gibt (ist die Multiprozessor-Version etwas schneller?).
Wenn Sie versuchen, die Verarbeitungsmenge in einer Schleife zu erhöhen
def process(i):
return [{'id': j, 'sum': sum(range(i*j))} for j in range(1000)]
loop_n | normal | Joblib | multiprocessing |
---|---|---|---|
10 | 0.25 | 0.21 | 0.07 |
100 | 28.4 | 13.2 | 7.53 |
1000 | - | 737 | 701 |
Auch hier war die Mehrfachverarbeitung etwas schneller, aber ich wusste nicht warum. Wenn es einen solchen Unterschied gibt, kann das Ergebnis abhängig von der Ausführungsumgebung und der Verarbeitungslast der Funktion umgekehrt werden. In jedem Fall ist es jedoch viel schneller als die normale Schleifenverarbeitung. Seien Sie also aggressiv, wenn Sie die Schleifenverarbeitung mit Python durchführen. Ich möchte es als Ziel verwenden.
Zum Schluss werde ich den diesmal ausgeführten Code einfügen.
import time
from joblib import Parallel, delayed
from multiprocessing import Pool
import multiprocessing as multi
from more_itertools import flatten
import sys
import functools
def process(i):
return [{'id': j, 'sum': sum(range(i*j))} for j in range(1000)]
#def process(n):
# return sum([i*n for i in range(100000)])
def usejoblib(job, num):
result =Parallel(n_jobs=job)([delayed(process)(n) for n in range(num)])
return result
def usemulti(job, num):
p = Pool(multi.cpu_count() if job < 0 else job)
result = p.map(process, range(num))
p.close()
return result
if __name__ == '__main__':
argv = sys.argv
total = 0
n = 1
for i in range(n):
s = time.time()
if argv[1] == 'joblib':
result = usejoblib(int(argv[2]),int(argv[3]))
elif argv[1] == 'multi':
result = usemulti(int(argv[2]),int(argv[3]))
else:
result = [process(j) for j in range(int(argv[3]))]
elapsed = time.time()-s
print('time: {0} [sec]'.format(elapsed))
total += elapsed
print('--------')
print('average: {0} [sec]'.format(total/n))
sums = functools.reduce(lambda x, y: x + y['sum'], list(flatten(result)), 0)
print('total: {0}'.format(sums))
# print('total: {0}'.format(sum(result)))
Recommended Posts