[Python] J'ai examiné une pratique qui peut être exécutée en parallèle avec le thread principal par traitement asynchrone (multiprocessing, asyncio)

J'ai envisagé une pratique de traitement asynchrone python qui répond aux exigences suivantes.

Le code est un peu redondant, mais toutes les sources, y compris les enregistreurs, sont répertoriées afin que vous puissiez le copier et le coller tel quel.

Cas 1 Le thread principal et le thread asynchrone sont traités en parallèle (en utilisant le multitraitement)

multiprocessing1.py


from multiprocessing.pool import ThreadPool
import time
import threading as th
import logging

#Obtenez un enregistreur
def get_logger():
    logger = logging.getLogger("multiprocesssing_test")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False

    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch_formatter = logging.Formatter('%(asctime)s - %(message)s')
    ch.setFormatter(ch_formatter)
    
    logger.addHandler(ch)
    return logger

logger = get_logger()

def async_func(name, sleep_time):
    #Obtenir l'identifiant du fil
    thread_id = th.get_ident()

    logger.info(f"thread_id:{thread_id} name:{name} async_démarrer func")
    time.sleep(sleep_time)
    logger.info(f"thread_id:{thread_id} name:{name} async_fin de func")

    return f"{thread_id}-{name}"

if __name__ == "__main__":
    #Créer un pool de threads pour l'exécution des threads
    #Nombre maximal de threads simultanés dans les processus
    pool = ThreadPool(processes=1)

    #Obtenir l'identifiant du fil
    thread_id = th.get_ident()

    #Exécutez un traitement asynchrone. Spécifiez l'objet fonction comme premier argument et l'argument comme deuxième argument.
    logger.info(f"thread_id:{thread_id}Appeler le traitement asynchrone à partir de main")
    future = pool.apply_async(async_func, ("Fil 1", 10))

    #Traitement que vous souhaitez exécuter dans le thread principal en parallèle avec le traitement asynchrone
    logger.info(f"thread_id:{thread_id}main Démarrer le traitement pendant le traitement asynchrone")
    time.sleep(5)
    logger.info(f"thread_id:{thread_id}main Fin du traitement pendant le traitement asynchrone")

    #Attendez que le processus asynchrone se termine et obtenez le résultat.
    result = future.get()
    logger.info(f"thread_id:{thread_id}Obtenez le résultat du traitement asynchrone:{result}")
    pool.close()

Résultat d'exécution

2020-10-15 16:43:27,073 - thread_id:18440 Appel du traitement asynchrone depuis le
2020-10-15 16:43:27,074 - thread_id:18440 main Démarrer le traitement pendant le traitement asynchrone
2020-10-15 16:43:27,074 - thread_id:18132 name:Thread 1 asynchrone_démarrer func
2020-10-15 16:43:32,074 - thread_id:18440 main Fin de traitement pendant le traitement asynchrone
2020-10-15 16:43:37,075 - thread_id:18132 name:Thread 1 asynchrone_fin de func
2020-10-15 16:43:37,075 - thread_id:18440 Obtenir le résultat du traitement asynchrone:18132-Fil 1

Depuis le journal, vous pouvez voir qu'à 16:43:27, les processus du "processus asynchrone principal en cours d'exécution" et "async_func start" sont exécutés en parallèle en même temps.

Cas 2 Traitement du thread principal et de plusieurs threads asynchrones en parallèle (en utilisant le multitraitement)

multiprocessing2.py


from multiprocessing.pool import ThreadPool
import time
import threading as th
import logging

#Obtenez un enregistreur
def get_logger():
    logger = logging.getLogger("multiprocesssing_test")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False

    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch_formatter = logging.Formatter('%(asctime)s - %(message)s')
    ch.setFormatter(ch_formatter)
    
    logger.addHandler(ch)
    return logger

logger = get_logger()

def async_func(name, sleep_time):
    #Obtenir l'identifiant du fil
    thread_id = th.get_ident()

    logger.info(f"thread_id:{thread_id} name:{name} async_démarrer func")
    time.sleep(sleep_time)
    logger.info(f"thread_id:{thread_id} name:{name} async_fin de func")

    return f"{thread_id}-{name}"

