Asynchron mit Python verarbeiten

Was ist dieser Artikel?

Ich habe den Code gerade geschrieben, weil ich Python schneller machen wollte.

Was ich machen wollte

Meine wissenschaftlichen Berechnungen sind langsam. Sowohl die CPU-Verarbeitung als auch io sind langsam. Übrigens gibt es auf der Straße verschiedene Beschleunigungen.

Im Fall von Python scheint Multithread aufgrund von GIL nicht schnell zu sein Es wird unweigerlich zu einer Multiprozess- und asynchronen Verarbeitung. Selbst wenn Sie versuchen, Asyncio für die asynchrone Verarbeitung einzuführen, ** Das in asyncio verwendete Collout kann nicht eingelegt werden und ist nicht mit Multiprozessen kompatibel ** Es sieht aus wie ... aber ** es scheint kompatibel zu sein. ** ** **

Ich bin mir jedoch nicht sicher, ob es schneller geht, wenn beide kompatibel sind.

Dieses Mal werde ich nicht über "prozessbasierte asynchrone Verarbeitung" sprechen. Ich werde über "Ausführen eines Coroutums in mehreren Prozessen" sprechen.

Neuerfindung von Rädern?

Es gibt ein Paket, das asynchron mehrere Prozesse verarbeitet. Tatsächlich reicht es nicht aus, ein Paket herauszunehmen und die Abhängigkeit zu erhöhen ... Ich weiß also nicht, welches besser ist, weil es alt ist oder Multithread verwendet. Es war also einfach zu implementieren ... Dann musste ich es selbst schreiben.

Notwendige Gegenstände: asyncio + Pool (). Map

Apropos asynchron in Python, Asyncio. Und in Python ist Multi-Process Multiprocessing.Pool. Verwenden Sie diese beiden gleichzeitig.

Fehlerbeispiel

Das Folgende funktioniert jedoch nicht. Cororute wird wütend, weil er nicht einlegen kann.

async calc(x):
    return x * 2

async def async_calc(x):
    result = await calc(x)
    return result

with Pool() as pool:
    pool.map(calc, range(3))

Wenn Sie eine Coroutine nicht einlegen können, warum nicht eine Funktion auslösen, die eine Coroutine zurückgibt?

Kehren wir nun zur Möglichkeit der Gurke zurück.

Object Pickle
Corroutine Unmöglich
Lambda-Ausdruck Unmöglich
Funktion von def Ja

Mit anderen Worten, die Funktion, die einen Collout zurückgibt, kann ausgewählt werden.

Dinge notwendig

Angenommen, Sie möchten Folgendes parallel ausführen.

--read_text: Eine Funktion, die Text asynchron liest --main: Funktion, die tatsächlich verarbeitet

ich schrieb es

Übergeben Sie hier eine asynchrone Funktion und ihre Argumente an eine Funktion namens _wrap_async. Ich drehe den Koroutum hinein.

from typing import List, Iterable, Callable, Any, cast
from asyncio import get_event_loop, Task
from os import cpu_count
from multiprocessing import Pool

async def read_text(fname: str) -> str:
    with open(fname) as fp:
        result = fp.read()
    return result

async def main(fname: str) -> str:
    txt = Task(read_text(fname))
    result = await txt
    return result

def _wrap_async(func: Callable, *args: Any) -> Any:
    loop = get_event_loop()
    result = loop.run_until_complete(func(*args))
    loop.close()
    return result

def pmap_async(func: Callable, arg: Iterable,
               chunk: int = 2) -> List[Any]:
    with Pool(chunk) as pool:
        result = pool.starmap_async(_wrap_async,
                                    [(func, a) for a in arg]).get()
    return result

result = pmap_async(main, ['test.py'])
print(result)

Sie können es so machen. Also werde ich so etwas wie den Fehler der Kartenfunktion schreiben.

Ich habe versucht, es wie node.js zu verketten, weil es eine gute Idee ist

Plötzlich bin ich map (func1, map (func2, map (func3, iterator))) Ich hasse es, solchen Code zu schreiben. Selbst wenn dies getrennt ist, weiß ich nicht, wie ich das Kartenobjekt in der Mitte benennen soll. Also habe ich versucht, es wie Array of node.js zu schreiben.

Ich habe es geschrieben, aber der Code ist zu lang, also werde ich es zuletzt schreiben. Insbesondere kann es wie folgt geschrieben werden. Ich versuche, Generator so oft wie möglich zu verwenden, daher denke ich, dass der Overhead gering ist ...

