Ich habe verschiedene Methoden der Kommunikation zwischen Prozessen bei der Mehrfachverarbeitung von Python3 gemessen

Wenn Sie die Interprozesskommunikation in Pythons Multiprozess implementiert haben Der Austausch einer großen Datenmenge dauerte enorm lange.

Also habe ich jede Methode gemessen. Die Version von Python ist 3.7.

Ergebniszusammenfassung

Die Ergebnisse sind wie folgt.

send time recv time Datenbeschränkungen Separate Schreibverwaltung Bemerkungen
Queue 0.00s 5.33s Dinge, die eingelegt werden können Nicht notwendig
Pipe 3.12s 5.33s Dinge, die eingelegt werden können Nicht notwendig
geteilte Erinnerung(Array) 3.02s 2.55s 1-dimensionales Array notwendig Daten sind eingeschränkt
Manager 10.19s 10.29s Arrays, Wörterbücher usw. Nicht notwendig
RawArray 5.61s(Einschließlich numpy) 0.18s 1-dimensionales Array notwendig Schneller aber komplizierter Code
File 3.86s 5.26s Dinge, die eingelegt werden können notwendig Über Datei
socket 4.13s(Einschließlich Gurke) 5.34s(Einschließlich Gurke) Dinge, die eingelegt werden können notwendig Der Code ist kompliziert.

Geschwindigkeitsunterschied abhängig von der Anzahl der Daten

Nehmen wir zunächst Pipe als Beispiel, um zu sehen, wie stark sich die Geschwindigkeit in Abhängigkeit von der Anzahl der Daten unterscheidet.

Als Auswertungsmethode wird die Datenübertragung nur einmal durchgeführt und die Größe des Arrays zum Zeitpunkt der Übertragung wird geändert. Der p1-Prozess sendet die Daten und sendet sie nur einmal. Der p2-Prozess ist ein Prozess, der auf Daten wartet und beendet wird, wenn sie empfangen werden.

Code

import multiprocessing as mp
import time

def pipe1(*args):
    p = args[0]
    key = args[1]
    size = args[2]

    d = [i for i in range(size)]

    print("[{}]send start".format(key))
    t0 = time.time()
    p.send(d)
    print("[{}]send end: {}s".format(key, time.time()-t0))


def pipe2(*args):
    p = args[0]
    key = args[1]
    
    while True:
        if not p.poll():
            continue

        print("[{}]recv start".format(key))
        t0 = time.time()
        p.recv()
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break


def main_pipe(key, size=100000000):
    reciver, sender = mp.Pipe(duplex=False)
    p1 = mp.Process(target=pipe1, args=(sender, key, size))
    p2 = mp.Process(target=pipe2, args=(reciver, key))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    main_pipe("pipe10", 10)
    main_pipe("pipe10^5", 100000)
    main_pipe("pipe10^6", 1000000)
    main_pipe("pipe10^7", 10000000)
    main_pipe("pipe10^8", 100000000)

Eine Assert-Anweisung ist ebenfalls enthalten, um zu überprüfen, ob die Daten empfangen wurden.

Ausführungsergebnis

Übertragungszeit Empfangszeit
pipe10 0.0s 0.0s
pipe10^5 0.0109s 0.0039s
pipe10^6 0.0324s 0.0708s
pipe10^7 0.3240s 0.6425s
pipe10^8 3.2228s 6.4095s

Vergleich nach jeder Datenübertragungsmethode

Der Code ist nur ein Auszug aus dem Senden und Empfangen der einzelnen Daten. Die Datengröße ist auf 10 ^ 8 festgelegt.

Den vollständigen Code finden Sie am Ende des Artikels.

Referenz: Vollständiges Verständnis von Python-Threading und Multiprocessing

Queue

import multiprocessing as mp

#Erstellen(Auszug)
q = mp.Queue()

#Senden(Auszug)
p.put(d)

#Erhalten(Auszug)
if not p.empty():
    d = p.get()

Ausführungsergebnis

[queue]send start
[queue]send end: 0.0s
[queue]recv start
[queue]recv end: 5.337700128555298s

