Python asynchronous processing ~ Full understanding of async and await ~

About Future

When you perform a time-consuming calculation in Python and want to get the result, you usually put the process into a function and get it as the return value when you execute the function. This is called synchronous processing.

On the other hand, there is asynchronous processing, a different concept from synchronous processing. Here, the process that requests the calculation (the receiver) and the process that performs the actual calculation (the sender) communicate via an object called a Future:

- (receiver) Create a Future object.
- (receiver) Start the sender by some means.
- (sender) Perform the time-consuming calculation and write the result into the Future object created by the receiver.
- (receiver) Check the Future object; if the calculation result has been stored, retrieve it.

The processing up to this point is as follows, for example.

import asyncio
import time

def f(future):
    time.sleep(5) #Time-consuming process
    future.set_result("hello")
    return

future = asyncio.futures.Future()
f(future)

if future.done():
    res = future.result()
    print(res)

Running this prints "hello" after a 5-second wait.

For reference, the relevant `Future` code is as follows. (Partially omitted)

Lib/asyncio/futures.py


class Future:
    _state = _PENDING
    _result = None

    def done(self):
        return self._state != _PENDING

    def result(self):
        if self._state != _FINISHED:
            raise exceptions.InvalidStateError('Result is not ready.')
        return self._result

    def set_result(self, result):
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        self._result = result
        self._state = _FINISHED
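As a quick check of this state machine, the following sketch exercises `done()`, `result()`, and `set_result()` on a real asyncio Future (created via `create_future()` inside a running loop, as modern Python expects; the `main` helper is only for illustration):

```python
import asyncio

async def main():
    fut = asyncio.get_running_loop().create_future()
    before = fut.done()              # False: the state is still PENDING
    try:
        fut.result()                 # result() before set_result() raises
        err = None
    except asyncio.InvalidStateError:
        err = "not ready"
    fut.set_result("hello")
    return before, err, fut.done(), fut.result()

state = asyncio.run(main())
print(state)   # (False, 'not ready', True, 'hello')
```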

Event loop

As you may have noticed, the code above behaves the same as a normal function call, except that it passes the result through a Future object. This is because the receiver executes the sender's code directly, which does not take advantage of the Future at all.

This is where the concept of an event loop comes into play. An event loop is an object of which each thread has at most one, and its job is to execute the functions registered with it.

Let's actually use it.

import asyncio
import time

def f(future):
    time.sleep(5) #Time-consuming process
    future.set_result("hello")
    return

loop = asyncio.get_event_loop()
future = asyncio.futures.Future()
loop.call_soon(f, future)
loop.run_forever()

In the code above, we call `asyncio.get_event_loop()` to obtain a `BaseEventLoop` object. Then the function `f` is registered in `loop` with `call_soon()`. Finally, the event loop is started with `loop.run_forever()`.

If you actually run this, `run_forever()` never returns and the program never ends. Instead, you can have the event loop stop automatically once the function `f()` has finished by writing:

res = loop.run_until_complete(future)
print(res)

How does `run_until_complete()` know that the function `f()` has completed? A mechanism called a future callback is used for this. `run_until_complete()` first calls `future.add_done_callback()` to register a callback on `future`. It then calls `run_forever()`, which executes the function `f()`. When `f()` sets a value with `future.set_result()`, the callback registered by `add_done_callback()` is scheduled. The callback that `run_until_complete()` registers calls `loop.stop()`, which reserves termination of the event loop, so the loop stops once the execution of `f()` has ended.

Note that the function `f()` is not terminated the moment `future.set_result()` is executed. Only the end of the loop is reserved; execution of `f()` actually continues until its `return`.
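This can be observed directly. In the sketch below (a toy illustration; the string returned by `f` is arbitrary), the function keeps running after `set_result()` and only ends at `return`:

```python
import asyncio

def f(future):
    future.set_result("hello")
    # Execution continues past set_result(); the function only ends at return.
    return "f kept running after set_result"

async def main():
    fut = asyncio.get_running_loop().create_future()
    tail = f(fut)
    return tail, fut.result()

tail, res = asyncio.run(main())
print(tail, "/", res)
```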

The relevant library code is shown below.

Lib/asyncio/events.py


import contextvars
class Handle:
    def __init__(self, callback, args, loop, context=None):
        if context is None:
            context = contextvars.copy_context()
        self._context = context
        self._cancelled = False
        self._loop = loop
        self._callback = callback
        self._args = args

    def _run(self):
        self._context.run(self._callback, *self._args)

Lib/asyncio/base_events.py


class BaseEventLoop(events.AbstractEventLoop):
    def __init__(self):
        self._stopping = False
        self._ready = collections.deque()
        
    def _call_soon(self, callback, args, context):
        handle = events.Handle(callback, args, self, context)
        self._ready.append(handle)
        return handle
    
    def _run_once(self):
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            handle._run()
        
    def run_forever(self):
        while True:
            self._run_once()
            if self._stopping:
                break

    def run_until_complete(self, future):
        def _run_until_complete_cb(fut):
            self.stop()
        future.add_done_callback(_run_until_complete_cb)
        self.run_forever()
        return future.result()

    def stop(self):
        self._stopping = True

Lib/asyncio/futures.py


class Future:
    def add_done_callback(self, fn):
        context = contextvars.copy_context()
        self._callbacks.append((fn, context))

    def set_result(self, result):
        # ...abridgement
        for callback, ctx in self._callbacks[:]:
            self._loop.call_soon(callback, self, context=ctx)
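The fact that `set_result()` only *schedules* the done callbacks with `call_soon()` (rather than invoking them immediately) can be observed with this sketch; `await asyncio.sleep(0)` is used here only to give the loop one cycle to run its ready queue:

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    order = []
    fut.add_done_callback(lambda f: order.append("callback: " + f.result()))
    fut.set_result("hello")
    order.append("set_result returned")   # runs before the callback does
    await asyncio.sleep(0)                # let the loop run the scheduled callback
    return order

order = asyncio.run(main())
print(order)   # ['set_result returned', 'callback: hello']
```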

Execution of multiple processes using an event loop

In the previous chapter, we executed processing using an event loop. However, the only change was that the time-consuming function `f` was executed via the event loop instead of being called directly; what is actually done has not changed.

The true nature of the event loop comes into play when you perform multiple processes. Let's actually do it.

import asyncio
import time

def f(future, tag):
    for _ in range(3):
        time.sleep(1)
        print("waiting for f(%d)" % tag)
    future.set_result("hello %d" % tag)
    return

loop = asyncio.get_event_loop()
futures = []
for tag in range(3):
    future = loop.create_future()
    loop.call_soon(f, future, tag)
    futures += [future]
res = loop.run_until_complete(asyncio.gather(*futures))
print(res)

This code registers three processes. We also use a new function, `asyncio.gather`, to bundle multiple `Future`s into one. The result of this execution is as follows.

waiting for f(0)
waiting for f(0)
waiting for f(0)
waiting for f(1)
waiting for f(1)
waiting for f(1)
waiting for f(2)
waiting for f(2)
waiting for f(2)
['hello 0', 'hello 1', 'hello 2']

Note that, as you can see from this result, `f(0)`, `f(1)`, and `f(2)` are not running in parallel. As the library source code shows, `loop.run_until_complete()` simply executes the callbacks registered in `loop._ready` one after another.

The relevant library code is shown below.

Lib/asyncio/tasks.py


class _GatheringFuture(futures.Future):
    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children
        self._cancel_requested = False

def gather(*coros_or_futures, loop=None, return_exceptions=False):
    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1
        if nfinished == nfuts:
            results = []
            for fut in children:
                res = fut.result()
                results.append(res)
            outer.set_result(results)
    arg_to_fut = {}
    children = []
    nfuts = 0
    nfinished = 0
    for arg in coros_or_futures:
        fut = ensure_future(arg, loop=loop)
        nfuts += 1
        fut.add_done_callback(_done_callback)
        children.append(fut)
    outer = _GatheringFuture(children, loop=loop)
    return outer
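For reference, here is how `gather` behaves with modern coroutines (a hedged sketch; the examples in this article pass Futures instead, and `work` is a made-up helper). Note that the results come back in argument order, not completion order:

```python
import asyncio

async def work(n):
    await asyncio.sleep(0.01 * n)     # tasks finish at different times
    return n * n

async def main():
    # gather preserves argument order in its result list,
    # regardless of which task completes first
    return await asyncio.gather(work(3), work(1), work(2))

res = asyncio.run(main())
print(res)   # [9, 1, 4]
```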

Generator

Let's now take a short detour and look at Python generators. A generator is a "function that returns an iterator": calling a generator function returns a generator object, which implements the iterator method `__iter__()` (along with `__next__()`). A generator is written as follows.

def generator():
    yield 1
    yield 2
    yield 3
    return "END"

gg = generator().__iter__()
print(gg.__next__())
print(gg.__next__())
print(gg.__next__())
try:
    print(gg.__next__())
except StopIteration as e:
    print(e.value)

Here, `yield` suspends execution of the generator body. Generators can also be stacked in two layers.

def generator2():
    yield 1
    yield 2
    yield 3
    return "END"

def generator():
    a = yield from generator2()
    return a

gg = generator().__iter__()
print(gg.__next__())
print(gg.__next__())
print(gg.__next__())
try:
    print(gg.__next__())
except StopIteration as e:
    print(e.value)

Both versions produce the following output:

1
2
3
END

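One more generator feature matters later: `send()`, which is exactly how the event loop will drive tasks (`coro.send(None)` in `Task.__step`). A short sketch with a made-up generator `gen`:

```python
def gen():
    x = yield "first"          # send() delivers a value into the generator
    yield "got %s" % x
    return "done"

out = []
g = gen()
out.append(g.send(None))       # advance to the first yield
out.append(g.send(42))         # resume; x becomes 42
try:
    g.send(None)
except StopIteration as e:
    out.append(e.value)        # the return value rides on StopIteration.value
print(out)   # ['first', 'got 42', 'done']
```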

Execution by generator event loop

As mentioned in the previous chapter, when executing multiple functions with `loop.run_until_complete`, the second function runs only after the first has finished, and so on: the functions are executed sequentially, not in parallel. Now, if we use generators instead of plain functions, it looks like this.

import asyncio
import time

def f(tag):
    for _ in range(3):
        yield
        time.sleep(1)
        print("waiting for f(%d)" % tag)
    return "hello %d" % tag

loop = asyncio.get_event_loop()
tasks = []
for tag in range(3):
    task = f(tag)
    tasks += [task]
res = loop.run_until_complete(asyncio.gather(*tasks))
print(res)

Here, I added a `yield` statement inside the function `f()` and returned the calculation result with `return` instead of `future.set_result()`. The `future` argument is no longer needed and has been removed.

The result of this execution is as follows.

waiting for f(0)
waiting for f(1)
waiting for f(2)
waiting for f(0)
waiting for f(1)
waiting for f(2)
waiting for f(0)
waiting for f(1)
waiting for f(2)
['hello 0', 'hello 1', 'hello 2']

In the previous chapter, `f(0)` was displayed three times, then `f(1)` three times, and so on; now `f(0)`, `f(1)`, `f(2)` are displayed in rotation. Even when multiple tasks are registered in the event loop, they all execute within a single thread. Moreover, the event loop cannot pause a running Python function; it has to keep executing one function until that function voluntarily gives up control, for example by `return`.

On the other hand, if you use a generator, yield will pause the execution of the function. Since the processing returns to the event loop side at this timing, the task executed by the event loop can be switched.
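This switching mechanism can be illustrated without asyncio at all. The toy round-robin scheduler below (a deliberate simplification, not asyncio's actual loop; `task` and `round_robin` are made-up names) interleaves the tasks exactly like the output above:

```python
def task(tag, steps):
    for i in range(steps):
        yield                      # hand control back to the scheduler
        print("waiting for f(%d)" % tag)
    return "hello %d" % tag

def round_robin(gens):
    """Toy event loop: cycle through the generators until all finish."""
    results = {}
    ready = list(enumerate(gens))
    while ready:
        still_running = []
        for tag, g in ready:
            try:
                next(g)            # run until the next yield
                still_running.append((tag, g))
            except StopIteration as e:
                results[tag] = e.value   # return value of the generator
        ready = still_running
    return [results[tag] for tag in sorted(results)]

res = round_robin([task(n, 3) for n in range(3)])
print(res)   # ['hello 0', 'hello 1', 'hello 2']
```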

By the way, the smallest example of using a generator is the following. (With only one task, there is admittedly no point in making it a generator...)

import asyncio
import time

def f():
    time.sleep(5) #Time-consuming process
    yield
    return "hello"

loop = asyncio.get_event_loop()
ret = loop.run_until_complete(f())
print(ret)

By the way, in the version that did not use a generator, the function `f()` was registered in the event loop by calling `loop.call_soon()`; you may be wondering why it is not called in this chapter. Specifically, the arguments compare as follows.

Function                    Argument (Future version)    Argument (generator version)
f()                         future                       None
loop.call_soon()            f                            --
loop.run_until_complete()   future                       f

Within `run_until_complete()`, if the given argument is a generator object (obtained by calling the function `f()` defined as a generator), a `Task` instance (a subclass of `Future`) is created. `call_soon()` is called internally when this Task is created.
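The wrapping can also be observed with today's public API (a hedged sketch using a coroutine rather than a plain generator, since recent Python versions no longer accept bare generators here; `coro` is a made-up name):

```python
import asyncio

async def coro():
    return "hi"

async def main():
    t = asyncio.ensure_future(coro())   # wraps the coroutine in a Task
    kinds = (isinstance(t, asyncio.Task), isinstance(t, asyncio.Future))
    return kinds, await t

kinds, res = asyncio.run(main())
print(kinds, res)   # (True, True) hi
```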

Related library code

Lib/asyncio/base_events.py


class BaseEventLoop(events.AbstractEventLoop):
    def run_until_complete(self, future):
        future = tasks.ensure_future(future, loop=self)
        future.add_done_callback(_run_until_complete_cb)
        self.run_forever()
        return future.result()

Lib/asyncio/tasks.py


def ensure_future(coro_or_future, loop):
    if isinstance(coro_or_future, (types.CoroutineType, types.GeneratorType)):
        task = Task(coro_or_future, loop=loop)
        return task
    else:
        return coro_or_future

class Task(futures.Future):
    def __init__(self, coro, loop=None):
        super().__init__(loop=loop)
        self._coro = coro
        self._loop = loop
        self._context = contextvars.copy_context()

        loop.call_soon(self.__step, context=self._context)
        _register_task(self)
    
    def __step(self, exc=None):
        coro = self._coro
        self._fut_waiter = None
        try:
            result = coro.send(None)
        except StopIteration as exc:
            super().set_result(exc.value)
        else:
            self._loop.call_soon(self.__step, context=self._context)

Perform other tasks during sleep

In the examples so far, `time.sleep()` was used heavily. That was, of course, to simulate "time-consuming processing", but you may also want to sleep for practical purposes. For example:

- Timeout processing: while the main task performs network communication, a sub-task waits a fixed period of time and then cancels it.
- Progress display: while the main task performs a time-consuming calculation, a sub-task reports progress.

In such cases, however, `time.sleep()` cannot be used in the sub-task: once the sub-task executes `time.sleep()`, it occupies the event loop until the sleep ends, so the main task cannot make any progress in the meantime.

We want to wait for a certain period inside a task while returning control to the event loop during the wait. In such cases you can use `loop.call_later()`, which executes a given function after a specified number of seconds. Using this property, `my_sleep()` can be implemented as follows:

import asyncio
import time

def my_sleep(delay):
    def _cb_set_result(fut):
        fut.set_result(None)
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    h = loop.call_later(delay, _cb_set_result, future)
    yield from future

def f(tag):
    for i in range(3):
        yield from my_sleep(1)
        print("waiting for f(%d)" % tag)
    return "hello %d" % tag

loop = asyncio.get_event_loop()
tasks = [f(n) for n in range(3)]
ret = loop.run_until_complete(asyncio.gather(*tasks))
print(ret)

This is the code from the previous chapter rewritten to use `my_sleep()`. Previously, each of the 3 tasks waited 3 seconds in turn, for a total of about 9 seconds. This version finishes in about 3 seconds.
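The same timing effect can be verified with scaled-down delays, using `asyncio.sleep()` (introduced in a later chapter) in place of `my_sleep()`; a hedged sketch with a made-up `waiter` coroutine:

```python
import asyncio
import time

async def waiter(delay):
    await asyncio.sleep(delay)   # yields to the loop instead of blocking it

async def main():
    start = time.monotonic()
    # three 0.2-second waits run concurrently, not back to back
    await asyncio.gather(*(waiter(0.2) for _ in range(3)))
    return time.monotonic() - start

elapsed = asyncio.run(main())
print("elapsed: %.2fs" % elapsed)   # roughly 0.2s, not 0.6s
```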

It can get a little more complicated. For example, suppose a task calls a function, and that function wants to `my_sleep()`. In this case, it is enough to define the called function as a generator as well:

def g():
    yield from my_sleep(10)
    return "hello"

def f():
    ret = yield from g()
    return ret

loop = asyncio.get_event_loop()
ret = loop.run_until_complete(asyncio.gather(f()))
print(ret)

Why yield from future instead of yield future?

You may have noticed that in the `my_sleep()` code above, the last line is `yield from future`. So far, `yield` was used to produce the value returned when the generator's `__next__()` is called, while `yield from` was used to delegate to another iterator. Why use `yield from` on a `Future`, which is not an iterator but just a box to put results in?

For technical reasons, a Future instance actually is an iterator! `Future` implements `__iter__()`, and this method looks like this:

class Future:
    #....
    def __iter__(self):
        yield self

That is, the iteration of `my_sleep()` proceeds as follows:

1. `yield from my_sleep(1)` is executed.
2. A generator object for `my_sleep` is created: `go = my_sleep(1)`.
3. An iterator is obtained with `it = go.__iter__()` (this is the same object as `go`).
4. `res = it.__next__()` is executed to get the first element of `my_sleep`.
5. Execution of the body of `my_sleep()` starts.
6. The expression on the right-hand side of `yield from` in `my_sleep()` is evaluated, creating `future`.
7. `it_inner = future.__iter__()` is executed.
8. `res_inner = it_inner.__next__()` is executed; the value obtained is `future` itself.
9. `res_inner` becomes the return value of `it.__next__()`; that is, `res = future`.
Another, more practical reason is the desire to handle generators (or coroutines) and `Future`s on the same line. This also connects to `await` in the next chapter.
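The trick can be reproduced in miniature. The `ToyFuture` class below is hypothetical, mirroring the `__iter__()` snippet above rather than asyncio's real class; `waiter` stands in for `my_sleep`:

```python
class ToyFuture:
    """Minimal stand-in mirroring Future.__iter__ above (not asyncio's class)."""
    _result = None

    def set_result(self, result):
        self._result = result

    def __iter__(self):
        yield self            # first next(): surface the future itself
        return self._result   # resumed later: deliver the stored result

def waiter(fut):
    res = yield from fut      # same shape as the last line of my_sleep()
    return res

fut = ToyFuture()
g = waiter(fut)
first = next(g)               # the "event loop" receives the future object
fut.set_result("hello")
try:
    next(g)
except StopIteration as e:
    final = e.value           # waiter's return value

print(first is fut, final)    # True hello
```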

Related library code

Lib/asyncio/tasks.py


class Task(futures.Future):
    def __step(self, exc=None):
        coro = self._coro
        try:
            result = coro.send(None)
        except StopIteration as exc:
            super().set_result(exc.value)
        else:
            if result is not None:
                result.add_done_callback(self.__wakeup, context=self._context)
            else:
                self._loop.call_soon(self.__step, context=self._context)
            
    def __wakeup(self, future):
        self.__step()

Using the async and await keywords

The same code as in the previous chapter can be written as follows in Python 3.7 or later. (Strictly speaking, there are some differences: this chapter uses coroutines, while the previous chapter used generators.)

import asyncio
import time

async def f(tag):
    for i in range(3):
        await asyncio.sleep(1)
        print("waiting for f(%d)" % tag)
    return "hello %d" % tag

loop = asyncio.get_event_loop()
tasks = [f(n) for n in range(3)]
ret = loop.run_until_complete(asyncio.gather(*tasks))
print(ret)

In this format, you can use `asyncio.sleep()` instead of `my_sleep()`.

Also, if you have only one task, you can write it even more simply using `asyncio.run()`:

import asyncio
import time

async def g():
    await asyncio.sleep(10)
    return "hello"

async def f():
    return await g()

asyncio.run(f())

Related library code

Lib/asyncio/tasks.py


async def sleep(delay, result=None):
    loop = events.get_running_loop()
    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    return await future

Lib/asyncio/runner.py


def run(main):
    loop = events.new_event_loop()
    return loop.run_until_complete(main)

In closing

I had been keeping this article as a draft, but since others had published similar articles, I decided to hurry up and publish mine as well.
