Multi-processus de manière asynchrone avec python

Quel est cet article?

Je viens d'écrire le code parce que je voulais rendre python plus rapide.

Ce que je voulais faire

Mes calculs scientifiques sont lents. Le traitement du processeur et io sont lents. Au fait, il y a plusieurs accélérations dans la rue.

Dans le cas de python, il semble que le multithread ne soit pas rapide à cause de GIL, donc Inévitablement, cela devient un traitement multiprocessus et asynchrone. Cependant, même si vous essayez d'introduire asyncio pour le traitement asynchrone, ** Le collout utilisé dans asyncio ne peut pas être picklé et est incompatible avec le multi-processus ** Cela ressemble à ... mais ** cela semble compatible. ** **

Cependant, je ne sais pas si ce sera plus rapide si les deux sont compatibles.

Cette fois, je ne parlerai pas de "traitement asynchrone basé sur les processus". Je parlerai de "l'exécution du coroutum dans plusieurs processus".

Réinvention des roues?

Il existe un package qui multi-processus de manière asynchrone, En fait, il ne suffit pas de souscrire un forfait et d'augmenter la dépendance ... Donc, je ne sais pas lequel est meilleur car il est vieux ou utilise le multithread. Donc, c'était facile à mettre en œuvre ... Puis j'ai dû l'écrire moi-même.

Éléments nécessaires: asyncio + Pool (). Carte

En parlant d'asynchrone en python, asyncio. Et en python, le multi-processus est le multi-traitement. Utilisez ces deux en même temps.

Exemple d'échec

Cependant, ce qui suit ne fonctionne pas. Corrout est en colère de ne pas pouvoir décaper.

async calc(x):
    return x * 2

async def async_calc(x):
    result = await calc(x)
    return result

with Pool() as pool:
    pool.map(calc, range(3))

Si vous ne pouvez pas pickle une coroutine, pourquoi ne pas lancer une fonction qui renvoie une coroutine?

Maintenant, revenons à la possibilité de cornichon.

Object Pickle
Corroutine Impossible
expression lambda Impossible
Fonction par def Oui

En d'autres termes, la fonction qui renvoie un collout peut être picklée.

Les choses nécessaires

En principe, supposons que vous souhaitiez exécuter les éléments suivants en parallèle.

--read_text: une fonction qui lit le texte de manière asynchrone --main: fonction qui traite réellement

je l'ai écrit

Ici, passez une fonction async et ses arguments à une fonction appelée _wrap_async, J'en tourne le coroutum.

from typing import List, Iterable, Callable, Any, cast
from asyncio import get_event_loop, Task
from os import cpu_count
from multiprocessing import Pool

async def read_text(fname: str) -> str:
    with open(fname) as fp:
        result = fp.read()
    return result

async def main(fname: str) -> str:
    txt = Task(read_text(fname))
    result = await txt
    return result

def _wrap_async(func: Callable, *args: Any) -> Any:
    loop = get_event_loop()
    result = loop.run_until_complete(func(*args))
    loop.close()
    return result

def pmap_async(func: Callable, arg: Iterable,
               chunk: int = 2) -> List[Any]:
    with Pool(chunk) as pool:
        result = pool.starmap_async(_wrap_async,
                                    [(func, a) for a in arg]).get()
    return result

result = pmap_async(main, ['test.py'])
print(result)

Vous pouvez le faire comme ça. Donc, j'écrirai quelque chose qui semble être un échec de la fonction de carte.

J'ai essayé de l'enchaîner comme node.js car c'est une bonne idée

Soudain, je suis map (func1, map (func2, map (func3, iterator))) Je déteste écrire du code comme celui-ci. Même si cela est séparé, je ne sais pas comment nommer l'objet de la carte au milieu. Donc, j'ai essayé de l'écrire comme Array of node.js.

Je l'ai écrit, mais le code est trop long, donc je l'écrirai en dernier. Plus précisément, il peut s'écrire comme suit. J'essaye d'utiliser autant que possible Generator, donc je pense que le surcoût est petit ...

async def test(x): return x * 2
ChainIter([1, 2]).async_map(test, 2)[0]
>>> 2

Vous pouvez maintenant utiliser le coroutum dans une chaîne multi-processus. ~~ Et il est dit que cela viole la norme de codage et il sera divisé en villages. ~~