Pipe Pipe hat eine unidirektionale Implementierung.

import multiprocessing as mp

#Erstellen(Auszug)
reciver, sender = mp.Pipe(duplex=False)

#Senden(Auszug)
sender.send(d)

#Erhalten(Auszug)
if p.poll():
    reciver = p.recv()

Ausführungsergebnis

[pipe]send start
[pipe]recv start
[pipe]send end: 3.121206045150757s
[pipe]recv end: 5.337015151977539s

Shared Memory (Array)

Da sich im Shared Memory selbst nichts befindet, um festzustellen, ob der Schreibvorgang abgeschlossen ist Eine separate Wertvariable (Shared Memory) wird hinzugefügt.

import multiprocessing as mp
import ctypes

#Erstellen(Auszug)
arr = mp.Array(ctypes.c_int, [0]*size )
flag = mp.Value(ctypes.c_bool, False)

#Senden(Auszug)
p[:] = d
flag.value = True

#Erhalten(Auszug)
if flag.value:
    d = p[:]

Ausführungsergebnis

[array]send start
[array]send end: 3.0218513011932373s
[array]recv start
[array]recv end: 2.5539581775665283s

Manager

import multiprocessing as mp

#Erstellen(Auszug)
with mp.Manager() as manager:
    shared_arr = manager.list()
    
#Senden(Auszug)
shared_arr.append(d)

#Erhalten(Auszug)
if len(p) > 0:
    d = p.pop()

Ausführungsergebnis

[manager]send start
[manager]send end: 10.194092750549316s
[manager]recv start
[manager]recv end: 10.295690059661865s

RawArray

Ich beziehe mich auf Folgendes.

Wie beim Shared Memory (Array) wird eine Flag-Variable für den Abschluss der Übertragung separat vorbereitet. Zusätzlich wird die für die Numpy-Konvertierung erforderliche Zeit separat gemessen.

import multiprocessing.sharedctypes
import ctypes
import numpy as np

#Erstellen(Auszug)
byte_buf = multiprocessing.sharedctypes.RawArray(ctypes.c_int, size)
flag = mp.Value(ctypes.c_bool, False)

#Senden(Auszug)
d = np.asarray(d)
np.asarray(p)[:] = d
flag.value = True

#Erhalten(Auszug)
if flag.value:
    d = np.array(p, dtype=np.uint8, copy=True)

Ausführungsergebnis

[raw]cast numpy time: 5.4573814868927s
[raw]send start
[raw]send end: 0.15658187866210938s
[raw]recv start
[raw]recv end: 0.18018245697021484s

File

Beim Richtungswechsel und beim Austausch über Dateien statt über Speicher. Wie beim gemeinsamen Speicher wird auch das Ende des Schreibens von der Flag-Variablen verwaltet.

import multiprocessing as mp
import pickle
import tempfile
import os

#Erstellen(Auszug)
with tempfile.TemporaryDirectory() as tmpdir:
    flag = mp.Value(ctypes.c_bool, False)

#Senden(Auszug)
with open(os.path.join(tmpdir, 'testfile'), 'wb') as f:
    pickle.dump(d, f)
flag.value = True

#Erhalten(Auszug)
if flag.value:
    with open(os.path.join(p, 'testfile'), 'r+b') as f:
        d = pickle.load(f)

Ausführungsergebnis

[file]send start
[file]send end: 3.8698363304138184s
[file]recv start
[file]recv end: 5.267671585083008s

socket

Es wird durch Socket-Kommunikation ausgetauscht, indem die Richtung weiter geändert wird. Die Flag-Variable wird verwendet, um den Serverstart zu bestätigen. Zusätzlich wird die Gurkenumwandlung separat gemessen.

import multiprocessing as mp
import socket
import pickle

#Erstellen(Auszug)
flag = mp.Value(ctypes.c_bool, False)

#Senden (Client)(Auszug)
d = pickle.dumps(d)
if flag.value:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect(('127.0.0.1', 50007))
        s.sendall(d)