async def test(x): return x * 2
ChainIter([1, 2]).async_map(test, 2)[0]
>>> 2

Jetzt können Sie das Coroutum in einer Multiprozesskette verwenden. ~~ Und es wird gesagt, dass es gegen den Kodierungsstandard verstößt und in Dörfer unterteilt wird. ~~

Langer Code

Es ist lang mit einer Schwanzflosse geworden

from asyncio import new_event_loop, ensure_future, Future
from typing import (Any, Callable, Iterable, cast, Coroutine,
                    Iterator, List, Union)
from itertools import starmap
from multiprocessing import Pool
from doctest import testmod
from functools import reduce, wraps, partial
from logging import Logger, INFO, getLogger, basicConfig
import time


logger = getLogger('ChainIter')
logger.setLevel(INFO)


def future(func: Callable) -> Callable:
    @wraps(func)
    def wrap(*args: Any, **kwargs: Any) -> Future:
        return ensure_future(func(*args, **kwargs))
    return wrap


def run_coroutine(cor: Coroutine) -> Any:
    """
    Just run coroutine.
    """
    loop = new_event_loop()
    result = loop.run_until_complete(cor)
    loop.close()
    return result


def run_async(func: Callable, *args: Any, **kwargs: Any) -> Any:
    """
    Assemble coroutine and run.
    """
    loop = new_event_loop()
    result = loop.run_until_complete(func(*args, **kwargs))
    loop.close()
    return result


