Exécution de tâches parallèles à l'aide de concurrent.futures en Python

Présentation du module concurrent.futures

Le module concurrent.futures ajouté dans Python 3.2 offre la possibilité d'exécuter plusieurs processus en parallèle.

Il existe d'autres modules en Python appelés threading et multiprocessing, mais ceux-ci gèrent un processus de thread, tandis que le module concurrent.futures vise à gérer plusieurs processus de thread. est.

Executor

Le module concurrent.futures a une classe Executor comme classe abstraite, et deux classes sont fournies comme classes d'implémentation. Utilisez l'un des deux pour effectuer des tâches parallèles.

max_workers

Dans le constructeur Executor, le nombre maximum de tâches pouvant être exécutées en même temps est spécifié par l'argument max_workers. Si vous demandez à exécuter plus de tâches que ce qui peut être exécuté simultanément, les tâches seront ajoutées à la file d'attente, en attendant que les autres tâches se terminent, puis s'exécutent de manière séquentielle.

Méthodes pour exécuter les tâches soumettre et mapper

Executor dispose des méthodes suivantes pour exécuter des tâches parallèles.

échantillon

Ceci est un exemple utilisant concurrent.futures. Il est écrit en Python3.6, mais je pense qu'il peut être utilisé si l'environnement est 3.2 ou supérieur avec une petite modification.

Un exemple simple utilisant ThreadPoolExecutor

01_thread.py Effectuez 5 tâches dans 2 threads. La tâche est juste de dormir pendant 1 seconde.

python


def task(v):
    getLogger().info("%s start", v)
    time.sleep(1.0)
    getLogger().info("%s end", v)

def main():
    init_logger()
    getLogger().info("main start")
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
        for i in range(5):
            executor.submit(task, i)
        getLogger().info("submit end")
    getLogger().info("main end")
[2017-04-02 12:01:39,747] [MainThread] main start
[2017-04-02 12:01:39,748] [thread_0] 0 start
[2017-04-02 12:01:39,749] [thread_1] 1 start
[2017-04-02 12:01:39,750] [MainThread] submit end
[2017-04-02 12:01:40,755] [thread_0] 0 end
[2017-04-02 12:01:40,755] [thread_0] 2 start
[2017-04-02 12:01:40,756] [thread_1] 1 end
[2017-04-02 12:01:40,756] [thread_1] 3 start
[2017-04-02 12:01:41,761] [thread_0] 2 end
[2017-04-02 12:01:41,761] [thread_0] 4 start
[2017-04-02 12:01:41,761] [thread_1] 3 end
[2017-04-02 12:01:42,764] [thread_0] 4 end
[2017-04-02 12:01:42,765] [MainThread] main end

Recevez les résultats avec soumettre

02_future.py

Voici un exemple de réception du résultat de [soumettre].

def task(v):
    getLogger().info("%s start", v)
    time.sleep(1.0)
    getLogger().info("%s end", v)
    return v * 2

def main():
    init_logger()
    getLogger().info("main start")
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
        futures = []
        for i in range(5):
            futures.append(executor.submit(task, i))
        getLogger().info("submit end")
        getLogger().info([f.result() for f in futures])
    getLogger().info("main end")
[2017-04-02 12:08:23,853] [MainThread] main start
[2017-04-02 12:08:23,854] [thread_0] 0 start
[2017-04-02 12:08:23,855] [thread_1] 1 start
[2017-04-02 12:08:23,856] [MainThread] submit end
[2017-04-02 12:08:24,856] [thread_0] 0 end
[2017-04-02 12:08:24,856] [thread_0] 2 start
[2017-04-02 12:08:24,857] [thread_1] 1 end
[2017-04-02 12:08:24,857] [thread_1] 3 start
[2017-04-02 12:08:25,863] [thread_0] 2 end
[2017-04-02 12:08:25,864] [thread_0] 4 start
[2017-04-02 12:08:25,864] [thread_1] 3 end
[2017-04-02 12:08:26,867] [thread_0] 4 end
[2017-04-02 12:08:26,868] [MainThread] [0, 2, 4, 6, 8]
[2017-04-02 12:08:26,868] [MainThread] main end

Exemple d'ajout par lots de tâches / acquisition par lots de résultats avec carte

03_map.py Si vous souhaitez traiter des tâches dans un lot, il est plus facile d'écrire en utilisant map plutôt que submit.

def task(v):
    getLogger().info("%s start", v)
    time.sleep(1.0)
    getLogger().info("%s end", v)
    return v * 2

def main():
    init_logger()
    getLogger().info("main start")
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
        results = executor.map(task, range(5))
        getLogger().info("map end")
    getLogger().info(list(results))
    getLogger().info("main end")
[2017-04-02 12:10:03,997] [MainThread] main start
[2017-04-02 12:10:03,998] [thread_0] 0 start
[2017-04-02 12:10:04,000] [thread_1] 1 start
[2017-04-02 12:10:04,000] [MainThread] map end
[2017-04-02 12:10:05,005] [thread_0] 0 end
[2017-04-02 12:10:05,006] [thread_0] 2 start
[2017-04-02 12:10:05,006] [thread_1] 1 end
[2017-04-02 12:10:05,006] [thread_1] 3 start
[2017-04-02 12:10:06,007] [thread_0] 2 end
[2017-04-02 12:10:06,007] [thread_0] 4 start
[2017-04-02 12:10:06,007] [thread_1] 3 end
[2017-04-02 12:10:07,014] [thread_0] 4 end
[2017-04-02 12:10:07,014] [MainThread] [0, 2, 4, 6, 8]
[2017-04-02 12:10:07,014] [MainThread] main end