#Empfangen (Server)(Auszug)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind(('127.0.0.1', 50007))
    s.listen(1)
    flag.value = True
    while True:
        conn, addr = s.accept()
        d = b""
        with conn:
            while True:
                #Empfange Daten
                data = conn.recv(1024*1024*1024)
                if not data:
                    break
                d += data
        d = pickle.loads(d)

Ausführungsergebnis

[socket]server wait start
[socket]pickle pack time: 3.798427104949951s
[socket]send start
[socket]recv start
[socket]send end: 0.3363354206085205s
[socket]recv end: 0.5375902652740479s
[socket]pickle unpack time: 4.91701340675354s

Ganzer Code

import multiprocessing as mp
import multiprocessing.sharedctypes
import time
import ctypes
import numpy as np
import tempfile
import pickle
import os
import socket


def queue1(*args):
    p = args[0]
    key = args[1]
    size = args[2]

    d = [i for i in range(size)]

    print("[{}]send start".format(key))
    t0 = time.time()
    p.put(d)
    print("[{}]send end: {}s".format(key, time.time()-t0))

def queue2(*args):
    p = args[0]
    key = args[1]
    
    while True:
        if p.empty():
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        d = p.get()
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break

def main_queue(key, size=10000*10000):
    q = mp.Queue()
    p1 = mp.Process(target=queue1, args=(q, key, size))
    p2 = mp.Process(target=queue2, args=(q, key))
    p1.start()
    p2.start()
    p1.join()
    p2.join()




def pipe1(*args):
    p = args[0]
    key = args[1]
    size = args[2]

    d = [i for i in range(size)]

    print("[{}]send start".format(key))
    t0 = time.time()
    p.send(d)
    print("[{}]send end: {}s".format(key, time.time()-t0))

def pipe2(*args):
    p = args[0]
    key = args[1]
    
    while True:
        if not p.poll():
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        d = p.recv()
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break

def main_pipe(key, size=10000*10000):
    reciver, sender = mp.Pipe(duplex=False)
    p1 = mp.Process(target=pipe1, args=(sender, key, size))
    p2 = mp.Process(target=pipe2, args=(reciver, key))
    p1.start()
    p2.start()
    p1.join()
    p2.join()



def array1(*args):
    p = args[0]
    flag = args[1]
    key = args[2]
    size = args[3]

    d = [i for i in range(size)]

    print("[{}]send start".format(key))
    t0 = time.time()
    p[:] = d
    flag.value = True
    print("[{}]send end: {}s".format(key, time.time()-t0))

def array2(*args):
    p = args[0]
    flag = args[1]
    key = args[2]

    while True:
        if not flag.value:  #Warten Sie, bis sich die Daten ändern
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        d = p[:]
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break

def main_array(key, size=10000*10000):
    arr = mp.Array(ctypes.c_int, [0]*size )
    flag = mp.Value(ctypes.c_bool, False)
    p1 = mp.Process(target=array1, args=(arr, flag, key, size))
    p2 = mp.Process(target=array2, args=(arr, flag, key))
    p1.start()
    p2.start()
    p1.join()
    p2.join()



def manager1(*args):
    p = args[0]
    key = args[1]
    size = args[2]

    d = [i for i in range(size)]
    
    print("[{}]send start".format(key))
    t0 = time.time()
    p.append(d)
    print("[{}]send end: {}s".format(key, time.time()-t0))

def manager2(*args):
    p = args[0]
    key = args[1]

    while True:
        if len(p) == 0:
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        d = p.pop()
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break

def main_manager(key, size=10000*10000):
    with mp.Manager() as manager:
        shared_arr = manager.list()
        p1 = mp.Process(target=manager1, args=(shared_arr, key, size))
        p2 = mp.Process(target=manager2, args=(shared_arr, key))
        p1.start()
        p2.start()
        p1.join()
        p2.join()



def raw1(*args):
    p = args[0]
    flag = args[1]
    key = args[2]
    size = args[3]

    d = [i for i in range(size)]
    
    t0 = time.time()
    d = np.asarray(d)
    print("[{}]cast numpy time: {}s".format(key, time.time()-t0))

    print("[{}]send start".format(key))
    t0 = time.time()
    np.asarray(p)[:] = d
    flag.value = True
    print("[{}]send end: {}s".format(key, time.time()-t0))

