I worked out a practical pattern for asynchronous processing in Python that meets the following requirements.
- I want to run the main processing and the asynchronous processing in parallel.
- I want the main processing to wait, at a specified point, until the asynchronous processing has finished.
- I want to get the return value of the asynchronous processing through a future.
The code is a bit verbose, but I've included all the sources, including loggers, so you can copy and paste it as is.
multiprocessing1.py
from multiprocessing.pool import ThreadPool
import time
import threading as th
import logging
#Get logger
def get_logger():
    """Return the shared "multiprocesssing_test" logger, configuring it on first use.

    The logger passes DEBUG and above to a single ``StreamHandler`` whose
    records are formatted as ``<asctime> - <message>``, and it does not
    propagate to the root logger.

    ``logging.getLogger`` hands back the same object for the same name, so
    the handler is attached only when the logger has none yet. Calling this
    function more than once therefore does not duplicate every log line.

    Returns:
        logging.Logger: the configured logger.
    """
    logger = logging.getLogger("multiprocesssing_test")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False
    if not logger.handlers:
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        ch.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
        logger.addHandler(ch)
    return logger


# Module-level logger shared by the main thread and the worker threads.
logger = get_logger()
def async_func(name, sleep_time):
    """Simulate a slow job running on a pool thread.

    Logs a start line, blocks the calling thread for ``sleep_time`` seconds,
    logs an end line, and returns a label built from the thread id and
    ``name``.

    Args:
        name: label for this job; echoed in both log lines and in the result.
        sleep_time: number of seconds handed to ``time.sleep``.

    Returns:
        str: ``"<thread id>-<name>"``.
    """
    # Identifier of the thread that is executing this call.
    worker_id = th.get_ident()
    log_prefix = f"thread_id:{worker_id} name:{name}"

    logger.info(f"{log_prefix} async_start func")
    time.sleep(sleep_time)
    logger.info(f"{log_prefix} async_end of func")

    return f"{worker_id}-{name}"
if __name__ == "__main__":
    # Pool of worker threads; ``processes`` is the maximum number of jobs
    # that can run at the same time.
    pool = ThreadPool(processes=1)
    try:
        # Id of the main thread, to tell its log lines apart from the worker's.
        thread_id = th.get_ident()

        # Submit the job without blocking. The first argument is the function
        # object, the second is the tuple of positional arguments for it.
        logger.info(f"thread_id:{thread_id}Call asynchronous processing from main")
        future = pool.apply_async(async_func, ("Thread 1", 10))

        # Work done on the main thread while the job runs in the pool.
        logger.info(f"thread_id:{thread_id}main Start processing during asynchronous processing")
        time.sleep(5)
        logger.info(f"thread_id:{thread_id}main End of processing during asynchronous processing")

        # Block until the job has finished and fetch its return value.
        # get() re-raises any exception that was raised inside async_func.
        result = future.get()
        logger.info(f"thread_id:{thread_id}Get the result of asynchronous processing:{result}")
    finally:
        # Always release the pool, even when the job or the main work raised:
        # close() stops new submissions, join() waits for the workers to exit.
        pool.close()
        pool.join()
2020-10-15 16:43:27,073 - thread_id:18440 Call asynchronous processing from main
2020-10-15 16:43:27,074 - thread_id:18440 main Start processing during asynchronous processing
2020-10-15 16:43:27,074 - thread_id:18132 name:Thread 1 async_start func
2020-10-15 16:43:32,074 - thread_id:18440 main End of processing during asynchronous processing
2020-10-15 16:43:37,075 - thread_id:18132 name:Thread 1 async_end of func
2020-10-15 16:43:37,075 - thread_id:18440 Get the result of asynchronous processing:18132-Thread 1
From the log you can see that at 16:43:27 the main thread's "Start processing during asynchronous processing" step and the worker thread's "async_start func" step begin at the same moment, i.e. the two run in parallel.
multiprocessing2.py
from multiprocessing.pool import ThreadPool
import time
import threading as th
import logging
#Get logger
def get_logger():
    """Return the shared "multiprocesssing_test" logger, configuring it on first use.

    The logger passes DEBUG and above to a single ``StreamHandler`` whose
    records are formatted as ``<asctime> - <message>``, and it does not
    propagate to the root logger.

    ``logging.getLogger`` hands back the same object for the same name, so
    the handler is attached only when the logger has none yet. Calling this
    function more than once therefore does not duplicate every log line.

    Returns:
        logging.Logger: the configured logger.
    """
    logger = logging.getLogger("multiprocesssing_test")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False
    if not logger.handlers:
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        ch.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
        logger.addHandler(ch)
    return logger


# Module-level logger shared by the main thread and the worker threads.
logger = get_logger()
def async_func(name, sleep_time):
    """Simulate a slow job running on a pool thread.

    Logs a start line, blocks the calling thread for ``sleep_time`` seconds,
    logs an end line, and returns a label built from the thread id and
    ``name``.

    Args:
        name: label for this job; echoed in both log lines and in the result.
        sleep_time: number of seconds handed to ``time.sleep``.

    Returns:
        str: ``"<thread id>-<name>"``.
    """
    # Identifier of the thread that is executing this call.
    worker_id = th.get_ident()
    log_prefix = f"thread_id:{worker_id} name:{name}"

    logger.info(f"{log_prefix} async_start func")
    time.sleep(sleep_time)
    logger.info(f"{log_prefix} async_end of func")

    return f"{worker_id}-{name}"
if __name__ == "__main__":
    # Pool of worker threads; ``processes`` is the maximum number of jobs
    # that can run at the same time.
    pool = ThreadPool(processes=5)
    try:
        # Id of the main thread, to tell its log lines apart from the workers'.
        thread_id = th.get_ident()

        # Submit five jobs without blocking. The first argument of
        # apply_async is the function object, the second is the tuple of
        # positional arguments passed to async_func.
        logger.info(f"thread_id:{thread_id}Call asynchronous processing from main")
        futures = [
            pool.apply_async(async_func, (f"thread{i + 1}", 10))
            for i in range(5)
        ]

        # Work done on the main thread while the jobs run in the pool.
        logger.info(f"thread_id:{thread_id}main Start processing during asynchronous processing")
        time.sleep(5)
        logger.info(f"thread_id:{thread_id}main End of processing during asynchronous processing")

        # Block until every job has finished and collect the return values in
        # submission order. get() re-raises any exception raised in a job.
        results = [future.get() for future in futures]
        logger.info(f"thread_id:{thread_id}Get the result of asynchronous processing:{results}")
    finally:
        # Always release the pool, even when a job or the main work raised:
        # close() stops new submissions, join() waits for the workers to exit.
        pool.close()
        pool.join()
2020-10-15 16:47:41,977 - thread_id:13448 Call asynchronous processing from main
2020-10-15 16:47:41,978 - thread_id:13448 main Start processing during asynchronous processing
2020-10-15 16:47:41,979 - thread_id:23216 name:Thread 1 async_start func
2020-10-15 16:47:41,979 - thread_id:21744 name:Thread 2 async_start func
2020-10-15 16:47:41,979 - thread_id:21708 name:Thread 3 async_start func
2020-10-15 16:47:41,979 - thread_id:21860 name:Thread 4 async_start func
2020-10-15 16:47:41,979 - thread_id:22100 name:Thread 5 async_start func
2020-10-15 16:47:46,980 - thread_id:13448 main End of processing during asynchronous processing
2020-10-15 16:47:51,982 - thread_id:21744 name:Thread 2 async_end of func
2020-10-15 16:47:51,982 - thread_id:23216 name:Thread 1 async_end of func
2020-10-15 16:47:51,983 - thread_id:21708 name:Thread 3 async_end of func
2020-10-15 16:47:51,984 - thread_id:21860 name:Thread 4 async_end of func
2020-10-15 16:47:51,984 - thread_id:22100 name:Thread 5 async_end of func
2020-10-15 16:47:51,986 - thread_id:13448 Get the result of asynchronous processing:['23216-Thread 1', '21744-Thread 2', '21708-Thread 3', '21860-Thread 4', '22100-Thread 5']
From the log you can see that at 16:47:41 the main thread's "Start processing during asynchronous processing" step and the "async_start func" step of all five worker threads begin at the same moment, i.e. they all run in parallel.
Also, if you reduce the pool size, for example with ThreadPool(processes=3), three jobs start first while the remaining two wait in the queue; each waiting job starts as soon as one of the running jobs completes and frees a worker thread.
asyncio1.py
import asyncio
import itertools
import time
import profile
import random
import time
import threading as th
import logging
#Get logger
def get_logger():
    """Return the shared "asyncio_test" logger, configuring it on first use.

    The logger passes DEBUG and above to a single ``StreamHandler`` whose
    records are formatted as ``<asctime> - <message>``, and it does not
    propagate to the root logger.

    ``logging.getLogger`` hands back the same object for the same name, so
    the handler is attached only when the logger has none yet. Calling this
    function more than once therefore does not duplicate every log line.

    Returns:
        logging.Logger: the configured logger.
    """
    logger = logging.getLogger("asyncio_test")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False
    if not logger.handlers:
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        ch.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
        logger.addHandler(ch)
    return logger


# Module-level logger shared by every coroutine in this script.
logger = get_logger()
# Stand-in for a per-task identifier.
# All asyncio coroutines here report the same thread id, so the thread id
# cannot tell them apart; a simple module-wide counter is used instead.
_id_sequence = itertools.count()
_next_id = _id_sequence.__next__


def get_task_id():
    """Return the next id from the module-wide counter (0, 1, 2, ...)."""
    return next(_id_sequence)
async def async_func(name, sleep_time):
    """Simulate a slow asynchronous job.

    Takes a fresh id from ``get_task_id``, logs a start line, suspends for
    ``sleep_time`` seconds with ``asyncio.sleep``, logs an end line, and
    returns a label built from the id and ``name``.

    Args:
        name: label for this job; echoed in both log lines and in the result.
        sleep_time: number of seconds handed to ``asyncio.sleep``.

    Returns:
        str: ``"<task id>-<name>"``.
    """
    job_id = get_task_id()
    log_prefix = f"task_id:{job_id} name:{name}"

    logger.info(f"{log_prefix} async_start func")
    # Suspends only this coroutine while it waits.
    await asyncio.sleep(sleep_time)
    logger.info(f"{log_prefix} async_end of func")

    return f"{job_id}-{name}"
async def async_func_caller():
    """Start five ``async_func`` jobs, do other work, then collect their results.

    Returns:
        list: the values returned by the five jobs, in the order the jobs
        were created.
    """
    caller_id = get_task_id()

    # ensure_future() wraps each coroutine in a task and schedules it.
    # The jobs do not start inside this loop; they get their first chance to
    # run when this coroutine suspends at the next ``await`` below.
    pending = []
    for index in range(1, 6):
        pending.append(asyncio.ensure_future(async_func(f"task{index}", 10)))

    # Work done by the caller while the scheduled jobs are in progress.
    logger.info(f"task_id:{caller_id} async_func_caller Start processing during asynchronous processing execution")
    await asyncio.sleep(5)
    logger.info(f"task_id:{caller_id} async_func_caller End of processing during asynchronous processing execution")

    # Wait for every job to finish and hand back their return values.
    return await asyncio.gather(*pending)
if __name__ == "__main__":
    # Create the event loop that will drive the coroutines.
    # new_event_loop() is used instead of get_event_loop(): calling
    # get_event_loop() when no loop is running is deprecated and fails on
    # recent Python versions.
    loop = asyncio.new_event_loop()
    try:
        logger.info("main Start processing during asynchronous processing")
        # Run the caller coroutine and block until it has finished.
        ret = loop.run_until_complete(async_func_caller())
        logger.info(f"main End of processing during asynchronous processing Result:{ret}")
    finally:
        # Release the loop's resources even when the coroutine raised.
        loop.close()
2020-10-15 16:49:40,132 -main Start processing during asynchronous processing
2020-10-15 16:49:40,134 - task_id:0 async_func_caller Start processing during asynchronous processing execution
2020-10-15 16:49:40,134 - task_id:1 name:task1 async_start func
2020-10-15 16:49:40,135 - task_id:2 name:task2 async_start func
2020-10-15 16:49:40,135 - task_id:3 name:task3 async_start func
2020-10-15 16:49:40,136 - task_id:4 name:task4 async_start func
2020-10-15 16:49:40,136 - task_id:5 name:task5 async_start func
2020-10-15 16:49:45,138 - task_id:0 async_func_caller End of processing during asynchronous processing execution
2020-10-15 16:49:50,141 - task_id:2 name:task2 async_end of func
2020-10-15 16:49:50,142 - task_id:5 name:task5 async_end of func
2020-10-15 16:49:50,142 - task_id:4 name:task4 async_end of func
2020-10-15 16:49:50,144 - task_id:1 name:task1 async_end of func
2020-10-15 16:49:50,144 - task_id:3 name:task3 async_end of func
2020-10-15 16:49:50,145 -main End of processing during asynchronous processing Result:['1-task1', '2-task2', '3-task3', '4-task4', '5-task5']
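From the log you can see that at 16:49:40 the caller's "Start processing during asynchronous processing execution" step and the "async_start func" step of all five tasks begin at the same moment. Unlike the ThreadPool examples, the thread ID is the same for every task, so they are interleaved concurrently rather than running in separate threads.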
We hope this will be helpful when implementing asynchronous processing.
Recommended Posts