Wenn Sie in Python eine zeitaufwändige Berechnung durchführen und das Ergebnis erhalten möchten, fügen Sie den Prozess normalerweise in eine Funktion ein und erhalten ihn als Rückgabewert, wenn Sie die Funktion ausführen. Dies wird als synchrone Verarbeitung bezeichnet.
Andererseits gibt es eine asynchrone Verarbeitung als ein Konzept, das sich von der synchronen Verarbeitung unterscheidet. Dies tauscht Folgendes zwischen dem Prozess, der eine Berechnung anfordert (Empfänger), und dem Prozess, der die Berechnung tatsächlich ausführt (Absender), über ein Objekt namens "Zukunft" aus.
Die Verarbeitung bis zu diesem Punkt ist beispielsweise wie folgt.
import asyncio
import time
def f(future):
time.sleep(5) #Zeitaufwändiger Prozess
future.set_result("hello")
return
future = asyncio.futures.Future()
f(future)
if future.done():
res = future.result()
print(res)
Wenn ich das mache, sagt es "Hallo", nachdem ich 5 Sekunden gewartet habe.
Lib/asyncio/futures.py
class Future:
_state = _PENDING
_result = None
def done(self):
return self._state != _PENDING
def result(self):
if self._state != _FINISHED:
raise exceptions.InvalidStateError('Result is not ready.')
return self._result
def set_result(self, result):
if self._state != _PENDING:
raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
self._result = result
self._state = _FINISHED
Wie Sie bereits bemerkt haben, entspricht der obige Code einem regulären Funktionsaufruf, außer dass ein "Future" -Objekt verwendet wird. Dies liegt daran, dass der Empfänger den Code des Absenders direkt ausführt. Dies nutzt "Zukunft" nicht aus.
Hier kommt das Konzept der Event-Loops ins Spiel. Eine Ereignisschleife ist ein Objekt mit 0 oder 1 Objekten pro Thread und hat die Funktion, registrierte Funktionen auszuführen.
Lass es uns tatsächlich benutzen.
import asyncio
import time
def f(future):
time.sleep(5) #Zeitaufwändiger Prozess
future.set_result("hello")
return
loop = asyncio.get_event_loop()
future = asyncio.futures.Future()
loop.call_soon(f, future)
loop.run_forever()
Im obigen Code rufen wir "asyncio.get_event_loop" auf, um das "BaseEventLoop" -Objekt abzurufen. Dann wird die Funktion f
von call_soon
in loop
registriert. Schließlich wird die Ereignisschleife mit "loop.run_forever ()" ausgeführt.
Wenn ich das tatsächlich mache, bin ich mit run_forever ()
in einer Endlosschleife und das Programm endet nie. Stattdessen können Sie die Ereignisschleife automatisch stoppen, nachdem die Ausführung der Funktion f ()
abgeschlossen wurde, indem Sie Folgendes schreiben:
res = loop.run_until_complete(future)
print(res)
Wie kann run_until_complete ()
den Abschluss der Funktionf ()
erkennen? Dies verwendet einen Mechanismus, der als "zukünftiger" Rückruf bezeichnet wird.
In "run_until_complete ()" wird zuerst die Funktion "future.add_done_callback ()" ausgeführt, um den Rückruf in "future" zu setzen. Dann wird "run_forever ()" aufgerufen und die Funktion "f ()" ausgeführt. Wenn dann der Wert durch "future.set_result ()" in der Funktion "f ()" gesetzt wird, wird der durch "add_done_callback ()" gesetzte Rückruf aufgerufen. In dem durch "run_until_complete ()" festgelegten Rückruf wird "loop.stop ()" verwendet, um das Ende der Ereignisschleife zu reservieren, sodass die Ereignisschleife nach dem Ende der Ausführung von "f ()" beendet wird. ..
Beachten Sie, dass "future.set_result ()" nicht ausgeführt wird und die Funktion "f ()" nicht sofort beendet wird. Das Ende ist nur reserviert und die Ausführung wird tatsächlich bis zur Rückkehr fortgesetzt.
Lib/asyncio/events.py
import contextvars
class Handle:
def __init__(self, callback, args, loop):
self._context = contextvars.copy_context()
self._loop = loop
self._callback = callback
self._args = args
def _run(self):
self._context.run(self._callback, *self._args)
Lib/asyncio/base_events.py
class BaseEventLoop(events.AbstractEventLoop):
def __init__(self):
self._stopping = False
self._ready = collections.deque()
def _call_soon(self, callback, args, context):
handle = events.Handle(callback, args, self, context)
self._ready.append(handle)
return handle
def _run_once(self):
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
handle._run()
def run_forever(self):
while True:
self._run_once()
if self._stopping:
break
def run_until_complete(self, future):
def _run_until_complete_cb(fut):
self.stop()
future.add_done_callback(_run_until_complete_cb)
self.run_forever()
return future.result()
def stop(self):
self._stopping = True
Lib/asyncio/futures.py
class Future:
def add_done_callback(self, fn):
context = contextvars.copy_context()
self._callbacks.append((fn, context))
def set_result(self, result):
# ...Kürzung
for callback, ctx in self._callbacks[:]:
self._loop.call_soon(callback, self, context=ctx)
Im vorherigen Kapitel wurde die Verarbeitung mithilfe einer Ereignisschleife ausgeführt. Das einzige, was sich geändert hat, war, dass die Funktion "f", die eine zeitaufwändige Verarbeitung durchführt, nicht direkt ausgeführt wurde, sondern über eine Ereignisschleife. Dies ändert nichts an dem, was Sie tun.
Die wahre Natur der Ereignisschleife kommt ins Spiel, wenn mehrere Prozesse ausgeführt werden. Lass es uns tatsächlich tun.
import asyncio
import time
def f(future, tag):
for _ in range(3):
time.sleep(1)
print("waiting for f(%d)" % tag)
future.set_result("hello %d" % tag)
return
loop = asyncio.get_event_loop()
futures = []
for tag in range(3):
future = loop.create_future()
loop.call_soon(f, future, tag)
futures += [future]
res = loop.run_until_complete(asyncio.gather(*futures))
print(res)
Dieser Code registriert drei Prozesse. Wir verwenden auch eine neue Funktion namens "asyncio.gather", um mehrere "Future" in einer zu bündeln. Das Ergebnis dieser Ausführung ist wie folgt.
waiting for f(0)
waiting for f(0)
waiting for f(0)
waiting for f(1)
waiting for f(1)
waiting for f(1)
waiting for f(2)
waiting for f(2)
waiting for f(2)
['hello 0', 'hello 1', 'hello 2']
Beachten Sie, dass, wie Sie aus diesem Ergebnis ersehen können, "f (0)", "f (1)", "f (2)" nicht parallel laufen. Wie Sie dem Quellcode der Bibliothek entnehmen können, werden in loop.run_until_complete ()
die in loop._ready
registrierten Rückrufe nur nacheinander ausgeführt.
Lib/asyncio/tasks.py
class _GatheringFuture(futures.Future):
def __init__(self, children, *, loop=None):
super().__init__(loop=loop)
self._children = children
self._cancel_requested = False
def gather(*coros_or_futures, loop=None, return_exceptions=False):
def _done_callback(fut):
nonlocal nfinished
nfinished += 1
if nfinished == nfuts:
results = []
for fut in children:
res = fut.result()
results.append(res)
outer.set_result(results)
arg_to_fut = {}
children = []
nfuts = 0
nfinished = 0
for arg in coros_or_futures:
nfuts += 1
fut.add_done_callback(_done_callback)
children.append(fut)
outer = _GatheringFuture(children, loop=loop)
return outer
Lassen Sie uns nun den Python-Generator entgleisen und sehen. Ein Generator ist eine "Funktion, die einen Iterator zurückgibt". Durch Ausführen des Generators wird ein Generatorobjekt zurückgegeben. Das Generatorobjekt implementiert die Funktion iter (), die einen Iterator darstellt. Der Generator ist wie folgt implementiert.
def generator():
yield 1
yield 2
yield 3
return "END"
gg = generator().__iter__()
print(gg.__next__())
print(gg.__next__())
print(gg.__next__())
try:
print(gg.__next__())
except StopIteration as e:
print(e.value)
Hier hat "Ausbeute" die Funktion, die Verarbeitung des Inhalts des Generators vorübergehend zu stoppen. Die Generatoren können auch in zwei Schichten gestapelt werden.
def generator2():
yield 1
yield 2
yield 3
return "END"
def generator():
a = yield from generator2()
return a
gg = generator().__iter__()
print(gg.__next__())
print(gg.__next__())
print(gg.__next__())
try:
print(gg.__next__())
except StopIteration as e:
print(e.value)
Beide Ausführungsergebnisse
1
2
3
END
Es wird sein.
Wie im vorherigen Kapitel erwähnt, führen Sie beim Ausführen mehrerer Funktionen mit "loop.run_until_complete" die zweite Funktion aus, nachdem die Ausführung der ersten Funktion abgeschlossen ist, und so weiter. Die Funktionen werden nicht parallel, sondern nacheinander ausgeführt. Wenn Sie hier einen Generator anstelle einer Funktion verwenden, ist dies wie folgt.
import asyncio
import time
def f(tag):
for _ in range(3):
yield
time.sleep(1)
print("waiting for f(%d)" % tag)
return "hello %d" % tag
loop = asyncio.get_event_loop()
tasks = []
for tag in range(3):
task = f(tag)
tasks += [task]
res = loop.run_until_complete(asyncio.gather(*tasks))
print(res)
Hier habe ich in der Funktion f () einen Befehl Yield hinzugefügt und das Berechnungsergebnis als Return anstelle von future.set_result zurückgegeben. Das Argument "Zukunft" wird nicht mehr benötigt und wurde entfernt.
Das Ergebnis dieser Ausführung ist wie folgt.
waiting for f(0)
waiting for f(1)
waiting for f(2)
waiting for f(0)
waiting for f(1)
waiting for f(2)
waiting for f(0)
waiting for f(1)
waiting for f(2)
['hello 0', 'hello 1', 'hello 2']
Im vorherigen Kapitel wurde "f (0)" dreimal angezeigt, dann wurde "f (1)" angezeigt und ... wurde in "f (0)", "f (1)" geändert. , f (2)
werden nun in dieser Reihenfolge angezeigt. Dies liegt daran, dass selbst wenn mehrere Aufgaben in der Ereignisschleife registriert sind, sie alle in einem Thread ausgeführt werden. Außerdem können Ereignisschleifen die Ausführung von Python-Funktionen nicht unterbrechen, sodass sie eine Funktion so lange ausführen müssen, bis die Funktion freiwillig beendet wird, z. B. durch "return".
Wenn Sie dagegen einen Generator verwenden, wird die Ausführung der Funktion durch "Yield" unterbrochen. Da die Verarbeitung zu diesem Zeitpunkt zur Seite der Ereignisschleife zurückkehrt, ist es möglich, die von der Ereignisschleife ausgeführte Aufgabe zu wechseln.
Das kleinste Beispiel für die Verwendung eines Generators ist übrigens: (Es macht keinen Sinn, daraus einen Generator zu machen, da es nur eine Aufgabe gibt ...)
import asyncio
import time
def f():
time.sleep(5) #Zeitaufwändiger Prozess
yield
return "hello"
loop = asyncio.get_event_loop()
ret = loop.run_until_complete(f())
print(ret)
Übrigens wurde in der Version, die den Generator nicht verwendet, "loop.call_soon ()" aufgerufen und die Funktion "f ()" in der Ereignisschleife registriert, aber diejenigen, die daran zweifelten, dass dies in diesem Kapitel nicht aufgerufen wurde. Ich denke es gibt viele. Insbesondere ist es wie folgt.
Funktionsname | Streit(Zukünftige Version) | Streit(Generatorversion) |
---|---|---|
f() |
future |
Keiner |
loop.call_soon() |
f |
-- |
loop.run_until_complete() |
future |
f |
Wenn das angegebene Argument in "run_until_complete ()" ein Generatorobjekt ist (erhalten durch Aufrufen der als Generator definierten Funktion "f ()"), dann eine "Task" -Instanz (eine Unterklasse von "Future"). Generieren. Call_soon ()
wird zum Zeitpunkt dieser Generation intern aufgerufen.
Lib/asyncio/base_events.py
class BaseEventLoop(events.AbstractEventLoop):
def run_until_complete(self, future):
future = tasks.ensure_future(future, loop=self)
future.add_done_callback(_run_until_complete_cb)
self.run_forever()
return future.result()
Lib/asyncio/tasks.py
def ensure_future(coro_or_future, loop):
if isinstance(coro_or_future, types.CoroutineType) or isinstance(coro_or_future, types.GeneratorType):
task = tasks.Task(coro_or_future, loop=loop)
return task
else:
return coro_or_future
class Task(futures.Future):
def __init__(self, coro, loop=None):
super().__init__(loop=loop)
self._coro = coro
self._loop = loop
self._context = contextvars.copy_context()
loop.call_soon(self.__step, context=self._context)
_register_task(self)
def __step(self, exc=None):
coro = self._coro
self._fut_waiter = None
try:
result = coro.send(None)
except StopIteration as exc:
super().set_result(exc.value)
else:
self._loop.call_soon(self.__step, context=self._context)
In den bisherigen Beispielen wurde "time.sleep ()" häufig verwendet. Dies dient natürlich zur Veranschaulichung der "zeitaufwändigen Verarbeitung", aber Sie möchten möglicherweise aus praktischen Gründen tatsächlich "sleep ()" verwenden. Zum Beispiel
In einem solchen Fall kann "time.sleep ()" jedoch nicht in der Unterverarbeitung verwendet werden. Dies liegt daran, dass nach Ausführung von time.sleep ()
im Unterprozess der Hauptprozess im Schlaf nicht fortgesetzt werden kann, bis time.sleep ()
endet. Dies liegt daran, dass das Sub die Ereignisschleife weiterhin belegt.
Ich möchte eine bestimmte Zeit in einer Aufgabe warten, aber ich möchte den Prozess während der Wartezeit in die Ereignisschleife zurückführen. In solchen Fällen können Sie die Funktion loop.call_later ()
verwenden. Diese Funktion führt die angegebene Funktion aus, nachdem auf die angegebene Anzahl von Sekunden gewartet wurde.
Mit dieser Eigenschaft können Sie "my_sleep ()" wie folgt implementieren:
import asyncio
import time
def my_sleep(delay):
def _cb_set_result(fut):
fut.set_result(None)
loop = asyncio.get_running_loop()
future = loop.create_future()
h = loop.call_later(delay, _cb_set_result, future)
yield from future
def f(tag):
for i in range(3):
yield from my_sleep(1)
print("waiting for f(%d)" % tag)
return "hello %d" % tag
loop = asyncio.get_event_loop()
tasks = [f(n) for n in range(3)]
ret = loop.run_until_complete(asyncio.gather(*tasks))
print(ret)
Dies ist eine Umschreibung des Prozesses im vorherigen Kapitel mit my_sleep ()
. Im vorherigen Kapitel haben wir 3 Sekunden auf jeden der 3 Prozesse gewartet, sodass es insgesamt 9 Sekunden dauerte. Dieser Vorgang ist jedoch in ca. 3 Sekunden abgeschlossen.
Es kann etwas komplizierter sein. Angenommen, Sie rufen eine Funktion in einer Aufgabe auf und diese Funktion versucht, "my_sleep ()" auszuführen. In diesem Fall ist es in Ordnung, die als Generator aufzurufende Funktion wie folgt zu definieren.
def g():
yield from my_sleep(10)
return "hello"
def f():
ret = yield from g()
return ret
loop = asyncio.get_event_loop()
ret = loop.run_until_complete(asyncio.gather(f()))
print(ret)
Möglicherweise haben Sie bemerkt, dass im oben beschriebenen Code "my_sleep ()" die letzte Zeile "Ausbeute aus Zukunft" lautete. Ich habe yield
verwendet, um den Wert festzulegen, der zurückgegeben werden soll, wenn der Generator __next__ ()
aufgerufen wird. Im Gegenteil, bei der Angabe eines anderen Iterators wurde "Ausbeute von" angegeben. Warum verwenden Sie "Yield from", um "Future" zurückzugeben, das kein Iterator ist, sondern nur ein Feld, dem Sie Ergebnisse zuweisen können?
Aus technischen Gründen ist die "Future" -Instanz tatsächlich ein Iterator! "Future" implementiert "iter ()", und diese Funktion sieht folgendermaßen aus:
class Future:
#....
def __iter__(self):
yield self
Das heißt, die Iteration von my_sleep ()
sieht folgendermaßen aus:
my_sleep
mit go = my_sleep (1)
2.> Generiere einen Iterator mit it = go .__ iter__ ()
(dies ist dasselbe wie go
)
3.> res = it .__ next__ ()
wird ausgeführt, um das erste Element von my_sleep
zu erhalten
4.> Die Ausführung des Inhalts von my_sleep ()
beginnt.Der Ausdruck auf der rechten Seite von "Ausbeute von" in "my_sleep ()" wird ausgewertet und "Zukunft" generiert.
it_inner = future .__ iter__ ()
wird ausgeführt.
res_inner = it_inner .__ next__ ()
wird ausgeführt. Dies ist das gleiche wie "Zukunft". 8.>res_inner
ist der Rückgabewert vonit .__ next__ ()
. Das heißt, "res = future"
Ein weiterer politischer Grund ist, dass wir in der Lage sein wollten, den Generator (oder das Collout) und "Future" in derselben Linie zu handhaben. Dies hängt auch mit dem "Warten" im nächsten Kapitel zusammen.
Lib/asyncio/tasks.py
class Task(futures.Future):
def __step(self, exc=None):
coro = self._coro
try:
result = coro.send(None)
except StopIteration as exc:
super().set_result(exc.value)
elif result != None:
result.add_done_callback(self.__wakeup, context=self._context)
else:
self._loop.call_soon(self.__step, context=self._context)
def __wakeup(self, future):
self.__step()
async`` await
Der gleiche Code wie im vorherigen Kapitel kann in Python 3.7 oder höher wie folgt geschrieben werden. (Genau genommen gibt es einen kleinen Unterschied, dass die in diesem Kapitel verwendete "Coroutine" und die im vorherigen Kapitel verwendete "Generator" ist.)
import asyncio
import time
async def f(tag):
for i in range(3):
await asyncio.sleep(1)
print("waiting for f(%d)" % tag)
return "hello %d" % tag
loop = asyncio.get_event_loop()
tasks = [f(n) for n in range(3)]
ret = loop.run_until_complete(asyncio.gather(*tasks))
print(ret)
In diesem Format können Sie "asyncio.sleep ()" anstelle von "my_sleep ()" verwenden.
Wenn Sie nur eine Aufgabe haben, können Sie diese mit asyncio.run ()
noch einfacher schreiben.
import asyncio
import time
async def g():
await asyncio.sleep(10)
return "hello"
async def f():
return await g()
asyncio.run(f())
Lib/asyncio/tasks.py
async def sleep(delay, result=None):
loop = events.get_running_loop()
future = loop.create_future()
h = loop.call_later(delay,
futures._set_result_unless_cancelled,
future, result)
return await future
Lib/asyncio/runner.py
def run(main):
loop = events.new_event_loop()
return loop.run_until_complete(main)
Ich habe solche Artikel im Entwurf gesammelt, aber da andere Leute ähnliche Artikel veröffentlicht hatten, habe ich mich auch entschlossen, (?) In Eile zu veröffentlichen.
Recommended Posts