class ChainIter:
    """
    Iterator which can used by method chain like Arry of node.js.
    Multi processing and asyncio can run.
    """
    def __init__(self, data: Union[list, Iterable],
                 indexable: bool = False, max_num: int = 0):
        """
        Parameters
        ----------
        data: Iterable
            It need not to be indexable.
        indexable: bool
            If data is indexable, indexable should be True.
        """
        self.data = data
        self.indexable = indexable
        self.num = 0  # Iterator needs number.
        self.max = len(data) if hasattr(data, '__len__') else max_num
        self.bar = True
        self.bar_len = 30

    def map(self, func: Callable, core: int = 1) -> 'ChainIter':
        """
        Chainable map.

        Parameters
        ----------
        func: Callable
            Function to run.
        core: int
            Number of cpu cores.
            If it is larger than 1, multiprocessing based on
            multiprocessing.Pool will be run.
            And so, If func cannot be lambda or coroutine if
            it is larger than 1.
        Returns
        ---------
        ChainIter with result

        >>> ChainIter([5, 6]).map(lambda x: x * 2).get()
        [10, 12]
        """
        logger.info(' '.join(('Running', str(func.__name__))))
        if (core == 1):
            return ChainIter(map(func, self.data), False, self.max)
        with Pool(core) as pool:
            result = pool.map_async(func, self.data).get()
        return ChainIter(result, True, self.max)

    def starmap(self, func: Callable, core: int = 1) -> 'ChainIter':
        """
        Chainable starmap.
        In this case, ChainIter.data must be Iterator of iterable objects.

        Parameters
        ----------
        func: Callable
            Function to run.
        core: int
            Number of cpu cores.
            If it is larger than 1, multiprocessing based on
            multiprocessing.Pool will be run.
            And so, If func cannot be lambda or coroutine if
            it is larger than 1.
        Returns
        ---------
        ChainIter with result
        >>> def multest2(x, y): return x * y
        >>> ChainIter([5, 6]).zip([2, 3]).starmap(multest2).get()
        [10, 18]
        """
        logger.info(' '.join(('Running', str(func.__name__))))
        if core == 1:
            return ChainIter(starmap(func, self.data), False, self.max)
        with Pool(core) as pool:
            result = pool.starmap_async(func, self.data).get()
        return ChainIter(result, True, self.max)

    def filter(self, func: Callable) -> 'ChainIter':
        """
        Simple filter function.

        Parameters
        ----------
        func: Callable
        """
        logger.info(' '.join(('Running', str(func.__name__))))
        return ChainIter(filter(func, self.data), False, 0)

    def async_map(self, func: Callable, chunk: int = 1) -> 'ChainIter':
        """
        Chainable map of coroutine, for example, async def function.

        Parameters
        ----------
        func: Callable
            Function to run.
        core: int
            Number of cpu cores.
            If it is larger than 1, multiprocessing based on
            multiprocessing.Pool will be run.
            And so, If func cannot be lambda or coroutine if
            it is larger than 1.
        Returns
        ---------
        ChainIter with result
        """
        logger.info(' '.join(('Running', str(func.__name__))))
        if chunk == 1:
            return ChainIter(starmap(run_async,
                                     ((func, a) for a in self.data)),
                             False, self.max)
        with Pool(chunk) as pool:
            result = pool.starmap_async(run_async,
                                        ((func, a) for a in self.data)).get()
        return ChainIter(result, True, self.max)

    def has_index(self) -> bool:
        return True if self.indexable else hasattr(self.data, '__getitem__')

    def __getitem__(self, num: int) -> Any:
        if self.has_index():
            return cast(list, self.data)[num]
        self.data = tuple(self.data)
        return self.data[num]

    def reduce(self, func: Callable) -> Any:
        """
        Simple reduce function.

        Parameters
        ----------
        func: Callable

        Returns
        ----------
        Result of reduce.
        """
        logger.info(' '.join(('Running', str(func.__name__))))
        return reduce(func, self.data)

    def get(self, kind: type = list) -> Any:
        """
        Get data as list.

        Parameters
        ----------
        kind: Callable
            If you want to convert to object which is not list,
            you can set it. For example, tuple, dqueue, and so on.
        """
        return kind(self.data)

    def zip(self, *args: Iterable) -> 'ChainIter':
        """
        Simple chainable zip function.
        Parameters
        ----------
        *args: Iterators to zip.

        Returns
        ----------
        Result of func(*ChainIter, *args, **kwargs)
        """
        return ChainIter(zip(self.data, *args), False, 0)

    def __iter__(self) -> 'ChainIter':
        self.calc()
        self.max = len(cast(list, self.data))
        return self

    def __next__(self) -> Any:
        if self.bar:
            start_time = current_time = time.time()
            bar_str = '\r{percent}%[{bar}{arrow}{space}]{div}'
            cycle_token = ('-', '\\', '|', '/')
            cycle_str = '\r[{cycle}]'
            stat_str = ' | {epoch_time:.2g}sec/epoch | Speed: {speed:.2g}/sec'
            progress = bar_str + stat_str
            cycle = cycle_str + stat_str

            prev_time = current_time
            current_time = time.time()
            epoch_time = current_time - prev_time
            if self.max != 0:
                bar_num = int((self.num + 1) / self.max * self.bar_len)
                print(progress.format(
                    percent=int(100 * (self.num + 1) / self.max),
                    bar='=' * bar_num,
                    arrow='>',
                    space=' ' * (self.bar_len - bar_num),
                    div=' ' + str(self.num + 1) + '/' + str(self.max),
                    epoch_time=round(epoch_time, 3),
                    speed=round(1 / epoch_time, 3)
                    ), end='')
            else:
                print(cycle.format(
                    cycle=cycle_token[self.num % 4],
                    div=' ' + str(self.num + 1) + '/' + str(self.max),
                    epoch_time=round(epoch_time, 3),
                    speed=round(1 / epoch_time, 3)
                    ), end='')
        self.num += 1
        if self.num == self.max:
            if self.bar:
                print('\nComplete in {sec} sec!'.format(
                    sec=round(time.time()-start_time, 3)))
            raise StopIteration
        return self.__getitem__(self.num - 1)

    def __reversed__(self) -> Iterable:
        if hasattr(self.data, '__reversed__'):
            return cast(list, self.data).__reversed__()
        raise IndexError('Not reversible')

    def __setitem__(self, key: Any, item: Any) -> None:
        if hasattr(self.data, '__setitem__'):
            cast(list, self.data)[key] = item
        raise IndexError('Item cannot be set.')

    def arg(self, func: Callable, *args: Any, **kwargs: Any) -> Any:
        """
        Use ChainIter object as argument.
        It is same as func(*ChainIter, *args, **kwargs)

        Parameters
        ----------
        func: Callable

        Returns
        ----------
        Result of func(*ChainIter, *args, **kwargs)
        >>> ChainIter([5, 6]).arg(sum)
        11
        """
        return func(tuple(self.data), *args, **kwargs)

    def stararg(self, func: Callable, *args: Any, **kwargs: Any) -> Any:
        """
        Use ChainIter object as argument.
        It is same as func(*tuple(ChainIter), *args, **kwargs)

        Parameters
        ----------
        func: Callable

        Returns
        ----------
        ChainIter object
        >>> ChainIter([5, 6]).stararg(lambda x, y: x * y)
        30
        """
        return func(*tuple(self.data), *args, **kwargs)

    def calc(self) -> 'ChainIter':
        """
        ChainIter.data may be list, map, filter and so on.
        This method translate it to list.
        If you do not run parallel, it can print progress bar if you want.

        Parameters
        ----------
        max_len: int = 0
            Length of data.
            If 0, it print pivot bar.

        Returns
        ----------
        ChainIter object
        """
        if self.bar:
            res = []
            start_time = current_time = time.time()
            bar_str = '\r{percent}%[{bar}{arrow}{space}]{div}'
            cycle_token = ('-', '\\', '|', '/')
            cycle_str = '\r[{cycle}]'
            stat_str = ' | {epoch_time:.2g}sec/epoch | Speed: {speed:.2g}/sec'
            progress = bar_str + stat_str
            cycle = cycle_str + stat_str
            for n, v in enumerate(self.data):
                res.append(v)
                prev_time = current_time
                current_time = time.time()
                epoch_time = current_time - prev_time
                if self.max != 0:
                    bar_num = int((n + 1) / self.max * self.bar_len)
                    print(progress.format(
                        percent=int(100 * (n + 1) / self.max),
                        bar='=' * bar_num,
                        arrow='>',
                        space=' ' * (self.bar_len - bar_num),
                        div=' ' + str(n + 1) + '/' + str(self.max),
                        epoch_time=round(epoch_time, 3),
                        speed=round(1 / epoch_time, 3)
                        ), end='')
                else:
                    print(cycle.format(
                        cycle=cycle_token[n % 4],
                        div=' ' + str(n + 1) + '/' + str(self.max),
                        epoch_time=round(epoch_time, 3),
                        speed=round(1 / epoch_time, 3)
                        ), end='')
            print('\nComplete in {sec} sec!'.format(
                sec=round(time.time()-start_time, 3)))
            self.data = res
            return self
        self.data = list(self.data)
        return self

    def __len__(self) -> int:
        self.calc()
        return len(cast(list, self.data))

    def __repr__(self) -> str:
        return 'ChainIter[{}]'.format(str(self.data))

    def __str__(self) -> str:
        return 'ChainIter[{}]'.format(str(self.data))

    def print(self) -> 'ChainIter':
        print(self.data)
        return self