Code long

Il est devenu long avec une nageoire caudale

from asyncio import new_event_loop, ensure_future, Future
from typing import (Any, Callable, Iterable, cast, Coroutine,
                    Iterator, List, Union)
from itertools import starmap
from multiprocessing import Pool
from doctest import testmod
from functools import reduce, wraps, partial
from logging import Logger, INFO, getLogger, basicConfig
import time


logger = getLogger('ChainIter')
logger.setLevel(INFO)


def future(func: Callable) -> Callable:
    @wraps(func)
    def wrap(*args: Any, **kwargs: Any) -> Future:
        return ensure_future(func(*args, **kwargs))
    return wrap


def run_coroutine(cor: Coroutine) -> Any:
    """
    Just run coroutine.
    """
    loop = new_event_loop()
    result = loop.run_until_complete(cor)
    loop.close()
    return result


def run_async(func: Callable, *args: Any, **kwargs: Any) -> Any:
    """
    Assemble coroutine and run.
    """
    loop = new_event_loop()
    result = loop.run_until_complete(func(*args, **kwargs))
    loop.close()
    return result


class ChainIter:
    """
    Iterator which can used by method chain like Arry of node.js.
    Multi processing and asyncio can run.
    """
    def __init__(self, data: Union[list, Iterable],
                 indexable: bool = False, max_num: int = 0):
        """
        Parameters
        ----------
        data: Iterable
            It need not to be indexable.
        indexable: bool
            If data is indexable, indexable should be True.
        """
        self.data = data
        self.indexable = indexable
        self.num = 0  # Iterator needs number.
        self.max = len(data) if hasattr(data, '__len__') else max_num
        self.bar = True
        self.bar_len = 30

    def map(self, func: Callable, core: int = 1) -> 'ChainIter':
        """
        Chainable map.

        Parameters
        ----------
        func: Callable
            Function to run.
        core: int
            Number of cpu cores.
            If it is larger than 1, multiprocessing based on
            multiprocessing.Pool will be run.
            And so, If func cannot be lambda or coroutine if
            it is larger than 1.
        Returns
        ---------
        ChainIter with result

        >>> ChainIter([5, 6]).map(lambda x: x * 2).get()
        [10, 12]
        """
        logger.info(' '.join(('Running', str(func.__name__))))
        if (core == 1):
            return ChainIter(map(func, self.data), False, self.max)
        with Pool(core) as pool:
            result = pool.map_async(func, self.data).get()
        return ChainIter(result, True, self.max)

    def starmap(self, func: Callable, core: int = 1) -> 'ChainIter':
        """
        Chainable starmap.
        In this case, ChainIter.data must be Iterator of iterable objects.

        Parameters
        ----------
        func: Callable
            Function to run.
        core: int
            Number of cpu cores.
            If it is larger than 1, multiprocessing based on
            multiprocessing.Pool will be run.
            And so, If func cannot be lambda or coroutine if
            it is larger than 1.
        Returns
        ---------
        ChainIter with result
        >>> def multest2(x, y): return x * y
        >>> ChainIter([5, 6]).zip([2, 3]).starmap(multest2).get()
        [10, 18]
        """
        logger.info(' '.join(('Running', str(func.__name__))))
        if core == 1:
            return ChainIter(starmap(func, self.data), False, self.max)
        with Pool(core) as pool:
            result = pool.starmap_async(func, self.data).get()
        return ChainIter(result, True, self.max)

    def filter(self, func: Callable) -> 'ChainIter':
        """
        Simple filter function.

        Parameters
        ----------
        func: Callable
        """
        logger.info(' '.join(('Running', str(func.__name__))))
        return ChainIter(filter(func, self.data), False, 0)

    def async_map(self, func: Callable, chunk: int = 1) -> 'ChainIter':
        """
        Chainable map of coroutine, for example, async def function.

        Parameters
        ----------
        func: Callable
            Function to run.
        core: int
            Number of cpu cores.
            If it is larger than 1, multiprocessing based on
            multiprocessing.Pool will be run.
            And so, If func cannot be lambda or coroutine if
            it is larger than 1.
        Returns
        ---------
        ChainIter with result
        """
        logger.info(' '.join(('Running', str(func.__name__))))
        if chunk == 1:
            return ChainIter(starmap(run_async,
                                     ((func, a) for a in self.data)),
                             False, self.max)
        with Pool(chunk) as pool:
            result = pool.starmap_async(run_async,
                                        ((func, a) for a in self.data)).get()
        return ChainIter(result, True, self.max)

    def has_index(self) -> bool:
        return True if self.indexable else hasattr(self.data, '__getitem__')

    def __getitem__(self, num: int) -> Any:
        if self.has_index():
            return cast(list, self.data)[num]
        self.data = tuple(self.data)
        return self.data[num]

    def reduce(self, func: Callable) -> Any:
        """
        Simple reduce function.

        Parameters
        ----------
        func: Callable

        Returns
        ----------
        Result of reduce.
        """
        logger.info(' '.join(('Running', str(func.__name__))))
        return reduce(func, self.data)

    def get(self, kind: type = list) -> Any:
        """
        Get data as list.

        Parameters
        ----------
        kind: Callable
            If you want to convert to object which is not list,
            you can set it. For example, tuple, dqueue, and so on.
        """
        return kind(self.data)

    def zip(self, *args: Iterable) -> 'ChainIter':
        """
        Simple chainable zip function.
        Parameters
        ----------
        *args: Iterators to zip.

        Returns
        ----------
        Result of func(*ChainIter, *args, **kwargs)
        """
        return ChainIter(zip(self.data, *args), False, 0)

    def __iter__(self) -> 'ChainIter':
        self.calc()
        self.max = len(cast(list, self.data))
        return self

    def __next__(self) -> Any:
        if self.bar:
            start_time = current_time = time.time()
            bar_str = '\r{percent}%[{bar}{arrow}{space}]{div}'
            cycle_token = ('-', '\\', '|', '/')
            cycle_str = '\r[{cycle}]'
            stat_str = ' | {epoch_time:.2g}sec/epoch | Speed: {speed:.2g}/sec'
            progress = bar_str + stat_str
            cycle = cycle_str + stat_str

            prev_time = current_time
            current_time = time.time()
            epoch_time = current_time - prev_time
            if self.max != 0:
                bar_num = int((self.num + 1) / self.max * self.bar_len)
                print(progress.format(
                    percent=int(100 * (self.num + 1) / self.max),
                    bar='=' * bar_num,
                    arrow='>',
                    space=' ' * (self.bar_len - bar_num),
                    div=' ' + str(self.num + 1) + '/' + str(self.max),
                    epoch_time=round(epoch_time, 3),
                    speed=round(1 / epoch_time, 3)
                    ), end='')
            else:
                print(cycle.format(
                    cycle=cycle_token[self.num % 4],
                    div=' ' + str(self.num + 1) + '/' + str(self.max),
                    epoch_time=round(epoch_time, 3),
                    speed=round(1 / epoch_time, 3)
                    ), end='')
        self.num += 1
        if self.num == self.max:
            if self.bar:
                print('\nComplete in {sec} sec!'.format(
                    sec=round(time.time()-start_time, 3)))
            raise StopIteration
        return self.__getitem__(self.num - 1)

    def __reversed__(self) -> Iterable:
        if hasattr(self.data, '__reversed__'):
            return cast(list, self.data).__reversed__()
        raise IndexError('Not reversible')

    def __setitem__(self, key: Any, item: Any) -> None:
        if hasattr(self.data, '__setitem__'):
            cast(list, self.data)[key] = item
        raise IndexError('Item cannot be set.')

    def arg(self, func: Callable, *args: Any, **kwargs: Any) -> Any:
        """
        Use ChainIter object as argument.
        It is same as func(*ChainIter, *args, **kwargs)

        Parameters
        ----------
        func: Callable

        Returns
        ----------
        Result of func(*ChainIter, *args, **kwargs)
        >>> ChainIter([5, 6]).arg(sum)
        11
        """
        return func(tuple(self.data), *args, **kwargs)

    def stararg(self, func: Callable, *args: Any, **kwargs: Any) -> Any:
        """
        Use ChainIter object as argument.
        It is same as func(*tuple(ChainIter), *args, **kwargs)

        Parameters
        ----------
        func: Callable

        Returns
        ----------
        ChainIter object
        >>> ChainIter([5, 6]).stararg(lambda x, y: x * y)
        30
        """
        return func(*tuple(self.data), *args, **kwargs)

    def calc(self) -> 'ChainIter':
        """
        ChainIter.data may be list, map, filter and so on.
        This method translate it to list.
        If you do not run parallel, it can print progress bar if you want.

        Parameters
        ----------
        max_len: int = 0
            Length of data.
            If 0, it print pivot bar.

        Returns
        ----------
        ChainIter object
        """
        if self.bar:
            res = []
            start_time = current_time = time.time()
            bar_str = '\r{percent}%[{bar}{arrow}{space}]{div}'
            cycle_token = ('-', '\\', '|', '/')
            cycle_str = '\r[{cycle}]'
            stat_str = ' | {epoch_time:.2g}sec/epoch | Speed: {speed:.2g}/sec'
            progress = bar_str + stat_str
            cycle = cycle_str + stat_str
            for n, v in enumerate(self.data):
                res.append(v)
                prev_time = current_time
                current_time = time.time()
                epoch_time = current_time - prev_time
                if self.max != 0:
                    bar_num = int((n + 1) / self.max * self.bar_len)
                    print(progress.format(
                        percent=int(100 * (n + 1) / self.max),
                        bar='=' * bar_num,
                        arrow='>',
                        space=' ' * (self.bar_len - bar_num),
                        div=' ' + str(n + 1) + '/' + str(self.max),
                        epoch_time=round(epoch_time, 3),
                        speed=round(1 / epoch_time, 3)
                        ), end='')
                else:
                    print(cycle.format(
                        cycle=cycle_token[n % 4],
                        div=' ' + str(n + 1) + '/' + str(self.max),
                        epoch_time=round(epoch_time, 3),
                        speed=round(1 / epoch_time, 3)
                        ), end='')
            print('\nComplete in {sec} sec!'.format(
                sec=round(time.time()-start_time, 3)))
            self.data = res
            return self
        self.data = list(self.data)
        return self

    def __len__(self) -> int:
        self.calc()
        return len(cast(list, self.data))

    def __repr__(self) -> str:
        return 'ChainIter[{}]'.format(str(self.data))

    def __str__(self) -> str:
        return 'ChainIter[{}]'.format(str(self.data))

    def print(self) -> 'ChainIter':
        print(self.data)
        return self