def raw2(*args):
    p = args[0]
    flag = args[1]
    key = args[2]

    while True:
        if not flag.value:  #Warten Sie, bis sich die Daten ändern
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        d = np.array(p, dtype=np.uint8, copy=True)
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break

def main_raw(key, size=10000*10000):
    byte_buf = multiprocessing.sharedctypes.RawArray(ctypes.c_int, size)
    flag = mp.Value(ctypes.c_bool, False)
    p1 = mp.Process(target=raw1, args=(byte_buf, flag, key, size))
    p2 = mp.Process(target=raw2, args=(byte_buf, flag, key))
    p1.start()
    p2.start()
    p1.join()
    p2.join()



def file1(*args):
    tmpdir = args[0]
    flag = args[1]
    key = args[2]
    size = args[3]

    d = [i for i in range(size)]

    print("[{}]send start".format(key))
    t0 = time.time()
    with open(os.path.join(tmpdir, 'testfile'), 'wb') as f:
        pickle.dump(d, f)
    flag.value = True
    print("[{}]send end: {}s".format(key, time.time()-t0))

def file2(*args):
    tmpdir = args[0]
    flag = args[1]
    key = args[2]

    while True:
        if not flag.value:  #Warten Sie, bis sich die Daten ändern
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        with open(os.path.join(tmpdir, 'testfile'), 'rb') as f:
            d = pickle.load(f)
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        assert d[5]==5
        break

def main_file(key, size=10000*10000):
    with tempfile.TemporaryDirectory() as tmpdir:
        flag = mp.Value(ctypes.c_bool, False)
        p1 = mp.Process(target=file1, args=(tmpdir, flag, key, size))
        p2 = mp.Process(target=file2, args=(tmpdir, flag, key))
        p1.start()
        p2.start()
        p1.join()
        p2.join()



def socket1(*args):
    flag = args[0]
    key = args[1]
    size = args[2]

    d = [i for i in range(size)]

    t0 = time.time()
    d = pickle.dumps(d)
    print("[{}]pickle pack time: {}s".format(key, time.time()-t0))

    while True:
        if not flag.value:
            continue
        print("[{}]send start".format(key))
        t0 = time.time()
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect(('127.0.0.1', 50007))
            s.sendall(d)
        print("[{}]send end: {}s".format(key, time.time()-t0))
        break

def socket2(*args):
    flag = args[0]
    key = args[1]

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(('127.0.0.1', 50007))
        s.listen(1)
        flag.value = True
        while True:
            print("[{}]server wait start".format(key))
            conn, addr = s.accept()
            print("[{}]recv start".format(key))
            t0 = time.time()

            d = b""
            with conn:
                while True:
                    #Empfange Daten
                    data = conn.recv(1024*1024*1024)
                    if not data:
                        break
                    d += data
            print("[{}]recv end: {}s".format(key, time.time()-t0))

            t0 = time.time()
            d = pickle.loads(d)
            print("[{}]pickle unpack time: {}s".format(key, time.time()-t0))

            assert d[5]==5
            break

