Python 3.7.4 Windows 10
Bei der Verwendung von ProcessPoolExecutor für die parallele Verarbeitung ist ein Fehler aufgetreten. Unten ist der Code.
import concurrent
import concurrent.futures
class Process:
def __init__(self):
self.process_list = []
self.executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
# self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) #Funktioniert mit der Thread-Verarbeitung
def _process(self, n):
#Verarbeitungseinheit
return 1
def start_process(self, n):
#Ausführungsabteilung
self.process_list.append(self.executor.submit(self._process, n))
def get_result(self):
#Ergebnisse bekommen
print("wait...")
concurrent.futures.wait(self.process_list, return_when=concurrent.futures.FIRST_EXCEPTION)
print("all processes were finished")
res_list = [res.result() for res in self.process_list]
print("got result")
self.executor.shutdown(wait=True)
print("shutdown")
self.process_list = []
return res_list
if __name__ == "__main__":
process = Process()
for i in range(10):
process.start_process(i)
result = process.get_result()
print(result)
Wenn ich diesen Code ausführe, heißt es "res_list = [res.result () für res in self.process_list]"
TypeError: can't pickle _thread.RLock objects
Fehler tritt auf.
ThreadPoolExecutor funktioniert übrigens.
Teil bearbeiten
def _process(self, n):
#Verarbeitungseinheit
return 1
Von
@staticmethod
def _process(n):
#Verarbeitungseinheit
return 1
Ändern. Wenn die Verarbeitungseinheit die Instanzvariable von sich selbst (self) verwendet, fügen Sie sie dem Argument hinzu.
Wenn Sie Ihre eigene Instanzmethode verwendet haben, müssen Sie diese Methode in eine Methode ändern, die nicht selbst als Argument verwendet (statische Methode, Klassenmethode usw.).
Ich denke, es ist in Ordnung, wenn Sie kein Objekt übergeben, das ein Objekt enthält, das nicht ausgewählt werden kann (ProcessPoolExecutor, queue.Queue, threading.Lock, threading.RLock usw.), als Argument für die an ProcessPoolExecutor übergebene Methode.
Dieses Problem ist darauf zurückzuführen, dass die Klasseninstanz einen ProcessPoolExecutor enthält, ein Objekt, das nicht ausgewählt werden kann, und über das Selbstargument der Instanzmethode an mehrere Prozesse übergeben wird.
Gemäß der ProcessPoolExecutor-Dokumentation
ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned. (Quelle: https://docs.python.org/ja/3/library/concurrent.futures.html#processpoolexecutor)
Wie geschrieben steht, können nur Objekte ausgeführt und zurückgegeben werden, die eingelegt werden können.
Daher kann dies vermieden werden, indem die Instanzmethode nicht verwendet wird (unter Verwendung einer Methode, die sich nicht selbst als Argument verwendet).
Dies ist ein Beispiel für Änderungen bei der Verwendung von Instanzvariablen. self.calc
und self.hoge
sind Instanzvariablen.
"""
Wenn Sie Instanzvariablen verwendet haben
"""
import concurrent
import concurrent.futures
class Calc:
def __init__(self, a):
self.a = a
def calc(self, n):
return self.a + n
class Process:
def __init__(self):
self.process_list = []
self.executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
# self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
self.calc = Calc(10) #Instanz der zu verarbeitenden Klasse
self.hoge = 3 #Instanzvariable
def _process_bad(self, n):
res = self.calc.calc(n) * self.hoge
return res
@staticmethod
def _process(calc, n, hoge):
res = calc.calc(n) * hoge
return res
def start_process(self, n):
#Ausführungsabteilung
# self.process_list.append(self.executor.submit(self._process_bad, n)) # NG
self.process_list.append(self.executor.submit(self._process, self.calc, n, self.hoge)) # OK
def get_result(self):
#Kürzung
if __name__ == "__main__":
process = Process()
for i in range(10):
process.start_process(i)
result = process.get_result()
print(result)
Wenn Sie es wie "_process_bad ()" geschrieben haben, müssen Sie alle in der Methode verwendeten Variablen als Argumente wie "_process ()" übergeben.
Außerdem darf die Klasse "Calc" keine Objekte enthalten, die nicht eingelegt werden können.
Übrigens funktioniert es nicht nur als statische Methode, sondern auch, wenn Sie es zu einer Klassenmethode machen oder eine Methode außerhalb der Klasse aufrufen.
#Beispiel einer Klassenmethode
@classmethod
def _process(cls, calc, n, hoge):
res = calc.calc(n) * hoge
return res
Als ich auf dieses Problem stieß, wurde ich heruntergefahren, bevor ich die Ergebnisse erhielt, wie unten gezeigt.
def get_result(self):
#Ergebnisse bekommen
self.executor.shutdown(wait=True) #Herunterfahren, bevor Sie bekommen
res_list = [res.result() for res in self.process_list]
self.process_list = []
return res_list
[1] https://bugs.python.org/issue29423 (using concurrent.futures.ProcessPoolExecutor in class giving 'TypeError: can't pickle _thread.RLock objects' in 3.6, but not 3.5)
Ich habe auch falsch verstanden, dass es kein Problem gibt, wenn ich es nicht einmal in den Rückgabewert setze, wenn es nicht eingelegt werden kann, also kannte ich die Ursache für eine Weile nicht. Es war ein Multiprozessprozess, den ich mit Fehlern geschrieben habe, aber der Prozess, den ich ausführen wollte, hatte eine Ausführungszeit, die sich nicht wesentlich von der Multithread-Verarbeitung unterschied. Bei Verwendung einer externen Bibliothek können möglicherweise sogar mehrere CPUs für die Multithread-Verarbeitung verwendet werden. Referenz [2] https://qiita.com/walkure/items/2751b5b8932873e7a5bf (Lambda-Ausdruck kann in ProcessPoolExector.map nicht an func übergeben werden) [3] https://qiita.com/kokumura/items/2e3afc1034d5aa7c6012 (concurrent.ut) Nutzungsnotiz) [4] https://docs.python.org/ja/3.6/library/concurrent.futures.html (17.4. Concurrent.futures - Parallele Taskausführung) [5] https://qiita.com/walkure/items/2751b5b8932873e7a5bf (Lambda-Ausdruck kann in ProcessPoolExector.map nicht an func übergeben werden) [6] https://qiita.com/kaitolucifer/items/e4ace07bd8e112388c75#4-concurrentfutures (Vollständiges Verständnis von Python-Threading und Multiprocessing)