Ich habe eine Praxis der asynchronen Python-Verarbeitung in Betracht gezogen, die die folgenden Anforderungen erfüllt.
Der Code ist etwas redundant, aber alle Quellen, einschließlich der Logger, werden aufgelistet, damit Sie ihn so kopieren und einfügen können, wie er ist.
multiprocessing1.py
from multiprocessing.pool import ThreadPool
import time
import threading as th
import logging
#Holen Sie sich Logger
def get_logger():
logger = logging.getLogger("multiprocesssing_test")
logger.setLevel(logging.DEBUG)
logger.propagate = False
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch_formatter = logging.Formatter('%(asctime)s - %(message)s')
ch.setFormatter(ch_formatter)
logger.addHandler(ch)
return logger
logger = get_logger()
def async_func(name, sleep_time):
#Thread-ID abrufen
thread_id = th.get_ident()
logger.info(f"thread_id:{thread_id} name:{name} async_starte func")
time.sleep(sleep_time)
logger.info(f"thread_id:{thread_id} name:{name} async_Ende der Funktion")
return f"{thread_id}-{name}"
if __name__ == "__main__":
#Erstellen Sie einen Thread-Pool für die Thread-Ausführung
#Maximale gleichzeitige Threads in Prozessen
pool = ThreadPool(processes=1)
#Thread-ID abrufen
thread_id = th.get_ident()
#Führen Sie die asynchrone Verarbeitung aus. Geben Sie das Funktionsobjekt als erstes Argument und das Argument als zweites Argument an.
logger.info(f"thread_id:{thread_id}Rufen Sie die asynchrone Verarbeitung von main auf")
future = pool.apply_async(async_func, ("Faden 1", 10))
#Verarbeitung, die Sie parallel zur asynchronen Verarbeitung im Hauptthread ausführen möchten
logger.info(f"thread_id:{thread_id}main Startet die Verarbeitung während der asynchronen Verarbeitung")
time.sleep(5)
logger.info(f"thread_id:{thread_id}main Ende der Verarbeitung während der asynchronen Verarbeitung")
#Warten Sie, bis der asynchrone Prozess abgeschlossen ist, und erhalten Sie das Ergebnis.
result = future.get()
logger.info(f"thread_id:{thread_id}Holen Sie sich das Ergebnis der asynchronen Verarbeitung:{result}")
pool.close()
2020-10-15 16:43:27,073 - thread_id:18440 Asynchrone Verarbeitung von main aufrufen
2020-10-15 16:43:27,074 - thread_id:18440 main Startet die Verarbeitung während der asynchronen Verarbeitung
2020-10-15 16:43:27,074 - thread_id:18132 name:Thread 1 asynchron_starte func
2020-10-15 16:43:32,074 - thread_id:18440 main Ende der Verarbeitung während der asynchronen Verarbeitung
2020-10-15 16:43:37,075 - thread_id:18132 name:Thread 1 asynchron_Ende der Funktion
2020-10-15 16:43:37,075 - thread_id:18440 Ruft das Ergebnis der asynchronen Verarbeitung ab:18132-Faden 1
Aus dem Protokoll können Sie ersehen, dass die Prozesse "Asynchrone Hauptverarbeitung läuft" und "async_func start" parallel um 16:43:27 ausgeführt werden.
multiprocessing2.py
from multiprocessing.pool import ThreadPool
import time
import threading as th
import logging
#Holen Sie sich Logger
def get_logger():
logger = logging.getLogger("multiprocesssing_test")
logger.setLevel(logging.DEBUG)
logger.propagate = False
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch_formatter = logging.Formatter('%(asctime)s - %(message)s')
ch.setFormatter(ch_formatter)
logger.addHandler(ch)
return logger
logger = get_logger()
def async_func(name, sleep_time):
#Thread-ID abrufen
thread_id = th.get_ident()
logger.info(f"thread_id:{thread_id} name:{name} async_starte func")
time.sleep(sleep_time)
logger.info(f"thread_id:{thread_id} name:{name} async_Ende der Funktion")
return f"{thread_id}-{name}"
if __name__ == "__main__":
#Erstellen Sie einen Thread-Pool für die Thread-Ausführung
#Maximale gleichzeitige Threads in Prozessen
pool = ThreadPool(processes=5)
#Thread-ID abrufen
thread_id = th.get_ident()
#Führen Sie die asynchrone Verarbeitung aus. Geben Sie das Funktionsobjekt als erstes Argument und das Argument als zweites Argument an.
logger.info(f"thread_id:{thread_id}Rufen Sie die asynchrone Verarbeitung von main auf")
futures = []
for i in range(5):
future = pool.apply_async(async_func, (f"Faden{i + 1}", 10)) # Tuple of args for foo
futures.append(future)
#Verarbeitung, die Sie parallel zur asynchronen Verarbeitung im Hauptthread ausführen möchten
logger.info(f"thread_id:{thread_id}main Startet die Verarbeitung während der asynchronen Verarbeitung")
time.sleep(5)
logger.info(f"thread_id:{thread_id}main Ende der Verarbeitung während der asynchronen Verarbeitung")
#Warten Sie, bis der asynchrone Prozess abgeschlossen ist, und erhalten Sie das Ergebnis.
results = [future.get() for future in futures]
logger.info(f"thread_id:{thread_id}Holen Sie sich das Ergebnis der asynchronen Verarbeitung:{results}")
pool.close()
2020-10-15 16:47:41,977 - thread_id:13448 Asynchrone Verarbeitung von main aufrufen
2020-10-15 16:47:41,978 - thread_id:13448 main Startet die Verarbeitung während der asynchronen Verarbeitung
2020-10-15 16:47:41,979 - thread_id:23216 name:Thread 1 asynchron_starte func
2020-10-15 16:47:41,979 - thread_id:21744 name:Thread 2 asynchron_starte func
2020-10-15 16:47:41,979 - thread_id:21708 name:Thread 3 asynchron_starte func
2020-10-15 16:47:41,979 - thread_id:21860 name:Thread 4 asynchron_starte func
2020-10-15 16:47:41,979 - thread_id:22100 name:Thread 5 asynchron_starte func
2020-10-15 16:47:46,980 - thread_id:13448 main Ende der Verarbeitung während der asynchronen Verarbeitung
2020-10-15 16:47:51,982 - thread_id:21744 name:Thread 2 asynchron_Ende der Funktion
2020-10-15 16:47:51,982 - thread_id:23216 name:Thread 1 asynchron_Ende der Funktion
2020-10-15 16:47:51,983 - thread_id:21708 name:Thread 3 asynchron_Ende der Funktion
2020-10-15 16:47:51,984 - thread_id:21860 name:Thread 4 asynchron_Ende der Funktion
2020-10-15 16:47:51,984 - thread_id:22100 name:Thread 5 asynchron_Ende der Funktion
2020-10-15 16:47:51,986 - thread_id:13448 Ruft das Ergebnis der asynchronen Verarbeitung ab:['23216-Faden 1', '21744-Faden 2', '21708-Faden 3', '21860-Su
Rot 4', '22100-Faden 5']
Aus dem Protokoll können Sie ersehen, dass um 16:47:41 Uhr fünf Prozesse, "Hauptasynchroner Prozess in Bearbeitung" und "async_func start", gleichzeitig parallel ausgeführt werden. Wenn Sie die Anzahl der Prozesse mithilfe von "ThreadPool (Prozesse = 3)" usw. reduzieren, werden zuerst 3 Threads ausgeführt, 2 befinden sich im Wartezustand und nach Abschluss wird ein neuer Thread ausgeführt.
asyncio1.py
import asyncio
import itertools
import time
import profile
import random
import time
import threading as th
import logging
#Holen Sie sich Logger
def get_logger():
logger = logging.getLogger("asyncio_test")
logger.setLevel(logging.DEBUG)
logger.propagate = False
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch_formatter = logging.Formatter('%(asctime)s - %(message)s')
ch.setFormatter(ch_formatter)
logger.addHandler(ch)
return logger
logger = get_logger()
#Holen Sie sich so etwas wie Task-ID
#* Da asyncio intern einen Generator verwendet
#Die Thread-ID ist dieselbe, und die Erfassungsmethode der ID, die der asynchronen Verarbeitung entspricht, lautet wie folgt.
_next_id = itertools.count().__next__
def get_task_id():
return _next_id()
async def async_func(name, sleep_time):
#Aufgaben-ID abrufen
task_id = get_task_id()
logger.info(f"task_id:{task_id} name:{name} async_starte func")
await asyncio.sleep(sleep_time)
logger.info(f"task_id:{task_id} name:{name} async_Ende der Funktion")
return f"{task_id}-{name}"
async def async_func_caller():
#Aufgaben-ID abrufen
task_id = get_task_id()
#Generieren Sie eine asynchrone Verarbeitungsaufgabe
#* Zu diesem Zeitpunkt wird die Aufgabe nur generiert und nicht ausgeführt.
# loop.run_until_Wird ausgeführt, wenn der Aufruf abgeschlossen ist.
futures = [asyncio.ensure_future(async_func(f"task{i + 1}", 10)) for i in range(5)]
#Verarbeitung, die Sie parallel zur asynchronen Verarbeitung im Hauptthread ausführen möchten
logger.info(f"task_id:{task_id} async_func_Anrufer Startet die Verarbeitung während der asynchronen Verarbeitung")
await asyncio.sleep(5)
logger.info(f"task_id:{task_id} async_func_Anrufer Ende der Verarbeitung während der asynchronen Verarbeitung")
#Warten Sie, bis der asynchrone Prozess abgeschlossen ist, und erhalten Sie das Ergebnis.
results = await asyncio.gather(*futures)
return results
if __name__ == "__main__":
#Erstellen Sie einen Thread-Pool für die asynchrone Verarbeitung
loop = asyncio.get_event_loop()
logger.info(f"main Startet die Verarbeitung während der asynchronen Verarbeitung")
#Führen Sie die asynchrone Verarbeitung aus und warten Sie bis zum Ende
ret = loop.run_until_complete(async_func_caller())
logger.info(f"main Ende der Verarbeitung während der asynchronen Verarbeitung Ergebnis:{ret}")
loop.close()
2020-10-15 16:49:40,132 -main Startet die Verarbeitung während der asynchronen Verarbeitung
2020-10-15 16:49:40,134 - task_id:0 async_func_Anrufer Startet die Verarbeitung während der asynchronen Verarbeitung
2020-10-15 16:49:40,134 - task_id:1 name:task1 async_starte func
2020-10-15 16:49:40,135 - task_id:2 name:task2 async_starte func
2020-10-15 16:49:40,135 - task_id:3 name:task3 async_starte func
2020-10-15 16:49:40,136 - task_id:4 name:task4 async_starte func
2020-10-15 16:49:40,136 - task_id:5 name:task5 async_starte func
2020-10-15 16:49:45,138 - task_id:0 async_func_Anrufer Ende der Verarbeitung während der asynchronen Verarbeitung
2020-10-15 16:49:50,141 - task_id:2 name:task2 async_Ende der Funktion
2020-10-15 16:49:50,142 - task_id:5 name:task5 async_Ende der Funktion
2020-10-15 16:49:50,142 - task_id:4 name:task4 async_Ende der Funktion
2020-10-15 16:49:50,144 - task_id:1 name:task1 async_Ende der Funktion
2020-10-15 16:49:50,144 - task_id:3 name:task3 async_Ende der Funktion
2020-10-15 16:49:50,145 -main Ende der Verarbeitung während der asynchronen Verarbeitung Ergebnis:['1-task1', '2-task2', '3-task3', '4-task4', '5-task5']
Aus dem Protokoll können Sie ersehen, dass um 16:49:40 Uhr fünf Prozesse, "Asynchroner Hauptprozess in Bearbeitung" und "Start von async_func", gleichzeitig parallel ausgeführt werden.
Wir hoffen, dass dies bei der Implementierung der asynchronen Verarbeitung hilfreich ist.
Recommended Posts