def main_socket(key, size=10000*10000):
    flag = mp.Value(ctypes.c_bool, False)
    p1 = mp.Process(target=socket1, args=(flag, key, size))
    p2 = mp.Process(target=socket2, args=(flag, key,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()



if __name__ == '__main__':
    main_pipe("pipe10", 10)
    main_pipe("pipe10^5", 100000)
    main_pipe("pipe10^6", 1000000)
    main_pipe("pipe10^7", 10000000)
    main_pipe("pipe10^8", 100000000)

    main_queue("queue")
    main_pipe("pipe")
    main_array("array")
    main_manager("manager")
    main_raw("raw")
    main_file("file")
    main_socket("socket")

Recommended Posts

Ich habe verschiedene Methoden der Kommunikation zwischen Prozessen bei der Mehrfachverarbeitung von Python3 gemessen
Zusammenfassung verschiedener for-Anweisungen in Python
Zusammenfassung der integrierten Methoden usw. der Python-Liste
Arten der Kommunikation zwischen Prozessen
Verschiedene Verarbeitung von Python
Da der memory_profiler von Python schwer ist, habe ich ihn gemessen
Old openssl verursacht Probleme in verschiedenen Teilen von Python
Ich habe Python auf Japanisch geschrieben
Ich habe verschiedene Methoden ausprobiert, um japanische Post mit Python zu senden
String-Objektmethoden in Python
Über verschiedene Codierungen von Python 3
Objektäquivalenzbeurteilung in Python
Rufen Sie Methoden in Python dynamisch auf
Ich verstehe Python auf Japanisch!
Implementierung der schnellen Sortierung in Python
Was ich in Python gelernt habe
Ich habe versucht, das Blackjack of Trump-Spiel mit Python zu implementieren
Ich habe N-Queen in verschiedenen Sprachen implementiert und die Geschwindigkeit gemessen
Ich habe die Berechnungszeit des in Python geschriebenen gleitenden Durchschnitts verglichen
Schreiben Sie mit dem ETE Toolkit verschiedene Formen phylogenetischer Bäume in Python
Ich habe den Code geschrieben, um den Brainf * ck-Code in Python zu schreiben
Verschiedene Möglichkeiten, um in Python ein Array von Zahlen von 1 bis 10 zu erstellen.
[Satzklassifikation] Ich habe verschiedene Pooling-Methoden von Convolutional Neural Networks ausprobiert
Zeitdelta in Python 2.7-Serie teilen
MySQL-automatische Escape-Funktion von Parametern in Python
Umgang mit JSON-Dateien in Python
Implementierung eines Lebensspiels in Python
Zusammenfassung verschiedener Operationen in Tensorflow
Ich habe Fizz Buzz in Python geschrieben
Holen Sie sich ein Kommunikationsmemo in Python
Ich habe versucht, den Prozess mit Python zu studieren
Scikit-learn kann nicht in Python installiert werden
Ich habe die Warteschlange in Python geschrieben
Definieren Sie Funktionen (Methoden) in Python dynamisch
Das Gesetz der Zahlen in Python
Implementierung der ursprünglichen Sortierung in Python
Reversibles Verwürfeln von Ganzzahlen in Python
Ich habe Line Benachrichtigung in Python versucht
Ich habe die SMTP-Kommunikation mit Python versucht
Ich habe den Stack in Python geschrieben
Wie viele Arten von Python haben Sie in Ihrem Windows 10? Ich hatte 5 Typen.
Ich möchte das Ergebnis von "Zeichenfolge" .split () in Python stapelweise konvertieren
Ich möchte die abstrakte Klasse (ABCmeta) von Python im Detail erklären
Ich möchte einen Teil der Excel-Zeichenfolge mit Python einfärben
Ich habe versucht, verschiedene Muster von Datumszeichenfolgen in pandas.to_datetime einzugeben
Ich habe verschiedene Versionen der Python + OpenCV + FFmpeg-Umgebung auf dem Mac ausprobiert
Funktionen von Modulen für reguläre Ausdrücke, die in Python häufig persönlich verwendet werden
Ich habe ein Programm erstellt, um die Größe einer Datei mit Python zu überprüfen
Ich habe die Geschwindigkeit der Listeneinschlussnotation für und während mit Python2.7 gemessen.
Ich habe versucht, Trumps Kartenspiel in Python zu implementieren
Ich habe versucht, berührungsbezogene Methoden im Szenenmodul von Pythonista zu berühren
Ich habe Python 2.7 in Sakura VPS 1 GB installiert.
Ich habe versucht, PLSA in Python zu implementieren
Konvertierung der Zeichenfolge <-> Datum (Datum, Datum / Uhrzeit) in Python
Überprüfen Sie das Verhalten des Zerstörers in Python
Ich habe versucht, Permutation in Python zu implementieren
Zusammenfassung der häufig verwendeten Methoden bei Pandas
Übung, dies in Python zu verwenden (schlecht)
Allgemeine Relativitätstheorie in Python: Einführung
Ich habe ein Pay-Management-Programm in Python erstellt!
Ausgabebaumstruktur von Dateien in Python