I measured various methods of interprocess communication in Python 3 multiprocessing

When I implemented interprocess communication with Python's multiprocessing module, exchanging a large amount of data took a tremendous amount of time.

So I measured each method. The Python version is 3.7.

Result Summary

The results are as follows.

| Method | Send time | Receive time | Data that can be sent | Separate completion flag | Remarks |
|---|---|---|---|---|---|
| Queue | 0.00s | 5.33s | Anything picklable | Not needed | |
| Pipe | 3.12s | 5.33s | Anything picklable | Not needed | |
| Shared memory (Array) | 3.02s | 2.55s | One-dimensional array | Needed | Data types are restricted |
| Manager | 10.19s | 10.29s | Lists, dictionaries, etc. | Not needed | |
| RawArray | 5.61s (including numpy conversion) | 0.18s | One-dimensional array | Needed | Fast, but the code is more complex |
| File | 3.86s | 5.26s | Anything picklable | Needed | Goes through a file |
| socket | 4.13s (including pickle) | 5.34s (including pickle) | Anything picklable | Needed | The code is more complex |

Difference in speed depending on data size

First, let's take Pipe as an example and see how much the speed changes with the amount of data.

As the evaluation method, the data is transferred only once, and the size of the transferred array is varied. Process p1 sends the data exactly once. Process p2 keeps waiting for data and exits as soon as it has received it.

Code

import multiprocessing as mp
import time

def pipe1(*args):
    p = args[0]
    key = args[1]
    size = args[2]

    d = [i for i in range(size)]

    print("[{}]send start".format(key))
    t0 = time.time()
    p.send(d)
    print("[{}]send end: {}s".format(key, time.time()-t0))


def pipe2(*args):
    p = args[0]
    key = args[1]
    
    while True:
        if not p.poll():
            continue

        print("[{}]recv start".format(key))
        t0 = time.time()
        d = p.recv()
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break


def main_pipe(key, size=100000000):
    receiver, sender = mp.Pipe(duplex=False)
    p1 = mp.Process(target=pipe1, args=(sender, key, size))
    p2 = mp.Process(target=pipe2, args=(receiver, key))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    main_pipe("pipe10", 10)
    main_pipe("pipe10^5", 100000)
    main_pipe("pipe10^6", 1000000)
    main_pipe("pipe10^7", 10000000)
    main_pipe("pipe10^8", 100000000)

An assert statement is also included to check that the data was actually received.

Execution result

| Run (array size) | Send time | Receive time |
|---|---|---|
| pipe10 | 0.0s | 0.0s |
| pipe10^5 | 0.0109s | 0.0039s |
| pipe10^6 | 0.0324s | 0.0708s |
| pipe10^7 | 0.3240s | 0.6425s |
| pipe10^8 | 3.2228s | 6.4095s |
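Connection.send() pickles the object before writing it to the pipe, so the cost roughly tracks the size of the pickled payload, which grows linearly with the number of elements. A small standard-library sketch for checking how the pickled size of list(range(n)) grows (the helper name is hypothetical, and exact byte counts depend on the pickle protocol in use):

```python
import pickle

def pickled_size(n):
    """Return how many bytes pickle produces for list(range(n))."""
    # Default protocol, like the one Connection.send() uses
    return len(pickle.dumps(list(range(n))))

if __name__ == "__main__":
    for n in (10, 10**5, 10**6):
        size = pickled_size(n)
        # Larger ints need more bytes each, so bytes/element creeps up toward ~5
        print(f"n={n:>9,}  pickled bytes={size:>10,}  bytes/element={size / n:.2f}")
```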

Comparison of data transfer methods

The code below shows only the excerpts for sending and receiving in each method. The data size is fixed at 10^8 elements.

See the end of the article for the full code.

Reference: Complete understanding of Python threading and multiprocessing

Queue

import multiprocessing as mp

#Create(Excerpt)
q = mp.Queue()

#Send(Excerpt)
q.put(d)

#Receive(Excerpt)
if not q.empty():
    d = q.get()

Execution result

[queue]send start
[queue]send end: 0.0s
[queue]recv start
[queue]recv end: 5.337700128555298s
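The 0.00s send time reflects how mp.Queue works: put() hands the object to a background feeder thread that pickles it and writes it to an underlying pipe later, so the cost shows up on the receiving side. Polling empty() also keeps a CPU core busy, and the Python documentation notes that empty() is not reliable. A sketch of a receiver that blocks in get() with a timeout instead (function names and the timeout value are illustrative assumptions):

```python
import multiprocessing as mp
import queue

def queue_sender(q, size):
    # put() returns quickly; the feeder thread pickles and writes the data later
    q.put(list(range(size)))

def queue_receiver(q, timeout=60.0):
    # Block until an item arrives instead of spinning on q.empty()
    try:
        return q.get(timeout=timeout)
    except queue.Empty:  # multiprocessing.Queue raises queue.Empty on timeout
        return None

if __name__ == "__main__":
    q = mp.Queue()
    p = mp.Process(target=queue_sender, args=(q, 10**6))
    p.start()
    # Receive before join(): a process that put items waits until they are flushed
    d = queue_receiver(q)
    p.join()
    print(len(d), d[5])
```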

Pipe

The Pipe is created as a one-way (unidirectional) connection.

import multiprocessing as mp

#Create(Excerpt)
receiver, sender = mp.Pipe(duplex=False)

#Send(Excerpt)
sender.send(d)

#Receive(Excerpt)
if receiver.poll():
    d = receiver.recv()

Execution result

[pipe]send start
[pipe]recv start
[pipe]send end: 3.121206045150757s
[pipe]recv end: 5.337015151977539s
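Connection.send() pickles its argument. When the data can be held in a contiguous buffer, send_bytes() and recv_bytes() move raw bytes without pickling. A sketch assuming the integers are stored in a standard-library array.array of C ints instead of a list (helper names are hypothetical, and whether this is faster for a given workload would have to be measured):

```python
import array
import multiprocessing as mp

def pipe_send_ints(conn, values):
    # Pack into a C int array and send its raw bytes; no pickling involved
    conn.send_bytes(array.array("i", values))
    conn.close()

def pipe_recv_ints(conn):
    # recv_bytes() blocks until one complete message has arrived
    buf = conn.recv_bytes()
    out = array.array("i")
    out.frombytes(buf)  # reinterpret the bytes as C ints again
    return out

if __name__ == "__main__":
    receiver, sender = mp.Pipe(duplex=False)
    p = mp.Process(target=pipe_send_ints, args=(sender, range(10**6)))
    p.start()
    d = pipe_recv_ints(receiver)  # read before join() so the sender can finish
    p.join()
    print(len(d), d[5])
```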

Shared memory (Array)

Shared memory itself gives no way to tell whether a write has finished, so a separate Value (also in shared memory) is added as a completion flag.

import multiprocessing as mp
import ctypes

#Create(Excerpt)
arr = mp.Array(ctypes.c_int, [0]*size )
flag = mp.Value(ctypes.c_bool, False)

#Send(Excerpt)
arr[:] = d
flag.value = True

#Receive(Excerpt)
if flag.value:
    d = arr[:]

Execution result

[array]send start
[array]send end: 3.0218513011932373s
[array]recv start
[array]recv end: 2.5539581775665283s
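Busy-waiting on a Value flag keeps a CPU core spinning while the writer works. multiprocessing.Event is a standard primitive for a "write finished" signal. A sketch of the same Array hand-off using an Event in place of the Value flag (function names are illustrative). Note that slicing a c_int Array returns a new Python list, which is part of why the receive side still takes seconds at 10^8 elements:

```python
import ctypes
import multiprocessing as mp

def array_writer(arr, done, values):
    # Copy into shared memory (the synchronized Array takes its lock here)
    arr[:] = values
    # Signal completion only after the whole copy has finished
    done.set()

def array_reader(arr, done, timeout=60.0):
    # Sleep until the writer signals instead of spinning on a Value flag
    if not done.wait(timeout):
        return None  # timed out
    # Slicing a c_int Array returns a new Python list
    return arr[:]

if __name__ == "__main__":
    size = 10**6
    arr = mp.Array(ctypes.c_int, size)  # zero-initialized shared array
    done = mp.Event()
    p = mp.Process(target=array_writer, args=(arr, done, list(range(size))))
    p.start()
    d = array_reader(arr, done)
    p.join()
    print(len(d), d[5])
```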

Manager

import multiprocessing as mp

#Create(Excerpt)
with mp.Manager() as manager:
    shared_arr = manager.list()
    
#Send(Excerpt)
shared_arr.append(d)

#Receive(Excerpt)
if len(shared_arr) > 0:
    d = shared_arr.pop()

Execution result

[manager]send start
[manager]send end: 10.194092750549316s
[manager]recv start
[manager]recv end: 10.295690059661865s

RawArray

I referred to the following.

As with shared memory (Array), a separate flag variable signals that the transfer is complete. The time required for the numpy conversion is measured separately.

import multiprocessing as mp
import multiprocessing.sharedctypes
import ctypes
import numpy as np

#Create(Excerpt)
byte_buf = multiprocessing.sharedctypes.RawArray(ctypes.c_int, size)
flag = mp.Value(ctypes.c_bool, False)

#Send(Excerpt)
d = np.asarray(d)
np.asarray(byte_buf)[:] = d  #np.asarray gives a view of the shared buffer
flag.value = True

#Receive(Excerpt)
if flag.value:
    #Let numpy infer the dtype from c_int (casting to uint8 would truncate values above 255)
    d = np.array(byte_buf, copy=True)

Execution result

[raw]cast numpy time: 5.4573814868927s
[raw]send start
[raw]send end: 0.15658187866210938s
[raw]recv start
[raw]recv end: 0.18018245697021484s
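In the RawArray measurement, converting the 10^8-element Python list to numpy (5.46s) costs far more than the copy into shared memory (0.16s). If the data can be produced as a numpy array from the start, that conversion disappears. A sketch assuming numpy is installed; np.ctypeslib.as_array() creates a view of the RawArray's memory whose dtype matches c_int, and an Event replaces the Value flag (function names are hypothetical):

```python
import ctypes
import multiprocessing as mp
import numpy as np

def raw_writer(raw, done):
    view = np.ctypeslib.as_array(raw)  # zero-copy view of the shared memory
    # Build the data directly in numpy instead of converting a Python list
    view[:] = np.arange(len(view), dtype=view.dtype)
    done.set()

def raw_reader(raw, done, timeout=60.0):
    if not done.wait(timeout):
        return None  # timed out
    # Copy out so the result does not depend on the shared buffer afterwards
    return np.ctypeslib.as_array(raw).copy()

if __name__ == "__main__":
    size = 10**6
    raw = mp.RawArray(ctypes.c_int, size)
    done = mp.Event()
    p = mp.Process(target=raw_writer, args=(raw, done))
    p.start()
    d = raw_reader(raw, done)
    p.join()
    print(d.dtype, len(d), d[5])
```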

File

For a change, this method exchanges data via a file instead of memory. As with shared memory, the end of writing is signaled by a flag variable.

import multiprocessing as mp
import ctypes
import pickle
import tempfile
import os

#Create(Excerpt)
with tempfile.TemporaryDirectory() as tmpdir:
    flag = mp.Value(ctypes.c_bool, False)

#Send(Excerpt)
with open(os.path.join(tmpdir, 'testfile'), 'wb') as f:
    pickle.dump(d, f)
flag.value = True  #Set only after the file has been closed

#Receive(Excerpt)
if flag.value:
    with open(os.path.join(tmpdir, 'testfile'), 'rb') as f:
        d = pickle.load(f)

Execution result

[file]send start
[file]send end: 3.8698363304138184s
[file]recv start
[file]recv end: 5.267671585083008s
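Instead of a shared flag, a file-based hand-off can signal completion by writing to a temporary name and then renaming it with os.replace(); the Python documentation states that a successful rename is atomic on POSIX (both paths must be on the same filesystem). The reader then treats the final file's existence as the completion signal. A sketch under that assumption, shown in a single process for brevity (the file names, poll interval and helper names are hypothetical); in the article's setup, file_write and file_read would run in separate processes:

```python
import os
import pickle
import tempfile
import time

def file_write(dirpath, obj, name="testfile"):
    tmp = os.path.join(dirpath, name + ".tmp")
    with open(tmp, "wb") as f:
        pickle.dump(obj, f)
    # The final name appears only after the data is fully written and closed
    os.replace(tmp, os.path.join(dirpath, name))

def file_read(dirpath, name="testfile", timeout=60.0, interval=0.01):
    path = os.path.join(dirpath, name)
    deadline = time.monotonic() + timeout
    while not os.path.exists(path):
        if time.monotonic() > deadline:
            return None  # timed out
        time.sleep(interval)  # poll gently instead of spinning
    with open(path, "rb") as f:
        return pickle.load(f)

if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmpdir:
        file_write(tmpdir, list(range(10)))
        print(file_read(tmpdir))
```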

socket

For another change of approach, this method exchanges data over a socket. The flag variable is used to confirm that the server has started. The pickle conversion time is measured separately.

import multiprocessing as mp
import socket
import pickle
import ctypes

#Create(Excerpt)
flag = mp.Value(ctypes.c_bool, False)

#Send (client)(Excerpt)
d = pickle.dumps(d)
if flag.value:  #Connect only after the server is listening
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect(('127.0.0.1', 50007))
        s.sendall(d)

#Receive (server)(Excerpt)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind(('127.0.0.1', 50007))
    s.listen(1)
    flag.value = True
    while True:
        conn, addr = s.accept()
        d = b""
        with conn:
            while True:
                #Receive data until the client closes the connection
                data = conn.recv(1024*1024*1024)
                if not data:
                    break
                d += data
        d = pickle.loads(d)
        break  #Only one message is exchanged per run

Execution result

[socket]server wait start
[socket]pickle pack time: 3.798427104949951s
[socket]send start
[socket]recv start
[socket]send end: 0.3363354206085205s
[socket]recv end: 0.5375902652740479s
[socket]pickle unpack time: 4.91701340675354s
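The server above reads until the client closes the connection, so each connection carries exactly one message. A common alternative is to prefix each message with its length so several messages can share one connection. A sketch of that framing (the 8-byte big-endian header and the helper names are assumptions, not part of the measured code); socket.socketpair() is used only to demonstrate it in one process:

```python
import pickle
import socket
import struct

HEADER = struct.Struct("!Q")  # 8-byte unsigned length, network byte order

def send_msg(sock, obj):
    payload = pickle.dumps(obj)
    sock.sendall(HEADER.pack(len(payload)))  # length first
    sock.sendall(payload)                    # then the body, without extra copying

def recv_exact(sock, n):
    # recv() may return fewer bytes than requested, so loop until n are read
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(min(n - len(buf), 1 << 20))
        if not chunk:
            raise ConnectionError("connection closed mid-message")
        buf += chunk
    return bytes(buf)

def recv_msg(sock):
    (length,) = HEADER.unpack(recv_exact(sock, HEADER.size))
    return pickle.loads(recv_exact(sock, length))

if __name__ == "__main__":
    a, b = socket.socketpair()
    with a, b:
        send_msg(a, list(range(10)))
        send_msg(a, {"done": True})
        print(recv_msg(b), recv_msg(b))
```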

Full code

import multiprocessing as mp
import multiprocessing.sharedctypes
import time
import ctypes
import numpy as np
import tempfile
import pickle
import os
import socket


def queue1(*args):
    p = args[0]
    key = args[1]
    size = args[2]

    d = [i for i in range(size)]

    print("[{}]send start".format(key))
    t0 = time.time()
    p.put(d)
    print("[{}]send end: {}s".format(key, time.time()-t0))

def queue2(*args):
    p = args[0]
    key = args[1]
    
    while True:
        if p.empty():
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        d = p.get()
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break

def main_queue(key, size=10000*10000):
    q = mp.Queue()
    p1 = mp.Process(target=queue1, args=(q, key, size))
    p2 = mp.Process(target=queue2, args=(q, key))
    p1.start()
    p2.start()
    p1.join()
    p2.join()




def pipe1(*args):
    p = args[0]
    key = args[1]
    size = args[2]

    d = [i for i in range(size)]

    print("[{}]send start".format(key))
    t0 = time.time()
    p.send(d)
    print("[{}]send end: {}s".format(key, time.time()-t0))

def pipe2(*args):
    p = args[0]
    key = args[1]
    
    while True:
        if not p.poll():
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        d = p.recv()
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break

def main_pipe(key, size=10000*10000):
    receiver, sender = mp.Pipe(duplex=False)
    p1 = mp.Process(target=pipe1, args=(sender, key, size))
    p2 = mp.Process(target=pipe2, args=(receiver, key))
    p1.start()
    p2.start()
    p1.join()
    p2.join()



def array1(*args):
    p = args[0]
    flag = args[1]
    key = args[2]
    size = args[3]

    d = [i for i in range(size)]

    print("[{}]send start".format(key))
    t0 = time.time()
    p[:] = d
    flag.value = True
    print("[{}]send end: {}s".format(key, time.time()-t0))

def array2(*args):
    p = args[0]
    flag = args[1]
    key = args[2]

    while True:
        if not flag.value:  #Wait until the data changes
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        d = p[:]
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break

def main_array(key, size=10000*10000):
    arr = mp.Array(ctypes.c_int, [0]*size )
    flag = mp.Value(ctypes.c_bool, False)
    p1 = mp.Process(target=array1, args=(arr, flag, key, size))
    p2 = mp.Process(target=array2, args=(arr, flag, key))
    p1.start()
    p2.start()
    p1.join()
    p2.join()



def manager1(*args):
    p = args[0]
    key = args[1]
    size = args[2]

    d = [i for i in range(size)]
    
    print("[{}]send start".format(key))
    t0 = time.time()
    p.append(d)
    print("[{}]send end: {}s".format(key, time.time()-t0))

def manager2(*args):
    p = args[0]
    key = args[1]

    while True:
        if len(p) == 0:
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        d = p.pop()
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break

def main_manager(key, size=10000*10000):
    with mp.Manager() as manager:
        shared_arr = manager.list()
        p1 = mp.Process(target=manager1, args=(shared_arr, key, size))
        p2 = mp.Process(target=manager2, args=(shared_arr, key))
        p1.start()
        p2.start()
        p1.join()
        p2.join()



def raw1(*args):
    p = args[0]
    flag = args[1]
    key = args[2]
    size = args[3]

    d = [i for i in range(size)]
    
    t0 = time.time()
    d = np.asarray(d)
    print("[{}]cast numpy time: {}s".format(key, time.time()-t0))

    print("[{}]send start".format(key))
    t0 = time.time()
    np.asarray(p)[:] = d
    flag.value = True
    print("[{}]send end: {}s".format(key, time.time()-t0))

def raw2(*args):
    p = args[0]
    flag = args[1]
    key = args[2]

    while True:
        if not flag.value:  #Wait until the data changes
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        d = np.array(p, copy=True)  #dtype inferred from c_int; uint8 would truncate values above 255
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break

def main_raw(key, size=10000*10000):
    byte_buf = multiprocessing.sharedctypes.RawArray(ctypes.c_int, size)
    flag = mp.Value(ctypes.c_bool, False)
    p1 = mp.Process(target=raw1, args=(byte_buf, flag, key, size))
    p2 = mp.Process(target=raw2, args=(byte_buf, flag, key))
    p1.start()
    p2.start()
    p1.join()
    p2.join()



def file1(*args):
    tmpdir = args[0]
    flag = args[1]
    key = args[2]
    size = args[3]

    d = [i for i in range(size)]

    print("[{}]send start".format(key))
    t0 = time.time()
    with open(os.path.join(tmpdir, 'testfile'), 'wb') as f:
        pickle.dump(d, f)
    flag.value = True
    print("[{}]send end: {}s".format(key, time.time()-t0))

def file2(*args):
    tmpdir = args[0]
    flag = args[1]
    key = args[2]

    while True:
        if not flag.value:  #Wait until the data changes
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        with open(os.path.join(tmpdir, 'testfile'), 'rb') as f:
            d = pickle.load(f)
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        assert d[5]==5
        break

def main_file(key, size=10000*10000):
    with tempfile.TemporaryDirectory() as tmpdir:
        flag = mp.Value(ctypes.c_bool, False)
        p1 = mp.Process(target=file1, args=(tmpdir, flag, key, size))
        p2 = mp.Process(target=file2, args=(tmpdir, flag, key))
        p1.start()
        p2.start()
        p1.join()
        p2.join()



def socket1(*args):
    flag = args[0]
    key = args[1]
    size = args[2]

    d = [i for i in range(size)]

    t0 = time.time()
    d = pickle.dumps(d)
    print("[{}]pickle pack time: {}s".format(key, time.time()-t0))

    while True:
        if not flag.value:
            continue
        print("[{}]send start".format(key))
        t0 = time.time()
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect(('127.0.0.1', 50007))
            s.sendall(d)
        print("[{}]send end: {}s".format(key, time.time()-t0))
        break

def socket2(*args):
    flag = args[0]
    key = args[1]

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(('127.0.0.1', 50007))
        s.listen(1)
        flag.value = True
        while True:
            print("[{}]server wait start".format(key))
            conn, addr = s.accept()
            print("[{}]recv start".format(key))
            t0 = time.time()

            d = b""
            with conn:
                while True:
                    #Receive data
                    data = conn.recv(1024*1024*1024)
                    if not data:
                        break
                    d += data
            print("[{}]recv end: {}s".format(key, time.time()-t0))

            t0 = time.time()
            d = pickle.loads(d)
            print("[{}]pickle unpack time: {}s".format(key, time.time()-t0))

            assert d[5]==5
            break

def main_socket(key, size=10000*10000):
    flag = mp.Value(ctypes.c_bool, False)
    p1 = mp.Process(target=socket1, args=(flag, key, size))
    p2 = mp.Process(target=socket2, args=(flag, key,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()



if __name__ == '__main__':
    main_pipe("pipe10", 10)
    main_pipe("pipe10^5", 100000)
    main_pipe("pipe10^6", 1000000)
    main_pipe("pipe10^7", 10000000)
    main_pipe("pipe10^8", 100000000)

    main_queue("queue")
    main_pipe("pipe")
    main_array("array")
    main_manager("manager")
    main_raw("raw")
    main_file("file")
    main_socket("socket")
