Traitement asynchrone de Python ~ Comprenez parfaitement async et attendez ~

À propos de Future

Lorsque vous effectuez un calcul chronophage en Python et que vous souhaitez obtenir le résultat, vous placez généralement le processus dans une fonction et l'obtenez comme valeur de retour lorsque vous exécutez la fonction. C'est ce qu'on appelle le traitement synchrone.

D'autre part, le traitement asynchrone est un concept différent du traitement synchrone. Cela implique l'interaction suivante entre le processus qui demande le calcul (récepteur) et le processus qui effectue réellement le calcul (émetteur) via un objet appelé «Future».

Le traitement jusqu'à ce point est par exemple le suivant.

import asyncio
import time

def f(future):
    time.sleep(5) #Processus chronophage
    future.set_result("hello")
    return

future = asyncio.futures.Future()
f(future)

if future.done():
    res = future.result()
    print(res)

Quand je fais cela, il dit "bonjour" après avoir attendu 5 secondes.

Pour le moment, le code pertinent pour `Future` est le suivant. (Partiellement omis)

Lib/asyncio/futures.py


class Future:
    _state = _PENDING
    _result = None

    def done(self):
        return self._state != _PENDING

    def result(self):
        if self._state != _FINISHED:
            raise exceptions.InvalidStateError('Result is not ready.')
        return self._result

    def set_result(self, result):
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        self._result = result
        self._state = _FINISHED

Boucle d'événement

Comme vous l'avez déjà remarqué, le code ci-dessus est le même qu'un appel de fonction normal, sauf qu'il utilise un objet Future. En effet, le destinataire exécute directement le code de l'expéditeur. Cela ne profite pas de «Future».

C'est là que le concept de boucles d'événements entre en jeu. Une boucle d'événements est un objet qui a 0 ou 1 objet par thread et qui a pour fonction d'exécuter des fonctions enregistrées.

Utilisons-le réellement.

import asyncio
import time

def f(future):
    time.sleep(5) #Processus chronophage
    future.set_result("hello")
    return

loop = asyncio.get_event_loop()
future = asyncio.futures.Future()
loop.call_soon(f, future)
loop.run_forever()

Dans le code ci-dessus, nous appelons ʻasyncio.get_event_loop pour obtenir l'objet BaseEventLoop. Ensuite, la fonction f est enregistrée dans loop par call_soon. Enfin, la boucle d'événement est exécutée avec loop.run_forever ()`.

Quand je fais ça, je suis dans une boucle infinie avec run_forever () et le programme ne se termine jamais. Au lieu de cela, vous pouvez arrêter automatiquement la boucle d'événements après que la fonction f () ait fini de s'exécuter en écrivant:

res = loop.run_until_complete(future)
print(res)

Comment run_until_complete () peut-il connaître l'achèvement de la fonctionf ()? Cela utilise un mécanisme appelé le callback future. Dans run_until_complete (), la fonction future.add_done_callback () est d'abord exécutée, et le rappel est défini dans future. Ensuite, run_forever () est appelé et la fonction f () est exécutée. Ensuite, lorsque la valeur est définie par future.set_result () dans la fonction f (), le rappel défini par ʻadd_done_callback () est appelé. Dans le callback défini par run_until_complete (), loop.stop ()est utilisé pour réserver la fin de la boucle d'événements, donc la boucle d'événements s'arrêtera après la fin de l'exécution def ()`. ..

Notez que future.set_result () n'est pas exécutée et que la fonction f () n'est pas immédiatement terminée. La fin est uniquement réservée et l'exécution se poursuit en fait jusqu'à «return».

Le code de la bibliothèque associée est affiché.

Lib/asyncio/events.py


import contextvars
class Handle:
    def __init__(self, callback, args, loop):
        self._context = contextvars.copy_context()
        self._loop = loop
        self._callback = callback
        self._args = args

    def _run(self):
        self._context.run(self._callback, *self._args)

Lib/asyncio/base_events.py


class BaseEventLoop(events.AbstractEventLoop):
    def __init__(self):
        self._stopping = False
        self._ready = collections.deque()
        
    def _call_soon(self, callback, args, context):
        handle = events.Handle(callback, args, self, context)
        self._ready.append(handle)
        return handle
    
    def _run_once(self):
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            handle._run()
        
    def run_forever(self):
        while True:
            self._run_once()
            if self._stopping:
                break

    def run_until_complete(self, future):
        def _run_until_complete_cb(fut):
            self.stop()
        future.add_done_callback(_run_until_complete_cb)
        self.run_forever()
        return future.result()

    def stop(self):
        self._stopping = True

Lib/asyncio/futures.py


