When I implemented interprocess communication with Python's multiprocessing module, exchanging a large amount of data took a tremendous amount of time.
So I measured each method. The Python version used is 3.7.
The results are as follows.
Method | Send time | Recv time | Data constraints | Extra synchronization needed | Remarks |
---|---|---|---|---|---|
Queue | 0.00s | 5.33s | Picklable objects | No | put() returns immediately; a background thread does the transfer |
Pipe | 3.12s | 5.33s | Picklable objects | No | |
Shared memory (Array) | 3.02s | 2.55s | One-dimensional array | Yes | Data types are constrained |
Manager | 10.19s | 10.29s | Lists, dictionaries, etc. | No | |
RawArray | 5.61s (including numpy conversion) | 0.18s | One-dimensional array | Yes | Fast, but the code is complicated |
File | 3.86s | 5.26s | Picklable objects | Yes | Via a file |
socket | 4.13s (including pickle) | 5.34s (including pickle) | Picklable objects | Yes | The code is complicated |
First, let's use Pipe as an example to see how the speed changes with the amount of data.
Evaluation method: the data is transferred only once, and the size of the transferred array is varied. Process p1 sends the data exactly once. Process p2 waits for the data and exits as soon as it has received it.
import multiprocessing as mp
import time
def pipe1(*args):
    """Send a list of ``size`` integers once through a Pipe connection and time it.

    Positional args: ``(p, key, size)``: the sending end of the pipe, a label
    used in the log output, and the number of elements to send.
    """
    p, key, size = args[:3]
    d = list(range(size))
    print("[{}]send start".format(key))
    # perf_counter is the monotonic, high-resolution clock meant for measuring
    # intervals; time.time() is the adjustable wall clock.
    t0 = time.perf_counter()
    p.send(d)
    print("[{}]send end: {}s".format(key, time.perf_counter() - t0))
def pipe2(*args):
    """Wait for one message on a Pipe connection, receive it, and time the receive.

    Positional args: ``(p, key)``: the receiving end of the pipe and a log label.
    """
    p, key = args[:2]
    # poll(None) blocks until data is available instead of spinning a CPU core
    # (which would compete with the sending process being measured).
    p.poll(None)
    print("[{}]recv start".format(key))
    t0 = time.perf_counter()
    d = p.recv()  # keep the received object so it can be checked below
    assert d[5] == 5
    print("[{}]recv end: {}s".format(key, time.perf_counter() - t0))
