Referenziert: https://www.bettercodebytes.com/theadpoolexecutor-with-a-bounded-queue-in-python/
Sie können den Grad der Parallelität anpassen, indem Sie in Executor max_workers angeben. Was jedoch passiert, wenn Sie mit einer Geschwindigkeit senden, die die Anzahl der Parallelen überschreitet, wird nicht blockiert. Stattdessen scheint es im Speicher zu speichern. Aufgrund dieses Verhaltens kann das Ausführen in großer Anzahl viel Speicherplatz beanspruchen.
with ThreadPoolExecutor(max_workers=10) as executor:
for i in range(0, 1024*1024): #Viele
executor.submit(fn, i) #zu machen
#Die for-Schleife endet bald, aber der Speicherverbrauch soll groß sein
Tatsächlich verbraucht das Schreiben von Code, der 1 Million Schleifen durchläuft, etwa 2 GB Speicher. Also beschloss ich darüber nachzudenken, wie ich damit umgehen sollte.
Nach Überprüfung der internen Implementierung hat ThreadPoolExecutor intern eine Warteschlange. Submit erstellt ein Objekt namens WorkItem und stellt es in die Warteschlange. Diese interne Warteschlange hat keine Obergrenze und kann niemals blockiert werden, sodass Sie endlos senden können.
Übrigens wird der Worker-Thread zum Zeitpunkt des Packens erstellt. [Worker-Thread holt Daten aus der Warteschlange und führt sie in einer Endlosschleife aus](https://github.com/python/cpython/ blob / v3.8.6 / Lib / concurrent / futures / thread.py # L66).
Beobachten wir die Bewegung. Führen Sie beispielsweise eine Funktion aus, die 5000 Sekunden lang 0,01 Sekunden dauert. Lassen Sie uns dies mit max_workers = 10 drehen.
Betrachten Sie den Zeitstempel und den Speicher (diesmal maxrss) als Fortschritt in der for-Anweisung.
https://wandbox.org/permlink/n2P2CQssjhj1eOFw
Anhand des Zeitstempels können Sie erkennen, dass beim Senden keine Blockierung aufgetreten ist (das Senden der for-Schleife ist sofort abgeschlossen und wartet fast auf das Herunterfahren). Sie können jedoch feststellen, dass der Speicherverbrauch im Verlauf des Prozesses zunimmt.
Dies ist die erste Methode, an die ich gedacht habe. Macht die im ThreadPool Executor verwendete Warteschlange zu einer Warteschlange mit einer Größe. Instanzvariablen erben und ersetzen.
https://wandbox.org/permlink/HJN0lRBR0VBYU0Pv
Sie können dem Zeitstempel entnehmen, dass während der Schleife eine Blockierung auftritt. Die Gesamtzeit ändert sich jedoch nicht wesentlich und der Speicherverbrauch ist sehr langsam.
Der Code ist einfach, aber es fühlt sich etwas schlampig an, in die interne Implementierung zu gelangen, und ProcessPoolExecutor verfügt nicht über diese Warteschlangen, sodass diese Methode nicht funktioniert.
Da Plan 1 nicht gut genug war, suchte ich nach etwas, das ich tun konnte, und fand den Referenzartikel.
https://www.bettercodebytes.com/theadpoolexecutor-with-a-bounded-queue-in-python/
Erstellen Sie eine Klasse BoundedExecutor, die PoolExecutor unter Bezugnahme auf die Referenzquelle umschließt. Da es API-kompatibel ist (außer Map), kann es als Ersatz verwendet werden.
Die interne Implementierung steuert die Parallelität, indem sie das Semaphor zum Zeitpunkt der Übermittlung herunterzählt und das Semaphor hochzählt, wenn die Worker-Verarbeitung abgeschlossen ist. "Wenn die Worker-Verarbeitung abgeschlossen ist" ist "wenn die von add_done_callback of future registrierte Funktion aufgerufen wird, wenn sie abgeschlossen ist". (Rückruf wird aufgerufen, wenn die Verarbeitung des Arbeiters abgeschlossen ist und wenn eine Ausnahme ausgelöst wird, sodass Tsuji übereinstimmen sollte.)
https://wandbox.org/permlink/jn4nN8leJonLi2ty
Dies ergab auch das gleiche Ergebnis wie in Plan 1.
Übrigens ist es besser, die Größe der Warteschlange so zu bestimmen, dass sie größer als max_workers ist (geben Sie im Code das Argument an oder ändern Sie es so, dass bounded_ratio = 1 zu bounded_ratio = 2 wird). Wenn Sie "Anzahl der Parallelen == Warteschlangengröße" festlegen, gibt es einen Zeitpunkt, an dem die Warteschlange leer wird, die Mitarbeiter spielen und die Gesamtvervollständigung geringfügig verzögert wird. Daher ist es besser, es ein wenig zu erhöhen.
https://wandbox.org/permlink/HPrJXNGxLeXzB1x2
Recommended Posts