class Future:
    def add_done_callback(self, fn):
        context = contextvars.copy_context()
        self._callbacks.append((fn, context))

    def set_result(self, result):
        # ...réduction
        for callback, ctx in self._callbacks[:]:
            self._loop.call_soon(callback, self, context=ctx)

Exécution de plusieurs processus à l'aide d'une boucle d'événements

Dans le chapitre précédent, le traitement était exécuté à l'aide d'une boucle d'événements. Cependant, la seule chose qui a changé est que la fonction f, qui effectue un traitement fastidieux, n'a pas été exécutée directement, mais a été exécutée via une boucle d'événements. Cela ne change pas ce que vous faites.

La vraie nature de la boucle d'événements entre en jeu lorsque plusieurs processus sont exécutés. Faisons-le réellement.

import asyncio
import time

def f(future, tag):
    for _ in range(3):
        time.sleep(1)
        print("waiting for f(%d)" % tag)
    future.set_result("hello %d" % tag)
    return

loop = asyncio.get_event_loop()
futures = []
for tag in range(3):
    future = loop.create_future()
    loop.call_soon(f, future, tag)
    futures += [future]
res = loop.run_until_complete(asyncio.gather(*futures))
print(res)

Ce code enregistre trois processus. Il utilise également une nouvelle fonction appelée ʻasyncio.gatherpour regrouper plusieursFuture`s en un seul. Le résultat de cette exécution est le suivant.

waiting for f(0)
waiting for f(0)
waiting for f(0)
waiting for f(1)
waiting for f(1)
waiting for f(1)
waiting for f(2)
waiting for f(2)
waiting for f(2)
['hello 0', 'hello 1', 'hello 2']

Notez que comme vous pouvez le voir à partir de ce résultat, f (0), f (1), f (2) ne fonctionnent pas en parallèle. Comme vous pouvez le voir dans le code source de la bibliothèque, dans loop.run_until_complete (), les callbacks enregistrés dans loop._ready ne sont exécutés que séquentiellement.

Je publierai le code de la bibliothèque associée.

Lib/asyncio/tasks.py


class _GatheringFuture(futures.Future):
    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children
        self._cancel_requested = False

def gather(*coros_or_futures, loop=None, return_exceptions=False):
    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1
        if nfinished == nfuts:
            results = []
            for fut in children:
                res = fut.result()
                results.append(res)
            outer.set_result(results)
    arg_to_fut = {}
    children = []
    nfuts = 0
    nfinished = 0
    for arg in coros_or_futures:
        nfuts += 1
        fut.add_done_callback(_done_callback)
        children.append(fut)
    outer = _GatheringFuture(children, loop=loop)
    return outer

Générateur

Maintenant, déraillons et voyons le générateur Python. Un générateur est une "fonction qui renvoie un itérateur". L'exécution du générateur renvoie un objet générateur. L'objet générateur implémente la fonction «iter ()» qui représente un itérateur. Le générateur est mis en œuvre comme suit.

def generator():
    yield 1
    yield 2
    yield 3
    return "END"

gg = generator().__iter__()
print(gg.__next__())
print(gg.__next__())
print(gg.__next__())
try:
    print(gg.__next__())
except StopIteration as e:
    print(e.value)

Ici, «yield» a pour fonction d'arrêter temporairement le traitement du contenu du générateur. Les générateurs peuvent également être empilés en deux couches.

def generator2():
    yield 1
    yield 2
    yield 3
    return "END"

def generator():
    a = yield from generator2()
    return a

gg = generator().__iter__()
print(gg.__next__())
print(gg.__next__())
print(gg.__next__())
try:
    print(gg.__next__())
except StopIteration as e:
    print(e.value)

Les deux résultats d'exécution

1
2
3
END

Ce sera.

Exécution par boucle d'événements du générateur

Comme mentionné dans le chapitre précédent, lors de l'exécution de plusieurs fonctions à l'aide de loop.run_until_complete, la deuxième fonction est exécutée après l'exécution de la première fonction, et ainsi de suite. Les fonctions ne sont pas exécutées en parallèle, mais en séquence. Ici, si vous utilisez un générateur au lieu d'une fonction, ce sera comme suit.

import asyncio
import time

def f(tag):
    for _ in range(3):
        yield
        time.sleep(1)
        print("waiting for f(%d)" % tag)
    return "hello %d" % tag

loop = asyncio.get_event_loop()
tasks = []
for tag in range(3):
    task = f(tag)
    tasks += [task]
res = loop.run_until_complete(asyncio.gather(*tasks))
print(res)

Ici, j'ai ajouté une instruction yield dans la fonction f () et renvoyé le résultat du calcul comme return au lieu de future.set_result. L'argument «futur» n'est plus nécessaire et a été supprimé.

Le résultat de cette exécution est le suivant.

waiting for f(0)
waiting for f(1)
waiting for f(2)
waiting for f(0)
waiting for f(1)
waiting for f(2)
waiting for f(0)
waiting for f(1)
waiting for f(2)
['hello 0', 'hello 1', 'hello 2']

Dans le chapitre précédent, «f (0)» était affiché trois fois, puis «f (1)» était affiché, et ... a été changé en «f (0)», «f (1)». , f (2) sont maintenant affichés dans cet ordre. En effet, même si plusieurs tâches sont enregistrées dans la boucle d'événements, elles sont toutes exécutées dans un thread. De plus, la boucle d'événements ne peut pas interrompre l'exécution d'une fonction Python, elle doit donc continuer à exécuter une fonction jusqu'à ce qu'elle s'arrête volontairement, comme par return.

D'un autre côté, si vous utilisez un générateur, yield suspendra l'exécution de la fonction. Puisque le traitement revient du côté de la boucle d'événements à ce moment, il est possible de commuter la tâche exécutée par la boucle d'événements.

À propos, le plus petit exemple d'utilisation d'un générateur est: (Cela n'a aucun sens d'en faire un générateur car il n'y a qu'une seule tâche ...)

import asyncio
import time

def f():
    time.sleep(5) #Processus chronophage
    yield
    return "hello"

loop = asyncio.get_event_loop()
ret = loop.run_until_complete(f())
print(ret)

D'ailleurs, dans la version qui n'utilise pas le générateur, loop.call_soon () a été appelé et la fonctionf ()a été enregistrée dans la boucle d'événements, mais ceux qui doutaient que cela n'ait pas été appelé dans ce chapitre. Je pense qu'il y en a beaucoup. Plus précisément, c'est comme suit.

Nom de la fonction argument(Version future) argument(Version du générateur)
f() future Aucun
loop.call_soon() f --
loop.run_until_complete() future f

Dans run_until_complete (), si l'argument donné est un objet générateur (obtenu en appelant la fonctionf ()définie comme générateur), alors une instance Task (une sous-classe de Future) Générer. Call_soon () est appelé en interne au moment de cette génération.

<détails>

Code de bibliothèque associé </ summary>

Lib/asyncio/base_events.py


class BaseEventLoop(events.AbstractEventLoop):
    def run_until_complete(self, future):
        future = tasks.ensure_future(future, loop=self)
        future.add_done_callback(_run_until_complete_cb)
        self.run_forever()
        return future.result()

Lib/asyncio/tasks.py


def ensure_future(coro_or_future, loop):
    if isinstance(coro_or_future, types.CoroutineType) or isinstance(coro_or_future, types.GeneratorType):
        task = tasks.Task(coro_or_future, loop=loop)
        return task
    else:
        return coro_or_future
class Task(futures.Future):
    def __init__(self, coro, loop=None):
        super().__init__(loop=loop)
        self._coro = coro
        self._loop = loop
        self._context = contextvars.copy_context()

        loop.call_soon(self.__step, context=self._context)
        _register_task(self)
    
    def __step(self, exc=None):
        coro = self._coro
        self._fut_waiter = None
        try:
            result = coro.send(None)
        except StopIteration as exc:
            super().set_result(exc.value)
        else:
            self._loop.call_soon(self.__step, context=self._context)

Effectuer d'autres tâches pendant le sommeil

Dans les exemples jusqu'à présent, «time.sleep ()» était fortement utilisé. Ceci est, bien sûr, pour illustrer le "traitement chronophage", mais vous voudrez peut-être réellement sleep () pour des raisons pratiques. Par exemple

--Traitement de temporisation qui s'annule après avoir attendu un certain temps en tant que sous-traitement tout en effectuant une communication réseau dans le traitement principal --Afficher la progression dans le sous-processus tout en effectuant des calculs chronophages dans le processus principal

Cependant, dans un tel cas, time.sleep () ne peut pas être utilisé dans le sous-traitement. C'est parce qu'une fois que time.sleep () est exécuté dans le sous-processus, le processus principal ne peut pas être poursuivi pendant le sommeil, jusqu'à ce que time.sleep () se termine. C'est parce que le sous-marin continuera à occuper la boucle d'événements.

Je souhaite attendre un certain temps dans une tâche, mais je souhaite renvoyer le processus à la boucle d'événements pendant le temps d'attente. Dans de tels cas, vous pouvez utiliser la fonction loop.call_later (). Cette fonction exécute la fonction donnée après avoir attendu le nombre de secondes spécifié. Vous pouvez utiliser cette propriété pour implémenter my_sleep () comme suit:

import asyncio
import time

def my_sleep(delay):
    def _cb_set_result(fut):
        fut.set_result(None)
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    h = loop.call_later(delay, _cb_set_result, future)
    yield from future

def f(tag):
    for i in range(3):
        yield from my_sleep(1)
        print("waiting for f(%d)" % tag)
    return "hello %d" % tag

loop = asyncio.get_event_loop()
tasks = [f(n) for n in range(3)]
ret = loop.run_until_complete(asyncio.gather(*tasks))
print(ret)

Ceci est une réécriture du processus dans le chapitre précédent en utilisant my_sleep (). Dans le chapitre précédent, nous avons attendu 3 secondes pour chacun des 3 processus, donc cela a pris un total de 9 secondes. Cependant, ce processus se termine en 3 secondes environ.

Cela peut être un peu plus compliqué. Par exemple, supposons que vous appelez une fonction dans une tâche et que cette fonction essaie de my_sleep (). Dans ce cas, vous pouvez définir la fonction à appeler en tant que générateur comme suit.

def g():
    yield from my_sleep(10)
    return "hello"

def f():
    ret = yield from g()
    return ret

loop = asyncio.get_event_loop()
ret = loop.run_until_complete(asyncio.gather(f()))
print(ret)

Pourquoi «rendement du futur» au lieu de «rendement du futur»?

Vous avez peut-être remarqué que dans le code my_sleep () décrit ci-dessus, la dernière ligne était yield from future. J'ai utilisé yield pour définir la valeur à renvoyer lorsque le générateur __next__ () est appelé. Au contraire, «yield from» a été spécifié lors de la spécification d'un autre itérateur. Pourquoi utilisez-vous yield from pour renvoyer Future, qui n'est pas un itérateur, juste une boîte à laquelle attribuer des résultats?

Pour des raisons techniques, l'instance Future est en fait un itérateur! Future implémente __iter__ (), et cette fonction ressemble à ceci:

class Future:
    #....
    def __iter__(self):
        yield self

Autrement dit, l'itération de my_sleep () ressemble à ceci:

  1. yield from my_sleep (1) est exécuté. 1.> Créez un objet générateur pour my_sleep avec go = my_sleep (1) 2.> Générer un itérateur avec ʻit = go .__ iter__ () (c'est la même chose que go) 3.> res = it .__ next__ ()est exécuté pour obtenir le premier élément demy_sleep4.> L'exécution du contenu demy_sleep ()` démarre.
  2. L'expression sur le côté droit de yield from dans my_sleep () ʻest évaluée et future` est générée.

  3. ʻit_inner = future .__ iter__ () ʻest exécuté.

  4. res_inner = it_inner .__ next__ () est exécuté. C'est la même chose que «futur». 8.> res_inner est la valeur de retour de ʻit .__ next__ (). Autrement dit, res = future`

