Ich habe ein Manager-Objekt für die Statusverwaltung zwischen Prozessen verwendet.
#Liste alle 3 Sekunden anzeigen def list_print(test_list): while True: print(str(test_list)) time.sleep(3)
#Fügen Sie der Liste alle 2 Sekunden ein hinzu def list_append(test_list): while True: test_list.append("a") time.sleep(2)
if name == 'main': manager = Manager() test_list=manager.list() print("Liste vor Funktionsausführung" +str(test_list)) p1=Process(target=list_print,args=(test_list)) p2=Process(target=list_append,args=(test_list)) p1.start() p2.start() p1.join() p2.join()
<h2> 4. Ausführungsergebnis </ h2>
Traceback (most recent call last): Traceback (most recent call last): File "/Applications/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap self.run() File "/Applications/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run self._target(*self._args, **self._kwargs) File "/Applications/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap self.run() File "/Applications/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run self._target(*self._args, **self._kwargs) TypeError: list_append() missing 1 required positional argument: 'test_list' TypeError: list_print() missing 1 required positional argument: 'test_list'
Anscheinend war der Weg, das Argument zu nehmen, schlecht und ein Fehler trat auf.
Ich habe mich gefragt, ob das Manager-Objekt keine Listentypen unterstützt, aber es heißt offiziell, dass es Listen unterstützt.
Nachdem ich die Referenz eine Weile gelesen hatte, fand ich eine Lösung.
<h2> 5. Lösung </ h2>
Es wurde gelöst, indem ein Wörterbuch als erstes Argument der auszuführenden Funktion definiert wurde.
Es definiert einen leeren Wörterbuch-Dummy und nimmt ihn als Argument.
Der Code ist unten.
```python
from multiprocessing import Process,Manager
import time
#Liste anzeigen
def list_print(dummy, test_list):
while True:
print(str(test_list))
time.sleep(3)
#Fügen Sie der Liste ein hinzu
def list_append(dummy, test_list):
while True:
test_list.append("a")
time.sleep(2)
if __name__ == '__main__':
manager = Manager()
#Definieren Sie ein leeres Wörterbuch
dummy = manager.dict()
test_list=manager.list()
print("Liste vor Funktionsausführung" +str(test_list))
#Fügen Sie dem ersten Argument ein leeres Wörterbuch hinzu
p1=Process(target=list_print,args=(dummy, test_list))
p2=Process(target=list_append,args=(dummy, test_list))
p1.start()
p2.start()
p1.join()
p2.join()
Liste vor Funktionsausführung[]
[]
['a', 'a']
['a', 'a', 'a']
['a', 'a', 'a', 'a', 'a']
das ist alles.
Recommended Posts