J'ai envisagé une pratique de traitement asynchrone python qui répond aux exigences suivantes.
Le code est un peu redondant, mais toutes les sources, y compris les enregistreurs, sont répertoriées afin que vous puissiez le copier et le coller tel quel.
multiprocessing1.py
from multiprocessing.pool import ThreadPool
import time
import threading as th
import logging
#Obtenez un enregistreur
def get_logger():
logger = logging.getLogger("multiprocesssing_test")
logger.setLevel(logging.DEBUG)
logger.propagate = False
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch_formatter = logging.Formatter('%(asctime)s - %(message)s')
ch.setFormatter(ch_formatter)
logger.addHandler(ch)
return logger
logger = get_logger()
def async_func(name, sleep_time):
#Obtenir l'identifiant du fil
thread_id = th.get_ident()
logger.info(f"thread_id:{thread_id} name:{name} async_démarrer func")
time.sleep(sleep_time)
logger.info(f"thread_id:{thread_id} name:{name} async_fin de func")
return f"{thread_id}-{name}"
if __name__ == "__main__":
#Créer un pool de threads pour l'exécution des threads
#Nombre maximal de threads simultanés dans les processus
pool = ThreadPool(processes=1)
#Obtenir l'identifiant du fil
thread_id = th.get_ident()
#Exécutez un traitement asynchrone. Spécifiez l'objet fonction comme premier argument et l'argument comme deuxième argument.
logger.info(f"thread_id:{thread_id}Appeler le traitement asynchrone à partir de main")
future = pool.apply_async(async_func, ("Fil 1", 10))
#Traitement que vous souhaitez exécuter dans le thread principal en parallèle avec le traitement asynchrone
logger.info(f"thread_id:{thread_id}main Démarrer le traitement pendant le traitement asynchrone")
time.sleep(5)
logger.info(f"thread_id:{thread_id}main Fin du traitement pendant le traitement asynchrone")
#Attendez que le processus asynchrone se termine et obtenez le résultat.
result = future.get()
logger.info(f"thread_id:{thread_id}Obtenez le résultat du traitement asynchrone:{result}")
pool.close()
2020-10-15 16:43:27,073 - thread_id:18440 Appel du traitement asynchrone depuis le
2020-10-15 16:43:27,074 - thread_id:18440 main Démarrer le traitement pendant le traitement asynchrone
2020-10-15 16:43:27,074 - thread_id:18132 name:Thread 1 asynchrone_démarrer func
2020-10-15 16:43:32,074 - thread_id:18440 main Fin de traitement pendant le traitement asynchrone
2020-10-15 16:43:37,075 - thread_id:18132 name:Thread 1 asynchrone_fin de func
2020-10-15 16:43:37,075 - thread_id:18440 Obtenir le résultat du traitement asynchrone:18132-Fil 1
Depuis le journal, vous pouvez voir qu'à 16:43:27, les processus du "processus asynchrone principal en cours d'exécution" et "async_func start" sont exécutés en parallèle en même temps.
multiprocessing2.py
from multiprocessing.pool import ThreadPool
import time
import threading as th
import logging
#Obtenez un enregistreur
def get_logger():
logger = logging.getLogger("multiprocesssing_test")
logger.setLevel(logging.DEBUG)
logger.propagate = False
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch_formatter = logging.Formatter('%(asctime)s - %(message)s')
ch.setFormatter(ch_formatter)
logger.addHandler(ch)
return logger
logger = get_logger()
def async_func(name, sleep_time):
#Obtenir l'identifiant du fil
thread_id = th.get_ident()
logger.info(f"thread_id:{thread_id} name:{name} async_démarrer func")
time.sleep(sleep_time)
logger.info(f"thread_id:{thread_id} name:{name} async_fin de func")
return f"{thread_id}-{name}"
if __name__ == "__main__":
#Créer un pool de threads pour l'exécution des threads
#Nombre maximal de threads simultanés dans les processus
pool = ThreadPool(processes=5)
#Obtenir l'identifiant du fil
thread_id = th.get_ident()
#Exécutez un traitement asynchrone. Spécifiez l'objet fonction comme premier argument et l'argument comme deuxième argument.
logger.info(f"thread_id:{thread_id}Appeler le traitement asynchrone à partir de main")
futures = []
for i in range(5):
future = pool.apply_async(async_func, (f"fil{i + 1}", 10)) # Tuple of args for foo
futures.append(future)
#Traitement que vous souhaitez exécuter dans le thread principal en parallèle avec le traitement asynchrone
logger.info(f"thread_id:{thread_id}main Démarrer le traitement pendant le traitement asynchrone")
time.sleep(5)
logger.info(f"thread_id:{thread_id}main Fin du traitement pendant le traitement asynchrone")
#Attendez que le processus asynchrone se termine et obtenez le résultat.
results = [future.get() for future in futures]
logger.info(f"thread_id:{thread_id}Obtenez le résultat du traitement asynchrone:{results}")
pool.close()
2020-10-15 16:47:41,977 - thread_id:13448 Appeler le traitement asynchrone depuis le principal
2020-10-15 16:47:41,978 - thread_id:13448 principal Démarrer le traitement pendant le traitement asynchrone
2020-10-15 16:47:41,979 - thread_id:23216 name:Thread 1 asynchrone_démarrer func
2020-10-15 16:47:41,979 - thread_id:21744 name:Thread 2 asynchrone_démarrer func
2020-10-15 16:47:41,979 - thread_id:21708 name:Thread 3 asynchrone_démarrer func
2020-10-15 16:47:41,979 - thread_id:21860 name:Thread 4 asynchrone_démarrer func
2020-10-15 16:47:41,979 - thread_id:22100 name:Thread 5 asynchrone_démarrer func
2020-10-15 16:47:46,980 - thread_id:13448 main Fin du traitement pendant le traitement asynchrone
2020-10-15 16:47:51,982 - thread_id:21744 name:Thread 2 asynchrone_fin de func
2020-10-15 16:47:51,982 - thread_id:23216 name:Thread 1 asynchrone_fin de func
2020-10-15 16:47:51,983 - thread_id:21708 name:Thread 3 asynchrone_fin de func
2020-10-15 16:47:51,984 - thread_id:21860 name:Thread 4 asynchrone_fin de func
2020-10-15 16:47:51,984 - thread_id:22100 name:Thread 5 asynchrone_fin de func
2020-10-15 16:47:51,986 - thread_id:13448 Obtenir le résultat du traitement asynchrone:['23216-Fil 1', '21744-Fil 2', '21708-Fil 3', '21860-Su
Rouge 4', '22100-Fil 5']
Depuis le journal, vous pouvez voir qu'à 16:47:41, cinq processus, "processus asynchrone principal en cours" et "async_func start", sont exécutés en parallèle en même temps.
De plus, si vous réduisez le nombre de processus en utilisant ThreadPool (processus = 3)
etc., 3 threads seront exécutés en premier, 2 seront en état d'attente et un nouveau thread sera exécuté une fois terminé.
asyncio1.py
import asyncio
import itertools
import time
import profile
import random
import time
import threading as th
import logging
#Obtenez un enregistreur
def get_logger():
logger = logging.getLogger("asyncio_test")
logger.setLevel(logging.DEBUG)
logger.propagate = False
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch_formatter = logging.Formatter('%(asctime)s - %(message)s')
ch.setFormatter(ch_formatter)
logger.addHandler(ch)
return logger
logger = get_logger()
#Obtenez quelque chose comme l'identifiant de tâche
#* Depuis asyncio utilise un générateur en interne
#L'ID de thread sera le même et la méthode d'acquisition de l'ID correspondant au traitement asynchrone sera la suivante.
_next_id = itertools.count().__next__
def get_task_id():
return _next_id()
async def async_func(name, sleep_time):
#Obtenir l'identifiant de la tâche
task_id = get_task_id()
logger.info(f"task_id:{task_id} name:{name} async_démarrer func")
await asyncio.sleep(sleep_time)
logger.info(f"task_id:{task_id} name:{name} async_fin de func")
return f"{task_id}-{name}"
async def async_func_caller():
#Obtenir l'identifiant de la tâche
task_id = get_task_id()
#Générer une tâche de traitement asynchrone
#* À ce stade, la tâche est uniquement générée et non exécutée.
# loop.run_until_Exécuté lors de l'appel terminé.
futures = [asyncio.ensure_future(async_func(f"task{i + 1}", 10)) for i in range(5)]
#Traitement que vous souhaitez exécuter dans le thread principal en parallèle avec le traitement asynchrone
logger.info(f"task_id:{task_id} async_func_appelant Démarrer le traitement pendant le traitement asynchrone")
await asyncio.sleep(5)
logger.info(f"task_id:{task_id} async_func_appelant Fin du traitement pendant le traitement asynchrone")
#Attendez que le processus asynchrone se termine et obtenez le résultat.
results = await asyncio.gather(*futures)
return results
if __name__ == "__main__":
#Créer un pool de threads pour l'exécution du traitement asynchrone
loop = asyncio.get_event_loop()
logger.info(f"main Démarrer le traitement pendant le traitement asynchrone")
#Exécutez le traitement asynchrone et attendez la fin
ret = loop.run_until_complete(async_func_caller())
logger.info(f"main Fin du traitement pendant le traitement asynchrone Résultat:{ret}")
loop.close()
2020-10-15 16:49:40,132 -main Démarrer le traitement pendant le traitement asynchrone
2020-10-15 16:49:40,134 - task_id:0 async_func_appelant Démarrer le traitement pendant le traitement asynchrone
2020-10-15 16:49:40,134 - task_id:1 name:task1 async_démarrer func
2020-10-15 16:49:40,135 - task_id:2 name:task2 async_démarrer func
2020-10-15 16:49:40,135 - task_id:3 name:task3 async_démarrer func
2020-10-15 16:49:40,136 - task_id:4 name:task4 async_démarrer func
2020-10-15 16:49:40,136 - task_id:5 name:task5 async_démarrer func
2020-10-15 16:49:45,138 - task_id:0 async_func_appelant Fin du traitement pendant le traitement asynchrone
2020-10-15 16:49:50,141 - task_id:2 name:task2 async_fin de func
2020-10-15 16:49:50,142 - task_id:5 name:task5 async_fin de func
2020-10-15 16:49:50,142 - task_id:4 name:task4 async_fin de func
2020-10-15 16:49:50,144 - task_id:1 name:task1 async_fin de func
2020-10-15 16:49:50,144 - task_id:3 name:task3 async_fin de func
2020-10-15 16:49:50,145 -main Fin du traitement pendant le traitement asynchrone Résultat:['1-task1', '2-task2', '3-task3', '4-task4', '5-task5']
Depuis le journal, vous pouvez voir qu'à 16:49:40, cinq processus, "processus asynchrone principal en cours" et "async_func start", sont exécutés en parallèle en même temps.
Nous espérons que cela vous sera utile lors de l'implémentation du traitement asynchrone.
Recommended Posts