Une autre raison politique est que nous voulions être capables de gérer le générateur (ou collout) et «Future» dans la même ligne. Ceci est également lié à ʻawait` dans le chapitre suivant.

<détails>

Code de bibliothèque associé </ summary>

Lib/asyncio/tasks.py


class Task(futures.Future):
    def __step(self, exc=None):
        coro = self._coro
        try:
            result = coro.send(None)
        except StopIteration as exc:
            super().set_result(exc.value)
        elif result != None:
            result.add_done_callback(self.__wakeup, context=self._context)
        else:
            self._loop.call_soon(self.__step, context=self._context)
            
    def __wakeup(self, future):
        self.__step()

Utilisez le mot-clé ʻasync ʻawait

Le même code que dans le chapitre précédent peut être écrit comme suit en Python 3.7 ou version ultérieure. (À proprement parler, il y a une légère différence entre le fait que celui utilisé dans ce chapitre est «coroutine» et celui utilisé dans le chapitre précédent est «générateur».)

import asyncio
import time

async def f(tag):
    for i in range(3):
        await asyncio.sleep(1)
        print("waiting for f(%d)" % tag)
    return "hello %d" % tag

loop = asyncio.get_event_loop()
tasks = [f(n) for n in range(3)]
ret = loop.run_until_complete(asyncio.gather(*tasks))
print(ret)

Dans ce format, vous pouvez utiliser ʻasyncio.sleep () au lieu de my_sleep () `.

De plus, si vous n'avez qu'une seule tâche, vous pouvez l'écrire encore plus facilement en utilisant ʻasyncio.run () `.

import asyncio
import time

async def g():
    await asyncio.sleep(10)
    return "hello"

async def f():
    return await g()

asyncio.run(f())

<détails>

Code de bibliothèque associé </ summary>

Lib/asyncio/tasks.py


async def sleep(delay, result=None):
    loop = events.get_running_loop()
    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    return await future

Lib/asyncio/runner.py


def run(main):
    loop = events.new_event_loop()
    return loop.run_until_complete(main)

à la fin

Je rassemblais de tels articles dans le brouillon, mais comme d'autres personnes avaient publié des articles similaires, j'ai également décidé de publier (?) À la hâte.

Recommended Posts