Wenn Sie die Interprozesskommunikation in Pythons Multiprozess implementiert haben Der Austausch einer großen Datenmenge dauerte enorm lange.
Also habe ich jede Methode gemessen. Die Version von Python ist 3.7.
Die Ergebnisse sind wie folgt.
send time | recv time | Datenbeschränkungen | Separate Schreibverwaltung | Bemerkungen | |
---|---|---|---|---|---|
Queue | 0.00s | 5.33s | Dinge, die eingelegt werden können | Nicht notwendig | |
Pipe | 3.12s | 5.33s | Dinge, die eingelegt werden können | Nicht notwendig | |
geteilte Erinnerung(Array) | 3.02s | 2.55s | 1-dimensionales Array | notwendig | Daten sind eingeschränkt |
Manager | 10.19s | 10.29s | Arrays, Wörterbücher usw. | Nicht notwendig | |
RawArray | 5.61s(Einschließlich numpy) | 0.18s | 1-dimensionales Array | notwendig | Schneller aber komplizierter Code |
File | 3.86s | 5.26s | Dinge, die eingelegt werden können | notwendig | Über Datei |
socket | 4.13s(Einschließlich Gurke) | 5.34s(Einschließlich Gurke) | Dinge, die eingelegt werden können | notwendig | Der Code ist kompliziert. |
Nehmen wir zunächst Pipe als Beispiel, um zu sehen, wie stark sich die Geschwindigkeit in Abhängigkeit von der Anzahl der Daten unterscheidet.
Als Auswertungsmethode wird die Datenübertragung nur einmal durchgeführt und die Größe des Arrays zum Zeitpunkt der Übertragung wird geändert. Der p1-Prozess sendet die Daten und sendet sie nur einmal. Der p2-Prozess ist ein Prozess, der auf Daten wartet und beendet wird, wenn sie empfangen werden.
import multiprocessing as mp
import time
def pipe1(*args):
p = args[0]
key = args[1]
size = args[2]
d = [i for i in range(size)]
print("[{}]send start".format(key))
t0 = time.time()
p.send(d)
print("[{}]send end: {}s".format(key, time.time()-t0))
def pipe2(*args):
p = args[0]
key = args[1]
while True:
if not p.poll():
continue
print("[{}]recv start".format(key))
t0 = time.time()
p.recv()
assert d[5]==5
print("[{}]recv end: {}s".format(key, time.time()-t0))
break
def main_pipe(key, size=100000000):
reciver, sender = mp.Pipe(duplex=False)
p1 = mp.Process(target=pipe1, args=(sender, key, size))
p2 = mp.Process(target=pipe2, args=(reciver, key))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == '__main__':
main_pipe("pipe10", 10)
main_pipe("pipe10^5", 100000)
main_pipe("pipe10^6", 1000000)
main_pipe("pipe10^7", 10000000)
main_pipe("pipe10^8", 100000000)
Eine Assert-Anweisung ist ebenfalls enthalten, um zu überprüfen, ob die Daten empfangen wurden.
Übertragungszeit | Empfangszeit | |
---|---|---|
pipe10 | 0.0s | 0.0s |
pipe10^5 | 0.0109s | 0.0039s |
pipe10^6 | 0.0324s | 0.0708s |
pipe10^7 | 0.3240s | 0.6425s |
pipe10^8 | 3.2228s | 6.4095s |
Der Code ist nur ein Auszug aus dem Senden und Empfangen der einzelnen Daten. Die Datengröße ist auf 10 ^ 8 festgelegt.
Den vollständigen Code finden Sie am Ende des Artikels.
Referenz: Vollständiges Verständnis von Python-Threading und Multiprocessing
Queue
import multiprocessing as mp
#Erstellen(Auszug)
q = mp.Queue()
#Senden(Auszug)
p.put(d)
#Erhalten(Auszug)
if not p.empty():
d = p.get()
Ausführungsergebnis
[queue]send start
[queue]send end: 0.0s
[queue]recv start
[queue]recv end: 5.337700128555298s
Pipe Pipe hat eine unidirektionale Implementierung.
import multiprocessing as mp
#Erstellen(Auszug)
reciver, sender = mp.Pipe(duplex=False)
#Senden(Auszug)
sender.send(d)
#Erhalten(Auszug)
if p.poll():
reciver = p.recv()
Ausführungsergebnis
[pipe]send start
[pipe]recv start
[pipe]send end: 3.121206045150757s
[pipe]recv end: 5.337015151977539s
Da sich im Shared Memory selbst nichts befindet, um festzustellen, ob der Schreibvorgang abgeschlossen ist Eine separate Wertvariable (Shared Memory) wird hinzugefügt.
import multiprocessing as mp
import ctypes
#Erstellen(Auszug)
arr = mp.Array(ctypes.c_int, [0]*size )
flag = mp.Value(ctypes.c_bool, False)
#Senden(Auszug)
p[:] = d
flag.value = True
#Erhalten(Auszug)
if flag.value:
d = p[:]
Ausführungsergebnis
[array]send start
[array]send end: 3.0218513011932373s
[array]recv start
[array]recv end: 2.5539581775665283s
Manager
import multiprocessing as mp
#Erstellen(Auszug)
with mp.Manager() as manager:
shared_arr = manager.list()
#Senden(Auszug)
shared_arr.append(d)
#Erhalten(Auszug)
if len(p) > 0:
d = p.pop()
Ausführungsergebnis
[manager]send start
[manager]send end: 10.194092750549316s
[manager]recv start
[manager]recv end: 10.295690059661865s
RawArray
Ich beziehe mich auf Folgendes.
Wie beim Shared Memory (Array) wird eine Flag-Variable für den Abschluss der Übertragung separat vorbereitet. Zusätzlich wird die für die Numpy-Konvertierung erforderliche Zeit separat gemessen.
import multiprocessing.sharedctypes
import ctypes
import numpy as np
#Erstellen(Auszug)
byte_buf = multiprocessing.sharedctypes.RawArray(ctypes.c_int, size)
flag = mp.Value(ctypes.c_bool, False)
#Senden(Auszug)
d = np.asarray(d)
np.asarray(p)[:] = d
flag.value = True
#Erhalten(Auszug)
if flag.value:
d = np.array(p, dtype=np.uint8, copy=True)
Ausführungsergebnis
[raw]cast numpy time: 5.4573814868927s
[raw]send start
[raw]send end: 0.15658187866210938s
[raw]recv start
[raw]recv end: 0.18018245697021484s
File
Beim Richtungswechsel und beim Austausch über Dateien statt über Speicher. Wie beim gemeinsamen Speicher wird auch das Ende des Schreibens von der Flag-Variablen verwaltet.
import multiprocessing as mp
import pickle
import tempfile
import os
#Erstellen(Auszug)
with tempfile.TemporaryDirectory() as tmpdir:
flag = mp.Value(ctypes.c_bool, False)
#Senden(Auszug)
with open(os.path.join(tmpdir, 'testfile'), 'wb') as f:
pickle.dump(d, f)
flag.value = True
#Erhalten(Auszug)
if flag.value:
with open(os.path.join(p, 'testfile'), 'r+b') as f:
d = pickle.load(f)
Ausführungsergebnis
[file]send start
[file]send end: 3.8698363304138184s
[file]recv start
[file]recv end: 5.267671585083008s
socket
Es wird durch Socket-Kommunikation ausgetauscht, indem die Richtung weiter geändert wird. Die Flag-Variable wird verwendet, um den Serverstart zu bestätigen. Zusätzlich wird die Gurkenumwandlung separat gemessen.
import multiprocessing as mp
import socket
import pickle
#Erstellen(Auszug)
flag = mp.Value(ctypes.c_bool, False)
#Senden (Client)(Auszug)
d = pickle.dumps(d)
if flag.value:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(('127.0.0.1', 50007))
s.sendall(d)
#Empfangen (Server)(Auszug)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('127.0.0.1', 50007))
s.listen(1)
flag.value = True
while True:
conn, addr = s.accept()
d = b""
with conn:
while True:
#Empfange Daten
data = conn.recv(1024*1024*1024)
if not data:
break
d += data
d = pickle.loads(d)
Ausführungsergebnis
[socket]server wait start
[socket]pickle pack time: 3.798427104949951s
[socket]send start
[socket]recv start
[socket]send end: 0.3363354206085205s
[socket]recv end: 0.5375902652740479s
[socket]pickle unpack time: 4.91701340675354s
import multiprocessing as mp
import multiprocessing.sharedctypes
import time
import ctypes
import numpy as np
import tempfile
import pickle
import os
import socket
def queue1(*args):
p = args[0]
key = args[1]
size = args[2]
d = [i for i in range(size)]
print("[{}]send start".format(key))
t0 = time.time()
p.put(d)
print("[{}]send end: {}s".format(key, time.time()-t0))
def queue2(*args):
p = args[0]
key = args[1]
while True:
if p.empty():
continue
print("[{}]recv start".format(key))
t0 = time.time()
d = p.get()
assert d[5]==5
print("[{}]recv end: {}s".format(key, time.time()-t0))
break
def main_queue(key, size=10000*10000):
q = mp.Queue()
p1 = mp.Process(target=queue1, args=(q, key, size))
p2 = mp.Process(target=queue2, args=(q, key))
p1.start()
p2.start()
p1.join()
p2.join()
def pipe1(*args):
p = args[0]
key = args[1]
size = args[2]
d = [i for i in range(size)]
print("[{}]send start".format(key))
t0 = time.time()
p.send(d)
print("[{}]send end: {}s".format(key, time.time()-t0))
def pipe2(*args):
p = args[0]
key = args[1]
while True:
if not p.poll():
continue
print("[{}]recv start".format(key))
t0 = time.time()
d = p.recv()
assert d[5]==5
print("[{}]recv end: {}s".format(key, time.time()-t0))
break
def main_pipe(key, size=10000*10000):
reciver, sender = mp.Pipe(duplex=False)
p1 = mp.Process(target=pipe1, args=(sender, key, size))
p2 = mp.Process(target=pipe2, args=(reciver, key))
p1.start()
p2.start()
p1.join()
p2.join()
def array1(*args):
p = args[0]
flag = args[1]
key = args[2]
size = args[3]
d = [i for i in range(size)]
print("[{}]send start".format(key))
t0 = time.time()
p[:] = d
flag.value = True
print("[{}]send end: {}s".format(key, time.time()-t0))
def array2(*args):
p = args[0]
flag = args[1]
key = args[2]
while True:
if not flag.value: #Warten Sie, bis sich die Daten ändern
continue
print("[{}]recv start".format(key))
t0 = time.time()
d = p[:]
assert d[5]==5
print("[{}]recv end: {}s".format(key, time.time()-t0))
break
def main_array(key, size=10000*10000):
arr = mp.Array(ctypes.c_int, [0]*size )
flag = mp.Value(ctypes.c_bool, False)
p1 = mp.Process(target=array1, args=(arr, flag, key, size))
p2 = mp.Process(target=array2, args=(arr, flag, key))
p1.start()
p2.start()
p1.join()
p2.join()
def manager1(*args):
p = args[0]
key = args[1]
size = args[2]
d = [i for i in range(size)]
print("[{}]send start".format(key))
t0 = time.time()
p.append(d)
print("[{}]send end: {}s".format(key, time.time()-t0))
def manager2(*args):
p = args[0]
key = args[1]
while True:
if len(p) == 0:
continue
print("[{}]recv start".format(key))
t0 = time.time()
d = p.pop()
assert d[5]==5
print("[{}]recv end: {}s".format(key, time.time()-t0))
break
def main_manager(key, size=10000*10000):
with mp.Manager() as manager:
shared_arr = manager.list()
p1 = mp.Process(target=manager1, args=(shared_arr, key, size))
p2 = mp.Process(target=manager2, args=(shared_arr, key))
p1.start()
p2.start()
p1.join()
p2.join()
def raw1(*args):
p = args[0]
flag = args[1]
key = args[2]
size = args[3]
d = [i for i in range(size)]
t0 = time.time()
d = np.asarray(d)
print("[{}]cast numpy time: {}s".format(key, time.time()-t0))
print("[{}]send start".format(key))
t0 = time.time()
np.asarray(p)[:] = d
flag.value = True
print("[{}]send end: {}s".format(key, time.time()-t0))
def raw2(*args):
p = args[0]
flag = args[1]
key = args[2]
while True:
if not flag.value: #Warten Sie, bis sich die Daten ändern
continue
print("[{}]recv start".format(key))
t0 = time.time()
d = np.array(p, dtype=np.uint8, copy=True)
assert d[5]==5
print("[{}]recv end: {}s".format(key, time.time()-t0))
break
def main_raw(key, size=10000*10000):
byte_buf = multiprocessing.sharedctypes.RawArray(ctypes.c_int, size)
flag = mp.Value(ctypes.c_bool, False)
p1 = mp.Process(target=raw1, args=(byte_buf, flag, key, size))
p2 = mp.Process(target=raw2, args=(byte_buf, flag, key))
p1.start()
p2.start()
p1.join()
p2.join()
def file1(*args):
tmpdir = args[0]
flag = args[1]
key = args[2]
size = args[3]
d = [i for i in range(size)]
print("[{}]send start".format(key))
t0 = time.time()
with open(os.path.join(tmpdir, 'testfile'), 'wb') as f:
pickle.dump(d, f)
flag.value = True
print("[{}]send end: {}s".format(key, time.time()-t0))
def file2(*args):
tmpdir = args[0]
flag = args[1]
key = args[2]
while True:
if not flag.value: #Warten Sie, bis sich die Daten ändern
continue
print("[{}]recv start".format(key))
t0 = time.time()
with open(os.path.join(tmpdir, 'testfile'), 'rb') as f:
d = pickle.load(f)
print("[{}]recv end: {}s".format(key, time.time()-t0))
assert d[5]==5
break
def main_file(key, size=10000*10000):
with tempfile.TemporaryDirectory() as tmpdir:
flag = mp.Value(ctypes.c_bool, False)
p1 = mp.Process(target=file1, args=(tmpdir, flag, key, size))
p2 = mp.Process(target=file2, args=(tmpdir, flag, key))
p1.start()
p2.start()
p1.join()
p2.join()
def socket1(*args):
flag = args[0]
key = args[1]
size = args[2]
d = [i for i in range(size)]
t0 = time.time()
d = pickle.dumps(d)
print("[{}]pickle pack time: {}s".format(key, time.time()-t0))
while True:
if not flag.value:
continue
print("[{}]send start".format(key))
t0 = time.time()
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(('127.0.0.1', 50007))
s.sendall(d)
print("[{}]send end: {}s".format(key, time.time()-t0))
break
def socket2(*args):
flag = args[0]
key = args[1]
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('127.0.0.1', 50007))
s.listen(1)
flag.value = True
while True:
print("[{}]server wait start".format(key))
conn, addr = s.accept()
print("[{}]recv start".format(key))
t0 = time.time()
d = b""
with conn:
while True:
#Empfange Daten
data = conn.recv(1024*1024*1024)
if not data:
break
d += data
print("[{}]recv end: {}s".format(key, time.time()-t0))
t0 = time.time()
d = pickle.loads(d)
print("[{}]pickle unpack time: {}s".format(key, time.time()-t0))
assert d[5]==5
break
def main_socket(key, size=10000*10000):
flag = mp.Value(ctypes.c_bool, False)
p1 = mp.Process(target=socket1, args=(flag, key, size))
p2 = mp.Process(target=socket2, args=(flag, key,))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == '__main__':
main_pipe("pipe10", 10)
main_pipe("pipe10^5", 100000)
main_pipe("pipe10^6", 1000000)
main_pipe("pipe10^7", 10000000)
main_pipe("pipe10^8", 100000000)
main_queue("queue")
main_pipe("pipe")
main_array("array")
main_manager("manager")
main_raw("raw")
main_file("file")
main_socket("socket")
Recommended Posts