def main_pipe(key, size=100000000):
    """Run one Pipe transfer of ``size`` elements between two child processes."""
    receiver_end, sender_end = mp.Pipe(duplex=False)
    workers = [
        mp.Process(target=pipe1, args=(sender_end, key, size)),
        mp.Process(target=pipe2, args=(receiver_end, key)),
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
if __name__ == '__main__':
    # The same single transfer with growing sizes: 10, 10^5, ..., 10^8 elements.
    for label, n in (
        ("pipe10", 10),
        ("pipe10^5", 100000),
        ("pipe10^6", 1000000),
        ("pipe10^7", 10000000),
        ("pipe10^8", 100000000),
    ):
        main_pipe(label, n)
An assert statement is included to check that the data was actually received.
Data size | Send time | Receive time |
---|---|---|
pipe10 | 0.0s | 0.0s |
pipe10^5 | 0.0109s | 0.0039s |
pipe10^6 | 0.0324s | 0.0708s |
pipe10^7 | 0.3240s | 0.6425s |
pipe10^8 | 3.2228s | 6.4095s |
The code below is an excerpt showing only how each method sends and receives the data. The data size is fixed at 10^8 elements.
See the end of the article for the full code.
Reference: Complete understanding of Python threading and multiprocessing
Queue
import multiprocessing as mp
# Create (excerpt)
q = mp.Queue()
# Send (excerpt) -- in the sending process.
# put() hands the object to a background feeder thread and returns almost
# immediately, so most of the transfer cost shows up on the receiving side.
q.put(d)
# Receive (excerpt) -- in the receiving process
if not q.empty():
    d = q.get()
Execution result
[queue]send start
[queue]send end: 0.0s
[queue]recv start
[queue]recv end: 5.337700128555298s
Pipe
The Pipe is created as unidirectional (`duplex=False`).
import multiprocessing as mp
# Create (excerpt)
receiver, sender = mp.Pipe(duplex=False)
# Send (excerpt) -- in the sending process
sender.send(d)
# Receive (excerpt) -- in the receiving process; store the payload in `d`
# rather than overwriting the connection object.
if receiver.poll():
    d = receiver.recv()
Execution result
[pipe]send start
[pipe]recv start
[pipe]send end: 3.121206045150757s
[pipe]recv end: 5.337015151977539s
Shared memory (Array)
Since the shared memory itself offers no way to tell whether the write has completed, a separate Value (also in shared memory) is added as a completion flag.
import multiprocessing as mp
import ctypes
# Create (excerpt)
# An integer length gives a zero-initialised array without first building a
# temporary Python list of `size` zeros.
arr = mp.Array(ctypes.c_int, size)
flag = mp.Value(ctypes.c_bool, False)
# Send (excerpt) -- write the data first, then raise the completion flag
arr[:] = d
flag.value = True
# Receive (excerpt)
if flag.value:
    d = arr[:]
Execution result
[array]send start
[array]send end: 3.0218513011932373s
[array]recv start
[array]recv end: 2.5539581775665283s
Manager
import multiprocessing as mp
# Create (excerpt)
with mp.Manager() as manager:
    shared_arr = manager.list()
    # ... start the sender/receiver processes here, passing shared_arr;
    # they must run while the manager is alive (inside this with block).

# Send (excerpt) -- the whole data list is appended as a single element
shared_arr.append(d)
# Receive (excerpt)
if len(shared_arr) > 0:
    d = shared_arr.pop()
Execution result
[manager]send start
[manager]send end: 10.194092750549316s
[manager]recv start
[manager]recv end: 10.295690059661865s
RawArray
I referred to the following:
As with shared memory (Array), a separate flag variable signals that the transfer is complete. The time needed for the numpy conversion is measured separately.
import multiprocessing as mp
import multiprocessing.sharedctypes
import ctypes
import numpy as np
# Create (excerpt) -- a shared buffer of C ints plus a completion flag
raw_buf = multiprocessing.sharedctypes.RawArray(ctypes.c_int, size)
flag = mp.Value(ctypes.c_bool, False)
# Send (excerpt)
d = np.asarray(d)            # list -> ndarray conversion (timed separately)
np.asarray(raw_buf)[:] = d   # write through a numpy view of the shared buffer
flag.value = True
# Receive (excerpt) -- copy out with the buffer's own element type;
# forcing dtype=np.uint8 would silently wrap every value above 255.
if flag.value:
    d = np.array(raw_buf, copy=True)
Execution result
[raw]cast numpy time: 5.4573814868927s
[raw]send start
[raw]send end: 0.15658187866210938s
[raw]recv start
[raw]recv end: 0.18018245697021484s
File
Taking a different approach, the data is exchanged via a file instead of memory. As with shared memory, a flag variable signals that writing has finished.
import multiprocessing as mp
import ctypes
import pickle
import tempfile
import os
# Create (excerpt)
with tempfile.TemporaryDirectory() as tmpdir:
    flag = mp.Value(ctypes.c_bool, False)
    # ... start the sender/receiver processes here, passing tmpdir ...

# Send (excerpt) -- raise the flag only after the file has been closed,
# so the reader never sees a partially written pickle.
with open(os.path.join(tmpdir, 'testfile'), 'wb') as f:
    pickle.dump(d, f)
flag.value = True
# Receive (excerpt)
if flag.value:
    with open(os.path.join(tmpdir, 'testfile'), 'rb') as f:
        d = pickle.load(f)
Execution result
[file]send start
[file]send end: 3.8698363304138184s
[file]recv start
[file]recv end: 5.267671585083008s
socket
Another approach is to exchange the data over a socket. The flag variable is used to confirm that the server has started. The pickle conversion time is measured separately.
import multiprocessing as mp
import ctypes
import socket
import pickle
# Create (excerpt)
flag = mp.Value(ctypes.c_bool, False)
# Send (client)(excerpt)
d = pickle.dumps(d)
if flag.value:  # the server is already listening
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect(('127.0.0.1', 50007))
        s.sendall(d)
# Receive (server)(excerpt)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind(('127.0.0.1', 50007))
    s.listen(1)
    flag.value = True
    conn, addr = s.accept()
    chunks = []
    with conn:
        while True:
            # recv returns at most what is already buffered; a moderate request
            # size avoids allocating a huge bytes object on every call.
            data = conn.recv(4 * 1024 * 1024)
            if not data:
                break
            chunks.append(data)
    # Join once at the end: `d += data` on bytes re-copies everything received
    # so far on every iteration.
    # Only unpickle data from a trusted peer (here: our own sender process).
    d = pickle.loads(b"".join(chunks))
Execution result
[socket]server wait start
[socket]pickle pack time: 3.798427104949951s
[socket]send start
[socket]recv start
[socket]send end: 0.3363354206085205s
[socket]recv end: 0.5375902652740479s
[socket]pickle unpack time: 4.91701340675354s
import multiprocessing as mp
import multiprocessing.sharedctypes
import time
import ctypes
import numpy as np
import tempfile
import pickle
import os
import socket
def queue1(*args):
    """Put a list of ``size`` integers on a queue once and time the put.

    Positional args: ``(p, key, size)``: the queue, a log label, and the
    number of elements.

    Note: ``multiprocessing.Queue.put`` hands the object to a background
    feeder thread that writes it to the underlying pipe, so the time printed
    here likely excludes most of the transfer cost.
    """
    p, key, size = args[:3]
    d = list(range(size))
    print("[{}]send start".format(key))
    t0 = time.perf_counter()  # monotonic, high-resolution interval clock
    p.put(d)
    print("[{}]send end: {}s".format(key, time.perf_counter() - t0))
def queue2(*args):
    """Wait until the queue has an item, then get it and time the get.

    Positional args: ``(p, key)``: the queue and a log label.
    """
    p, key = args[:2]
    while p.empty():  # busy-wait until something has been queued
        pass
    print("[{}]recv start".format(key))
    t0 = time.perf_counter()  # monotonic, high-resolution interval clock
    d = p.get()
    assert d[5] == 5
    print("[{}]recv end: {}s".format(key, time.perf_counter() - t0))
def main_queue(key, size=10000*10000):
    """Run one Queue transfer of ``size`` elements between two child processes."""
    shared_q = mp.Queue()
    sender_proc = mp.Process(target=queue1, args=(shared_q, key, size))
    receiver_proc = mp.Process(target=queue2, args=(shared_q, key))
    both = (sender_proc, receiver_proc)
    for proc in both:
        proc.start()
    for proc in both:
        proc.join()
def pipe1(*args):
    """Send a list of ``size`` integers once through a Pipe connection and time it.

    Positional args: ``(p, key, size)``: the sending end of the pipe, a log
    label, and the number of elements.
    """
    p, key, size = args[:3]
    d = list(range(size))
    print("[{}]send start".format(key))
    t0 = time.perf_counter()  # monotonic, high-resolution interval clock
    p.send(d)
    print("[{}]send end: {}s".format(key, time.perf_counter() - t0))
def pipe2(*args):
    """Wait for one message on a Pipe connection, receive it, and time the receive.

    Positional args: ``(p, key)``: the receiving end of the pipe and a log label.
    """
    p, key = args[:2]
    # poll(None) blocks until data is available instead of spinning a CPU core
    # that the sending process could otherwise use.
    p.poll(None)
    print("[{}]recv start".format(key))
    t0 = time.perf_counter()
    d = p.recv()
    assert d[5] == 5
    print("[{}]recv end: {}s".format(key, time.perf_counter() - t0))
def main_pipe(key, size=10000*10000):
    """Run one Pipe transfer of ``size`` elements between two child processes."""
    recv_end, send_end = mp.Pipe(duplex=False)
    procs = [
        mp.Process(target=pipe1, args=(send_end, key, size)),
        mp.Process(target=pipe2, args=(recv_end, key)),
    ]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
def array1(*args):
    """Write ``size`` integers into a shared Array, then raise the completion flag.

    Positional args: ``(p, flag, key, size)``: the shared array, a bool
    ``mp.Value`` used as the completion flag, a log label, and the number of
    elements.
    """
    p, flag, key, size = args[:4]
    d = list(range(size))
    print("[{}]send start".format(key))
    t0 = time.perf_counter()  # monotonic, high-resolution interval clock
    p[:] = d
    flag.value = True  # signal only after the whole slice assignment is done
    print("[{}]send end: {}s".format(key, time.perf_counter() - t0))
def array2(*args):
    """Wait for the completion flag, then copy the shared Array out and time it.

    Positional args: ``(p, flag, key)``: the shared array, the bool flag set
    by the writer, and a log label.
    """
    p, flag, key = args[:3]
    while not flag.value:  # wait until the writer signals completion
        pass
    print("[{}]recv start".format(key))
    t0 = time.perf_counter()  # monotonic, high-resolution interval clock
    d = p[:]
    assert d[5] == 5
    print("[{}]recv end: {}s".format(key, time.perf_counter() - t0))
def main_array(key, size=10000*10000):
    """Run one shared-Array transfer of ``size`` elements between two child processes."""
    # An integer length gives a zero-initialised array without first building
    # a temporary Python list of ``size`` zeros (10^8 elements by default).
    arr = mp.Array(ctypes.c_int, size)
    flag = mp.Value(ctypes.c_bool, False)
    p1 = mp.Process(target=array1, args=(arr, flag, key, size))
    p2 = mp.Process(target=array2, args=(arr, flag, key))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
def manager1(*args):
    """Append a list of ``size`` integers to a Manager list proxy once and time it.

    Positional args: ``(p, key, size)``: the ``manager.list()`` proxy, a log
    label, and the number of elements. The whole list is appended as a
    single element in one proxy call.
    """
    p, key, size = args[:3]
    d = list(range(size))
    print("[{}]send start".format(key))
    t0 = time.perf_counter()  # monotonic, high-resolution interval clock
    p.append(d)
    print("[{}]send end: {}s".format(key, time.perf_counter() - t0))
def manager2(*args):
    """Wait until the Manager list is non-empty, then pop the element and time it.

    Positional args: ``(p, key)``: the ``manager.list()`` proxy and a log label.
    """
    p, key = args[:2]
    # Each len() on a manager proxy is a request to the manager process; back
    # off briefly between polls so the wait loop does not flood the manager
    # while it is serving the sender's append.
    while len(p) == 0:
        time.sleep(0.001)
    print("[{}]recv start".format(key))
    t0 = time.perf_counter()
    d = p.pop()
    assert d[5] == 5
    print("[{}]recv end: {}s".format(key, time.perf_counter() - t0))
def main_manager(key, size=10000*10000):
    """Run one Manager-list transfer of ``size`` elements between two child processes."""
    with mp.Manager() as manager:
        shared_list = manager.list()
        procs = [
            mp.Process(target=manager1, args=(shared_list, key, size)),
            mp.Process(target=manager2, args=(shared_list, key)),
        ]
        # Both children must finish while the manager process is still alive.
        for proc in procs:
            proc.start()
        for proc in procs:
            proc.join()
def raw1(*args):
    """Convert ``size`` integers to numpy, copy them into a RawArray, and raise the flag.

    Positional args: ``(p, flag, key, size)``: the shared ctypes buffer, a
    bool flag used as the completion signal, a log label, and the number of
    elements. The list-to-numpy conversion is timed separately from the copy.
    """
    p, flag, key, size = args[:4]
    d = list(range(size))
    t0 = time.perf_counter()  # monotonic, high-resolution interval clock
    d = np.asarray(d)
    print("[{}]cast numpy time: {}s".format(key, time.perf_counter() - t0))
    print("[{}]send start".format(key))
    t0 = time.perf_counter()
    np.asarray(p)[:] = d  # write through a numpy view of the shared buffer
    flag.value = True     # signal only after the copy has finished
    print("[{}]send end: {}s".format(key, time.perf_counter() - t0))
def raw2(*args):
    """Wait for the completion flag, then copy the RawArray into a numpy array.

    Positional args: ``(p, flag, key)``: the shared ctypes buffer, the bool
    flag set by the writer, and a log label.
    """
    p, flag, key = args[:3]
    while not flag.value:  # wait until the writer signals completion
        pass
    print("[{}]recv start".format(key))
    t0 = time.perf_counter()
    # Keep the buffer's own element type. Forcing dtype=np.uint8 silently
    # wrapped every value above 255 (e.g. 261 -> 5), corrupting the data while
    # still passing the check below.
    d = np.array(p, copy=True)
    assert d[5] == 5
    print("[{}]recv end: {}s".format(key, time.perf_counter() - t0))
def main_raw(key, size=10000*10000):
    """Run one RawArray transfer of ``size`` elements between two child processes."""
    int_buf = multiprocessing.sharedctypes.RawArray(ctypes.c_int, size)
    done_flag = mp.Value(ctypes.c_bool, False)
    workers = (
        mp.Process(target=raw1, args=(int_buf, done_flag, key, size)),
        mp.Process(target=raw2, args=(int_buf, done_flag, key)),
    )
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
def file1(*args):
    """Pickle ``size`` integers to ``tmpdir/testfile``, then raise the completion flag.

    Positional args: ``(tmpdir, flag, key, size)``: the shared directory, a
    bool flag used as the completion signal, a log label, and the number of
    elements.
    """
    tmpdir, flag, key, size = args[:4]
    d = list(range(size))
    print("[{}]send start".format(key))
    t0 = time.perf_counter()  # monotonic, high-resolution interval clock
    with open(os.path.join(tmpdir, 'testfile'), 'wb') as f:
        pickle.dump(d, f)
    # Raise the flag only after the with block has closed (and so flushed) the
    # file; the reader must never see a partially written pickle.
    flag.value = True
    print("[{}]send end: {}s".format(key, time.perf_counter() - t0))
def file2(*args):
    """Wait for the completion flag, then load the pickled data from ``tmpdir/testfile``.

    Positional args: ``(tmpdir, flag, key)``: the shared directory, the bool
    flag set by the writer, and a log label.
    """
    tmpdir, flag, key = args[:3]
    while not flag.value:  # wait until the writer signals the file is complete
        pass
    print("[{}]recv start".format(key))
    t0 = time.perf_counter()  # monotonic, high-resolution interval clock
    with open(os.path.join(tmpdir, 'testfile'), 'rb') as f:
        d = pickle.load(f)
    print("[{}]recv end: {}s".format(key, time.perf_counter() - t0))
    assert d[5] == 5
def main_file(key, size=10000*10000):
    """Run one file-based transfer of ``size`` elements between two child processes."""
    with tempfile.TemporaryDirectory() as workdir:
        written = mp.Value(ctypes.c_bool, False)
        writer = mp.Process(target=file1, args=(workdir, written, key, size))
        reader = mp.Process(target=file2, args=(workdir, written, key))
        # Both children must finish before the temporary directory is removed.
        for proc in (writer, reader):
            proc.start()
        for proc in (writer, reader):
            proc.join()
def socket1(*args):
    """Pickle ``size`` integers, wait for the server flag, then send them over TCP.

    Positional args: ``(flag, key, size)``: a bool flag raised by the server
    once it is listening, a log label, and the number of elements. Connects
    to 127.0.0.1:50007.
    """
    flag, key, size = args[:3]
    d = list(range(size))
    t0 = time.perf_counter()  # monotonic, high-resolution interval clock
    d = pickle.dumps(d)
    print("[{}]pickle pack time: {}s".format(key, time.perf_counter() - t0))
    while not flag.value:  # wait until the server is listening
        pass
    print("[{}]send start".format(key))
    t0 = time.perf_counter()
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect(('127.0.0.1', 50007))
        s.sendall(d)
    print("[{}]send end: {}s".format(key, time.perf_counter() - t0))
def socket2(*args):
    """Accept one connection on 127.0.0.1:50007, read it to EOF, and unpickle it.

    Positional args: ``(flag, key)``: a bool flag raised once the socket is
    listening, and a log label.
    """
    flag, key = args[:2]
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(('127.0.0.1', 50007))
        s.listen(1)
        flag.value = True  # the client may connect now
        print("[{}]server wait start".format(key))
        conn, addr = s.accept()
        print("[{}]recv start".format(key))
        t0 = time.perf_counter()  # monotonic, high-resolution interval clock
        chunks = []
        with conn:
            while True:
                # recv returns at most what is already buffered; a moderate
                # request size avoids allocating a huge bytes object per call.
                data = conn.recv(4 * 1024 * 1024)
                if not data:
                    break
                chunks.append(data)
        # Join once at the end: `d += data` on bytes re-copies everything
        # received so far on every iteration.
        d = b"".join(chunks)
        print("[{}]recv end: {}s".format(key, time.perf_counter() - t0))
        t0 = time.perf_counter()
        # pickle.loads must only see trusted data; here the peer is the
        # sibling benchmark process on the loopback interface.
        d = pickle.loads(d)
        print("[{}]pickle unpack time: {}s".format(key, time.perf_counter() - t0))
        assert d[5] == 5
def main_socket(key, size=10000*10000):
    """Run one socket transfer of ``size`` elements between two child processes."""
    server_ready = mp.Value(ctypes.c_bool, False)
    client = mp.Process(target=socket1, args=(server_ready, key, size))
    server = mp.Process(target=socket2, args=(server_ready, key,))
    for proc in (client, server):
        proc.start()
    for proc in (client, server):
        proc.join()
if __name__ == '__main__':
    # 1) Pipe transfer time as a function of the number of elements.
    for label, n in (
        ("pipe10", 10),
        ("pipe10^5", 100000),
        ("pipe10^6", 1000000),
        ("pipe10^7", 10000000),
        ("pipe10^8", 100000000),
    ):
        main_pipe(label, n)
    # 2) Every method with its default size (10^8 elements).
    for runner, label in (
        (main_queue, "queue"),
        (main_pipe, "pipe"),
        (main_array, "array"),
        (main_manager, "manager"),
        (main_raw, "raw"),
        (main_file, "file"),
        (main_socket, "socket"),
    ):
        runner(label)
Recommended Posts