Multiprocessing-Paket ist eine Bibliothek für die parallele Verarbeitung mithilfe von Prozessen. Der Titel "multiprocessing.pool.Pool.map" ist also eine Parallelverarbeitungsversion von map.
Es wird wie folgt verwendet.
from multiprocessing import Pool
import time
def iter():
for i in range(100):
print("{0} : iter {1}".format(time.time(), i))
yield i
time.sleep(2)
print("{0} : iter finished".format(time.time()))
def fun(n):
print("{0} : {1}".format(time.time(), n))
with Pool(4) as p: #Zuordnung in 4 Prozessen
p.map(fun, iter())
Was passiert also, wenn Sie dies tun? Das Ergebnis ist wie folgt: Nachdem der von der Iter-Funktion generierte Iterator alle Iterationen abgeschlossen hat, wird die Funktion in mehreren Prozessen angewendet.
$ python mp.py | sort -n
1424411166.882628 : iter 0
1424411166.882708 : iter 1
1424411166.882714 : iter 2
1424411166.882725 : iter 3
1424411166.88273 : iter 4
1424411166.882734 : iter 5
1424411166.882738 : iter 6
1424411166.882741 : iter 7
1424411166.882745 : iter 8
1424411166.882748 : iter 9
1424411166.882752 : iter 10
1424411166.882755 : iter 11
1424411166.882758 : iter 12
1424411166.882763 : iter 13
1424411166.882766 : iter 14
1424411166.88277 : iter 15
1424411166.882773 : iter 16
1424411166.882776 : iter 17
1424411166.88278 : iter 18
1424411166.882784 : iter 19
1424411168.884807 : iter finished
1424411168.890891 : 0
1424411168.891006 : 2
1424411168.891053 : 1
1424411168.891174 : 3
1424411168.891351 : 4
1424411168.891527 : 5
1424411168.891707 : 8
1424411168.89173 : 9
1424411168.89206 : 10
1424411168.892085 : 11
1424411168.892139 : 12
1424411168.892162 : 13
1424411168.892473 : 14
1424411168.892483 : 16
1424411168.892495 : 15
1424411168.892506 : 17
1424411168.892599 : 18
1424411168.892619 : 19
In Bezug auf die Implementierung war es wie folgt.
multiprocessing/pool.py
def map(self, func, iterable, chunksize=None):
'''
Apply `func` to each element in `iterable`, collecting the results
in a list that is returned.
'''
return self._map_async(func, iterable, mapstar, chunksize).get()
:
:
def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
error_callback=None):
'''
Helper function to implement map, starmap and their async counterparts.
'''
if self._state != RUN:
raise ValueError("Pool not running")
if not hasattr(iterable, '__len__'):
iterable = list(iterable)
if chunksize is None:
chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
if extra:
chunksize += 1
if len(iterable) == 0:
chunksize = 0
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = MapResult(self._cache, chunksize, len(iterable), callback,
error_callback=error_callback)
self._taskqueue.put((((result._job, i, mapper, (x,), {})
for i, x in enumerate(task_batches)), None))
return result
Wenn in if not hasattr (iterable, '__ len __')
keine len -Eigenschaft vorhanden ist, scheint list (iterable)
den Iterator in eine Liste zu konvertieren. Bevor Sie die Funktion anwenden, sind alle Iteratoren fertig.
Recommended Posts