if __name__ == '__main__':
    testmod()

Recommended Posts

Asynchron mit Python verarbeiten
Debuggen Sie das Python-Multiprozessprogramm mit VSCode
FizzBuzz in Python3
Scraping mit Python
Statistik mit Python
Scraping mit Python
Twilio mit Python
In Python integrieren
Spielen Sie mit 2016-Python
AES256 mit Python
Python beginnt mit ()
mit Syntax (Python)
Bingo mit Python
Zundokokiyoshi mit Python
[Python] Über Multi-Prozess
Excel mit Python
Mikrocomputer mit Python
Mit Python besetzen
Serielle Kommunikation mit Python
Django 1.11 wurde mit Python3.6 gestartet
Primzahlbeurteilung mit Python
Python mit Eclipse + PyDev.
Socket-Kommunikation mit Python
Datenanalyse mit Python 2
Scraping in Python (Vorbereitung)
Versuchen Sie es mit Python.
Python lernen mit ChemTHEATER 03
"Objektorientiert" mit Python gelernt
Führen Sie Python mit VBA aus
Umgang mit Yaml mit Python
Löse AtCoder 167 mit Python
Serielle Kommunikation mit Python
[Python] Verwenden Sie JSON mit Python
Python lernen mit ChemTHEATER 05-1
Lerne Python mit ChemTHEATER
Führen Sie prepDE.py mit python3 aus
1.1 Erste Schritte mit Python
Tweets mit Python sammeln
Binarisierung mit OpenCV / Python
3. 3. KI-Programmierung mit Python
Kernel-Methode mit Python
Nicht blockierend mit Python + uWSGI
Scraping mit Python + PhantomJS
Tweets mit Python posten
Fahren Sie WebDriver mit Python
Verwenden Sie Mecab mit Python 3
[Python] Mit CGIHTTPServer umleiten
Sprachanalyse mit Python
Denken Sie an Yaml mit Python
Erste Schritte mit Python
Verwenden Sie DynamoDB mit Python
Zundko Getter mit Python
Behandle Excel mit Python
Ohmsches Gesetz mit Python
Primzahlbeurteilung mit Python
Löse Mathe mit Python
Python ab Windows 7
Heatmap von Python + matplotlib
Python-Programmierung mit Atom
Python lernen mit ChemTHEATER 02
Verwenden Sie Python 3.8 mit Anaconda