if __name__ == '__main__':
    testmod()

Recommended Posts

Multi-processus de manière asynchrone avec python
Déboguer un programme multi-processus python avec VSCode
FizzBuzz en Python3
Grattage avec Python
Statistiques avec python
Grattage avec Python
Twilio avec Python
Intégrer avec Python
Jouez avec 2016-Python
AES256 avec python
python commence par ()
avec syntaxe (Python)
Bingo avec python
Zundokokiyoshi avec python
[Python] À propos du multi-processus
Excel avec Python
Micro-ordinateur avec Python
Cast avec python
Communication série avec Python
Django 1.11 a démarré avec Python3.6
Jugement des nombres premiers avec Python
Python avec eclipse + PyDev.
Communication de socket avec Python
Analyse de données avec python 2
Grattage en Python (préparation)
Essayez de gratter avec Python.
Apprendre Python avec ChemTHEATER 03
"Orienté objet" appris avec python
Exécutez Python avec VBA
Manipuler yaml avec python
Résolvez AtCoder 167 avec python
Communication série avec python
[Python] Utiliser JSON avec Python
Apprendre Python avec ChemTHEATER 05-1
Apprenez Python avec ChemTHEATER
Exécutez prepDE.py avec python3
1.1 Premiers pas avec Python
Collecter des tweets avec Python
Binarisation avec OpenCV / Python
3. 3. Programmation IA avec Python
Méthode Kernel avec Python
Non bloquant avec Python + uWSGI
Grattage avec Python + PhantomJS
Publier des tweets avec python
Conduisez WebDriver avec python
Utiliser mecab avec Python 3
[Python] Redirection avec CGIHTTPServer
Analyse vocale par python
Pensez à yaml avec python
Premiers pas avec Python
Utiliser DynamoDB avec Python
Getter Zundko avec python
Gérez Excel avec python
Loi d'Ohm avec Python
Jugement des nombres premiers avec python
Résoudre des maths avec Python
Python à partir de Windows 7
Carte thermique par Python + matplotlib
Programmation Python avec Atom
Apprendre Python avec ChemTHEATER 02
Utilisez Python 3.8 avec Anaconda