if __name__ == "__main__":
    #Créer un pool de threads pour l'exécution des threads
    #Nombre maximal de threads simultanés dans les processus
    pool = ThreadPool(processes=5)

    #Obtenir l'identifiant du fil
    thread_id = th.get_ident()

    #Exécutez un traitement asynchrone. Spécifiez l'objet fonction comme premier argument et l'argument comme deuxième argument.
    logger.info(f"thread_id:{thread_id}Appeler le traitement asynchrone à partir de main")
    futures = []
    for i in range(5):
        future = pool.apply_async(async_func, (f"fil{i + 1}", 10)) # Tuple of args for foo
        futures.append(future)

    #Traitement que vous souhaitez exécuter dans le thread principal en parallèle avec le traitement asynchrone
    logger.info(f"thread_id:{thread_id}main Démarrer le traitement pendant le traitement asynchrone")
    time.sleep(5)
    logger.info(f"thread_id:{thread_id}main Fin du traitement pendant le traitement asynchrone")

    #Attendez que le processus asynchrone se termine et obtenez le résultat.
    results = [future.get() for future in futures]
    logger.info(f"thread_id:{thread_id}Obtenez le résultat du traitement asynchrone:{results}")
    pool.close()

Résultat d'exécution

2020-10-15 16:47:41,977 - thread_id:13448 Appeler le traitement asynchrone depuis le principal
2020-10-15 16:47:41,978 - thread_id:13448 principal Démarrer le traitement pendant le traitement asynchrone
2020-10-15 16:47:41,979 - thread_id:23216 name:Thread 1 asynchrone_démarrer func
2020-10-15 16:47:41,979 - thread_id:21744 name:Thread 2 asynchrone_démarrer func
2020-10-15 16:47:41,979 - thread_id:21708 name:Thread 3 asynchrone_démarrer func
2020-10-15 16:47:41,979 - thread_id:21860 name:Thread 4 asynchrone_démarrer func
2020-10-15 16:47:41,979 - thread_id:22100 name:Thread 5 asynchrone_démarrer func
2020-10-15 16:47:46,980 - thread_id:13448 main Fin du traitement pendant le traitement asynchrone
2020-10-15 16:47:51,982 - thread_id:21744 name:Thread 2 asynchrone_fin de func
2020-10-15 16:47:51,982 - thread_id:23216 name:Thread 1 asynchrone_fin de func
2020-10-15 16:47:51,983 - thread_id:21708 name:Thread 3 asynchrone_fin de func
2020-10-15 16:47:51,984 - thread_id:21860 name:Thread 4 asynchrone_fin de func
2020-10-15 16:47:51,984 - thread_id:22100 name:Thread 5 asynchrone_fin de func
2020-10-15 16:47:51,986 - thread_id:13448 Obtenir le résultat du traitement asynchrone:['23216-Fil 1', '21744-Fil 2', '21708-Fil 3', '21860-Su
Rouge 4', '22100-Fil 5']

Depuis le journal, vous pouvez voir qu'à 16:47:41, cinq processus, "processus asynchrone principal en cours" et "async_func start", sont exécutés en parallèle en même temps. De plus, si vous réduisez le nombre de processus en utilisant ThreadPool (processus = 3) etc., 3 threads seront exécutés en premier, 2 seront en état d'attente et un nouveau thread sera exécuté une fois terminé.

Cas 3 Traiter le thread principal et plusieurs threads asynchrones en parallèle (en utilisant asyncio)

asyncio1.py


import asyncio
import itertools
import time
import profile
import random
import time
import threading as th
import logging

#Obtenez un enregistreur
def get_logger():
    logger = logging.getLogger("asyncio_test")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False

    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch_formatter = logging.Formatter('%(asctime)s - %(message)s')
    ch.setFormatter(ch_formatter)
    
    logger.addHandler(ch)
    return logger

logger = get_logger()

#Obtenez quelque chose comme l'identifiant de tâche
#* Depuis asyncio utilise un générateur en interne
#L'ID de thread sera le même et la méthode d'acquisition de l'ID correspondant au traitement asynchrone sera la suivante.
_next_id = itertools.count().__next__
def get_task_id():
    return _next_id()

async def async_func(name, sleep_time):
    #Obtenir l'identifiant de la tâche
    task_id = get_task_id()

    logger.info(f"task_id:{task_id} name:{name} async_démarrer func")
    await asyncio.sleep(sleep_time)
    logger.info(f"task_id:{task_id} name:{name} async_fin de func")

    return f"{task_id}-{name}"

async def async_func_caller():
    #Obtenir l'identifiant de la tâche
    task_id = get_task_id()

    #Générer une tâche de traitement asynchrone
    #* À ce stade, la tâche est uniquement générée et non exécutée.
    # loop.run_until_Exécuté lors de l'appel terminé.
    futures = [asyncio.ensure_future(async_func(f"task{i + 1}", 10)) for i in range(5)]

    #Traitement que vous souhaitez exécuter dans le thread principal en parallèle avec le traitement asynchrone
    logger.info(f"task_id:{task_id} async_func_appelant Démarrer le traitement pendant le traitement asynchrone")
    await asyncio.sleep(5)
    logger.info(f"task_id:{task_id} async_func_appelant Fin du traitement pendant le traitement asynchrone")

    #Attendez que le processus asynchrone se termine et obtenez le résultat.
    results = await asyncio.gather(*futures)

    return results


if __name__ == "__main__":
    #Créer un pool de threads pour l'exécution du traitement asynchrone
    loop = asyncio.get_event_loop()

    logger.info(f"main Démarrer le traitement pendant le traitement asynchrone")

    #Exécutez le traitement asynchrone et attendez la fin
    ret = loop.run_until_complete(async_func_caller())
    logger.info(f"main Fin du traitement pendant le traitement asynchrone Résultat:{ret}")
    loop.close()

Résultat d'exécution

2020-10-15 16:49:40,132 -main Démarrer le traitement pendant le traitement asynchrone
2020-10-15 16:49:40,134 - task_id:0 async_func_appelant Démarrer le traitement pendant le traitement asynchrone
2020-10-15 16:49:40,134 - task_id:1 name:task1 async_démarrer func
2020-10-15 16:49:40,135 - task_id:2 name:task2 async_démarrer func
2020-10-15 16:49:40,135 - task_id:3 name:task3 async_démarrer func
2020-10-15 16:49:40,136 - task_id:4 name:task4 async_démarrer func
2020-10-15 16:49:40,136 - task_id:5 name:task5 async_démarrer func
2020-10-15 16:49:45,138 - task_id:0 async_func_appelant Fin du traitement pendant le traitement asynchrone
2020-10-15 16:49:50,141 - task_id:2 name:task2 async_fin de func
2020-10-15 16:49:50,142 - task_id:5 name:task5 async_fin de func
2020-10-15 16:49:50,142 - task_id:4 name:task4 async_fin de func
2020-10-15 16:49:50,144 - task_id:1 name:task1 async_fin de func
2020-10-15 16:49:50,144 - task_id:3 name:task3 async_fin de func
2020-10-15 16:49:50,145 -main Fin du traitement pendant le traitement asynchrone Résultat:['1-task1', '2-task2', '3-task3', '4-task4', '5-task5']

Depuis le journal, vous pouvez voir qu'à 16:49:40, cinq processus, "processus asynchrone principal en cours" et "async_func start", sont exécutés en parallèle en même temps.

Nous espérons que cela vous sera utile lors de l'implémentation du traitement asynchrone.

Recommended Posts

[Python] J'ai examiné une pratique qui peut être exécutée en parallèle avec le thread principal par traitement asynchrone (multiprocessing, asyncio)
J'ai acheté et analysé la loterie jumbo de fin d'année avec Python qui peut être exécutée dans Colaboratory
Recevez une liste des résultats du traitement parallèle en Python avec starmap
L'histoire selon laquelle sendmail qui peut être exécuté dans le terminal ne fonctionnait pas avec cron
traitement python3 qui semble utilisable dans paiza
Goroutine (contrôle parallèle) utilisable sur le terrain
J'ai étudié le prétraitement qui peut être fait avec PyCaret
Une fonction qui mesure le temps de traitement d'une méthode en python
J'ai fait un shuffle qui peut être réinitialisé (inversé) avec Python
[Python] Un programme qui trouve le nombre maximum de jouets pouvant être achetés avec votre argent
Article qui peut être une ressource humaine qui comprend et maîtrise le mécanisme de l'API (avec du code Python)
Analyse des données de pratique Python Résumé de l'apprentissage que j'ai atteint environ 10 avec 100 coups
J'ai installé Pygame avec Python 3.5.1 dans l'environnement de pyenv sur OS X
Je souhaite créer une file d'attente prioritaire pouvant être mise à jour avec Python (2.7)
J'ai enregistré PyQCheck, une bibliothèque qui peut effectuer QuickCheck avec Python, dans PyPI.
J'ai essayé de prédire les chevaux qui seront dans le top 3 avec LightGBM
Traitement asynchrone en Python: référence inverse asyncio
Afficher le résultat du traitement de la géométrie en Python
Traitement parallèle sans signification profonde en Python
Liste des outils qui peuvent être utilisés pour essayer facilement l'analyse des émotions des phrases japonaises avec Python (essayez avec google colab)
Résumé des méthodes d'analyse de données statistiques utilisant Python qui peuvent être utilisées en entreprise
Visualisation des informations géographiques de R et Python qui peuvent être exprimées par Power BI
En voici une, je vais résumer les applications équipées "d'intelligence artificielle" qui m'intéressaient
[Python] Introduction au scraping WEB | Résumé des méthodes pouvant être utilisées avec webdriver
Dans Python3.8 et versions ultérieures, le mod inverse peut être calculé avec la fonction intégrée pow.
J'ai essayé de comparer la vitesse de traitement avec dplyr de R et pandas de Python
Notes sur les connaissances Python utilisables avec AtCoder
Un mémo que j'ai touché au magasin de données avec python
Peut être utilisé avec AtCoder! Une collection de techniques pour dessiner du code court en Python!
J'ai écrit un doctest dans "J'ai essayé de simuler la probabilité d'un jeu de bingo avec Python"
[Python] Un programme pour trouver le nombre de pommes et d'oranges qui peuvent être récoltées
Traitement parallèle avec multitraitement
Récapitulatif du format des formats qui peuvent être sérialisés avec gensim
J'ai essayé de trouver l'entropie de l'image avec python
Essayez de gratter les données COVID-19 Tokyo avec Python
J'ai essayé la "correction gamma" de l'image avec Python + OpenCV
Pourquoi puis-je utiliser le module en important avec python?
Calculez des millions de chiffres dans la racine carrée de 2 avec python
J'ai écrit la grammaire de base de Python dans Jupyter Lab
J'ai évalué la stratégie de négociation du système boursier avec Python.
J'ai mesuré différentes méthodes de communication inter-processus en multitraitement de python3
[Homologie] Comptez le nombre de trous dans les données avec Python
Goroutine utilisable sur le terrain (édition errgroup.Group)
Scripts pouvant être utilisés lors de l'utilisation de Bottle en Python
Index d'évaluation pouvant être spécifié pour GridSearchCV de sklearn
J'ai essayé de l'étendre pour que la base de données puisse être utilisée avec le logiciel d'analyse de Wiire
Comprendre les probabilités et les statistiques qui peuvent être utilisées pour la gestion des progrès avec un programme python
Python Priority Queue Le mystère du résultat de la fonction heapify qui ne intéresserait pas la plupart des gens
Une histoire qui n'a pas fonctionné lorsque j'ai essayé de me connecter avec le module de requêtes Python
Prédisez le nombre de coussins qui peuvent être reçus en tant que répondants rires avec Word2Vec + Random Forest
L'histoire de la création d'un Bot qui affiche les membres actifs dans un canal spécifique de Slack avec Python
J'ai essayé d'en savoir le plus possible sur GIL que vous devriez savoir si vous faites un traitement parallèle avec Python