Exemple d'exécution de traitements lourds avec ProcessPoolExecutor

04_process.py Enfin, un exemple utilisant Process Pool Executor. Modifions les paramètres et voyons la différence de performances.

def task(params):
    (v, num_calc) = params
    a = float(v)
    for _ in range(num_calc):
        a = pow(a, a)
    return a

def main():
    init_logger()

    if len(sys.argv) != 5:
        print("usage: 05_process.py max_workers chunk_size num_tasks num_calc")
        sys.exit(1)
    (max_workers, chunk_size, num_tasks, num_calc) = map(int, sys.argv[1:])

    start = time()

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        params = map(lambda _: (random(), num_calc), range(num_tasks))
        results = executor.map(task, params, chunksize=chunk_size)
    getLogger().info(sum(results))

    getLogger().info("{:.3f}".format(time() - start))

Description des paramètres

Environnement d'exécution

Résultat d'exécution

Les quatre premiers sont les résultats de l'exécution d'un grand nombre de grandes tâches et les quatre derniers sont les résultats de l'exécution d'un grand nombre de petites tâches. Il semble que la spécification de chunk_size soit importante lors de l'exécution d'un grand nombre de petites tâches.

max_workers chunk_size num_tasks num_calc Temps d'exécution(sec)
1 1 100 100,000 1.954
2 1 100 100,000 1.042
1 10 100 100,000 1.922
2 10 100 100,000 1.071
1 1 10,000 1,000 3.295
2 1 10,000 1,000 3.423
1 10 10,000 1,000 2.272
2 10 10,000 1,000 1.279
1 100 10,000 1,000 2.126
2 100 10,000 1,000 1.090

Recommended Posts

Exécution de tâches parallèles à l'aide de concurrent.futures en Python
Téléchargement parallèle avec Python
Exécuter Python unittest en parallèle
Exécution de commandes externes en Python
Traduit à l'aide de googletrans en Python
Utilisation du mode Python dans le traitement
mémorandum d'exécution parallèle / asynchrone python
Programmation GUI en Python avec Appjar
Précautions lors de l'utilisation de Pit avec Python
Mesurer le temps d'exécution de la fonction en Python
Utilisation de variables globales dans les fonctions python
Voyons voir l'utilisation de l'entrée en python
Exécuter le module Python unittest dans vs2017
Puissance totale en Python (en utilisant functools)
Reconnaissance de caractères manuscrits à l'aide de KNN en Python
Exécution parallèle facile avec le sous-processus python
Essayez d'utiliser LeapMotion avec Python
Recherche de priorité de profondeur à l'aide de la pile en Python
Lors de l'utilisation d'expressions régulières en Python
Création d'interface graphique en python avec tkinter 2
Fonctionnement de la souris à l'aide de l'API Windows en Python
Notes utilisant cChardet et python3-chardet dans Python 3.3.1.
Essayez d'utiliser l'API Wunderlist en Python
Création d'interface graphique en python à l'aide de tkinter partie 1
Obtenir l'équilibre Suica en Python (en utilisant libpafe)
Gérez plusieurs environnements d'exécution à l'aide de Python venv
Hachez lentement les mots de passe en utilisant bcrypt en Python
Essayez d'utiliser l'API Kraken avec Python
Utilisation de venv dans un environnement Windows + Docker [Python]
Tweet à l'aide de l'API Twitter en Python
[Python] [Windows] Communication série en Python à l'aide de DLL
Connectez-vous à Slack à l'aide de requêtes en Python
Obtenez des données Youtube en Python à l'aide de l'API Youtube Data
Utilisation des constantes physiques dans Python scipy.constants ~ constant e ~
Scraping de sites Web à l'aide de JavaScript en Python
Développement de slack bot avec python en utilisant chat.postMessage
Dessinez une structure arborescente en Python 3 à l'aide de graphviz
Remarques sur l'utilisation de python (pydev) avec eclipse
Classification des maladies par Random Forest en utilisant Python
Téléchargez des fichiers dans n'importe quel format en utilisant Python
Notifier à l'aide du Centre de notifications lorsque l'environnement d'exécution est macOS en Python
Quadtree en Python --2
Python en optimisation
CURL en Python
Métaprogrammation avec Python
Python 3.3 avec Anaconda
Géocodage en python
SendKeys en Python
Créer un fichier GIF en utilisant Pillow en Python
Comment prendre plusieurs arguments lors d'un traitement parallèle à l'aide du multitraitement en python
Méta-analyse en Python
Pièces jointes par e-mail à l'aide de votre compte gmail avec python.
Unittest en Python
Création d'un processus de numérotation à l'aide de python dans le processus de numérotation locale DynamoDB
Essayez d'utiliser l'API BitFlyer Ligntning en Python
Obtenir des frappes lors de l'exécution en arrière-plan en Python (Windows)
Époque en Python
Discord en Python