Ich habe den Code gerade geschrieben, weil ich Python schneller machen wollte.
Meine wissenschaftlichen Berechnungen sind langsam. Sowohl die CPU-Verarbeitung als auch io sind langsam. Übrigens gibt es auf der Straße verschiedene Beschleunigungen.
Im Fall von Python scheint Multithread aufgrund von GIL nicht schnell zu sein Es wird unweigerlich zu einer Multiprozess- und asynchronen Verarbeitung. Selbst wenn Sie versuchen, Asyncio für die asynchrone Verarbeitung einzuführen, ** Das in asyncio verwendete Collout kann nicht eingelegt werden und ist nicht mit Multiprozessen kompatibel ** Es sieht aus wie ... aber ** es scheint kompatibel zu sein. ** ** **
Ich bin mir jedoch nicht sicher, ob es schneller geht, wenn beide kompatibel sind.
Dieses Mal werde ich nicht über "prozessbasierte asynchrone Verarbeitung" sprechen. Ich werde über "Ausführen eines Coroutums in mehreren Prozessen" sprechen.
Es gibt ein Paket, das asynchron mehrere Prozesse verarbeitet. Tatsächlich reicht es nicht aus, ein Paket herauszunehmen und die Abhängigkeit zu erhöhen ... Ich weiß also nicht, welches besser ist, weil es alt ist oder Multithread verwendet. Es war also einfach zu implementieren ... Dann musste ich es selbst schreiben.
Apropos asynchron in Python, Asyncio. Und in Python ist Multi-Process Multiprocessing.Pool. Verwenden Sie diese beiden gleichzeitig.
Das Folgende funktioniert jedoch nicht. Cororute wird wütend, weil er nicht einlegen kann.
async calc(x):
return x * 2
async def async_calc(x):
result = await calc(x)
return result
with Pool() as pool:
pool.map(calc, range(3))
Kehren wir nun zur Möglichkeit der Gurke zurück.
Object | Pickle |
---|---|
Corroutine | Unmöglich |
Lambda-Ausdruck | Unmöglich |
Funktion von def | Ja |
Mit anderen Worten, die Funktion, die einen Collout zurückgibt, kann ausgewählt werden.
Angenommen, Sie möchten Folgendes parallel ausführen.
--read_text: Eine Funktion, die Text asynchron liest --main: Funktion, die tatsächlich verarbeitet
Übergeben Sie hier eine asynchrone Funktion und ihre Argumente an eine Funktion namens _wrap_async. Ich drehe den Koroutum hinein.
from typing import List, Iterable, Callable, Any, cast
from asyncio import get_event_loop, Task
from os import cpu_count
from multiprocessing import Pool
async def read_text(fname: str) -> str:
with open(fname) as fp:
result = fp.read()
return result
async def main(fname: str) -> str:
txt = Task(read_text(fname))
result = await txt
return result
def _wrap_async(func: Callable, *args: Any) -> Any:
loop = get_event_loop()
result = loop.run_until_complete(func(*args))
loop.close()
return result
def pmap_async(func: Callable, arg: Iterable,
chunk: int = 2) -> List[Any]:
with Pool(chunk) as pool:
result = pool.starmap_async(_wrap_async,
[(func, a) for a in arg]).get()
return result
result = pmap_async(main, ['test.py'])
print(result)
Sie können es so machen. Also werde ich so etwas wie den Fehler der Kartenfunktion schreiben.
Plötzlich bin ich map (func1, map (func2, map (func3, iterator))) Ich hasse es, solchen Code zu schreiben. Selbst wenn dies getrennt ist, weiß ich nicht, wie ich das Kartenobjekt in der Mitte benennen soll. Also habe ich versucht, es wie Array of node.js zu schreiben.
Ich habe es geschrieben, aber der Code ist zu lang, also werde ich es zuletzt schreiben. Insbesondere kann es wie folgt geschrieben werden. Ich versuche, Generator so oft wie möglich zu verwenden, daher denke ich, dass der Overhead gering ist ...
async def test(x): return x * 2
ChainIter([1, 2]).async_map(test, 2)[0]
>>> 2
Jetzt können Sie das Coroutum in einer Multiprozesskette verwenden. ~~ Und es wird gesagt, dass es gegen den Kodierungsstandard verstößt und in Dörfer unterteilt wird. ~~
Es ist lang mit einer Schwanzflosse geworden
from asyncio import new_event_loop, ensure_future, Future
from typing import (Any, Callable, Iterable, cast, Coroutine,
Iterator, List, Union)
from itertools import starmap
from multiprocessing import Pool
from doctest import testmod
from functools import reduce, wraps, partial
from logging import Logger, INFO, getLogger, basicConfig
import time
logger = getLogger('ChainIter')
logger.setLevel(INFO)
def future(func: Callable) -> Callable:
@wraps(func)
def wrap(*args: Any, **kwargs: Any) -> Future:
return ensure_future(func(*args, **kwargs))
return wrap
def run_coroutine(cor: Coroutine) -> Any:
"""
Just run coroutine.
"""
loop = new_event_loop()
result = loop.run_until_complete(cor)
loop.close()
return result
def run_async(func: Callable, *args: Any, **kwargs: Any) -> Any:
"""
Assemble coroutine and run.
"""
loop = new_event_loop()
result = loop.run_until_complete(func(*args, **kwargs))
loop.close()
return result
class ChainIter:
"""
Iterator which can used by method chain like Arry of node.js.
Multi processing and asyncio can run.
"""
def __init__(self, data: Union[list, Iterable],
indexable: bool = False, max_num: int = 0):
"""
Parameters
----------
data: Iterable
It need not to be indexable.
indexable: bool
If data is indexable, indexable should be True.
"""
self.data = data
self.indexable = indexable
self.num = 0 # Iterator needs number.
self.max = len(data) if hasattr(data, '__len__') else max_num
self.bar = True
self.bar_len = 30
def map(self, func: Callable, core: int = 1) -> 'ChainIter':
"""
Chainable map.
Parameters
----------
func: Callable
Function to run.
core: int
Number of cpu cores.
If it is larger than 1, multiprocessing based on
multiprocessing.Pool will be run.
And so, If func cannot be lambda or coroutine if
it is larger than 1.
Returns
---------
ChainIter with result
>>> ChainIter([5, 6]).map(lambda x: x * 2).get()
[10, 12]
"""
logger.info(' '.join(('Running', str(func.__name__))))
if (core == 1):
return ChainIter(map(func, self.data), False, self.max)
with Pool(core) as pool:
result = pool.map_async(func, self.data).get()
return ChainIter(result, True, self.max)
def starmap(self, func: Callable, core: int = 1) -> 'ChainIter':
"""
Chainable starmap.
In this case, ChainIter.data must be Iterator of iterable objects.
Parameters
----------
func: Callable
Function to run.
core: int
Number of cpu cores.
If it is larger than 1, multiprocessing based on
multiprocessing.Pool will be run.
And so, If func cannot be lambda or coroutine if
it is larger than 1.
Returns
---------
ChainIter with result
>>> def multest2(x, y): return x * y
>>> ChainIter([5, 6]).zip([2, 3]).starmap(multest2).get()
[10, 18]
"""
logger.info(' '.join(('Running', str(func.__name__))))
if core == 1:
return ChainIter(starmap(func, self.data), False, self.max)
with Pool(core) as pool:
result = pool.starmap_async(func, self.data).get()
return ChainIter(result, True, self.max)
def filter(self, func: Callable) -> 'ChainIter':
"""
Simple filter function.
Parameters
----------
func: Callable
"""
logger.info(' '.join(('Running', str(func.__name__))))
return ChainIter(filter(func, self.data), False, 0)
def async_map(self, func: Callable, chunk: int = 1) -> 'ChainIter':
"""
Chainable map of coroutine, for example, async def function.
Parameters
----------
func: Callable
Function to run.
core: int
Number of cpu cores.
If it is larger than 1, multiprocessing based on
multiprocessing.Pool will be run.
And so, If func cannot be lambda or coroutine if
it is larger than 1.
Returns
---------
ChainIter with result
"""
logger.info(' '.join(('Running', str(func.__name__))))
if chunk == 1:
return ChainIter(starmap(run_async,
((func, a) for a in self.data)),
False, self.max)
with Pool(chunk) as pool:
result = pool.starmap_async(run_async,
((func, a) for a in self.data)).get()
return ChainIter(result, True, self.max)
def has_index(self) -> bool:
return True if self.indexable else hasattr(self.data, '__getitem__')
def __getitem__(self, num: int) -> Any:
if self.has_index():
return cast(list, self.data)[num]
self.data = tuple(self.data)
return self.data[num]
def reduce(self, func: Callable) -> Any:
"""
Simple reduce function.
Parameters
----------
func: Callable
Returns
----------
Result of reduce.
"""
logger.info(' '.join(('Running', str(func.__name__))))
return reduce(func, self.data)
def get(self, kind: type = list) -> Any:
"""
Get data as list.
Parameters
----------
kind: Callable
If you want to convert to object which is not list,
you can set it. For example, tuple, dqueue, and so on.
"""
return kind(self.data)
def zip(self, *args: Iterable) -> 'ChainIter':
"""
Simple chainable zip function.
Parameters
----------
*args: Iterators to zip.
Returns
----------
Result of func(*ChainIter, *args, **kwargs)
"""
return ChainIter(zip(self.data, *args), False, 0)
def __iter__(self) -> 'ChainIter':
self.calc()
self.max = len(cast(list, self.data))
return self
def __next__(self) -> Any:
if self.bar:
start_time = current_time = time.time()
bar_str = '\r{percent}%[{bar}{arrow}{space}]{div}'
cycle_token = ('-', '\\', '|', '/')
cycle_str = '\r[{cycle}]'
stat_str = ' | {epoch_time:.2g}sec/epoch | Speed: {speed:.2g}/sec'
progress = bar_str + stat_str
cycle = cycle_str + stat_str
prev_time = current_time
current_time = time.time()
epoch_time = current_time - prev_time
if self.max != 0:
bar_num = int((self.num + 1) / self.max * self.bar_len)
print(progress.format(
percent=int(100 * (self.num + 1) / self.max),
bar='=' * bar_num,
arrow='>',
space=' ' * (self.bar_len - bar_num),
div=' ' + str(self.num + 1) + '/' + str(self.max),
epoch_time=round(epoch_time, 3),
speed=round(1 / epoch_time, 3)
), end='')
else:
print(cycle.format(
cycle=cycle_token[self.num % 4],
div=' ' + str(self.num + 1) + '/' + str(self.max),
epoch_time=round(epoch_time, 3),
speed=round(1 / epoch_time, 3)
), end='')
self.num += 1
if self.num == self.max:
if self.bar:
print('\nComplete in {sec} sec!'.format(
sec=round(time.time()-start_time, 3)))
raise StopIteration
return self.__getitem__(self.num - 1)
def __reversed__(self) -> Iterable:
if hasattr(self.data, '__reversed__'):
return cast(list, self.data).__reversed__()
raise IndexError('Not reversible')
def __setitem__(self, key: Any, item: Any) -> None:
if hasattr(self.data, '__setitem__'):
cast(list, self.data)[key] = item
raise IndexError('Item cannot be set.')
def arg(self, func: Callable, *args: Any, **kwargs: Any) -> Any:
"""
Use ChainIter object as argument.
It is same as func(*ChainIter, *args, **kwargs)
Parameters
----------
func: Callable
Returns
----------
Result of func(*ChainIter, *args, **kwargs)
>>> ChainIter([5, 6]).arg(sum)
11
"""
return func(tuple(self.data), *args, **kwargs)
def stararg(self, func: Callable, *args: Any, **kwargs: Any) -> Any:
"""
Use ChainIter object as argument.
It is same as func(*tuple(ChainIter), *args, **kwargs)
Parameters
----------
func: Callable
Returns
----------
ChainIter object
>>> ChainIter([5, 6]).stararg(lambda x, y: x * y)
30
"""
return func(*tuple(self.data), *args, **kwargs)
def calc(self) -> 'ChainIter':
"""
ChainIter.data may be list, map, filter and so on.
This method translate it to list.
If you do not run parallel, it can print progress bar if you want.
Parameters
----------
max_len: int = 0
Length of data.
If 0, it print pivot bar.
Returns
----------
ChainIter object
"""
if self.bar:
res = []
start_time = current_time = time.time()
bar_str = '\r{percent}%[{bar}{arrow}{space}]{div}'
cycle_token = ('-', '\\', '|', '/')
cycle_str = '\r[{cycle}]'
stat_str = ' | {epoch_time:.2g}sec/epoch | Speed: {speed:.2g}/sec'
progress = bar_str + stat_str
cycle = cycle_str + stat_str
for n, v in enumerate(self.data):
res.append(v)
prev_time = current_time
current_time = time.time()
epoch_time = current_time - prev_time
if self.max != 0:
bar_num = int((n + 1) / self.max * self.bar_len)
print(progress.format(
percent=int(100 * (n + 1) / self.max),
bar='=' * bar_num,
arrow='>',
space=' ' * (self.bar_len - bar_num),
div=' ' + str(n + 1) + '/' + str(self.max),
epoch_time=round(epoch_time, 3),
speed=round(1 / epoch_time, 3)
), end='')
else:
print(cycle.format(
cycle=cycle_token[n % 4],
div=' ' + str(n + 1) + '/' + str(self.max),
epoch_time=round(epoch_time, 3),
speed=round(1 / epoch_time, 3)
), end='')
print('\nComplete in {sec} sec!'.format(
sec=round(time.time()-start_time, 3)))
self.data = res
return self
self.data = list(self.data)
return self
def __len__(self) -> int:
self.calc()
return len(cast(list, self.data))
def __repr__(self) -> str:
return 'ChainIter[{}]'.format(str(self.data))
def __str__(self) -> str:
return 'ChainIter[{}]'.format(str(self.data))
def print(self) -> 'ChainIter':
print(self.data)
return self
if __name__ == '__main__':
testmod()
Recommended Posts