Ich habe seit gestern studiert, aber ich habe das Gefühl, dass ich endlich der Form nahe gekommen bin, die ich verwenden möchte, also werde ich hier noch einmal eine Notiz machen.
Das wollte ich realisieren. Ich habe gestern eine Nachricht hinterlassen, aber es fühlt sich an, als hätte ich mich hier niedergelassen.
Als Spezifikation
Damit
`multiprocessing.Pool```. Verwalten Sie außerdem das Ergebnis von
`apply_async```.Ich habe es so geschrieben.
from multiprocessing import Pool
from time import sleep
from os import getpid, getppid, cpu_count
from datetime import datetime, time, date, timedelta
import sys, json
def _data_handler(o):
""" json.Standard mit Dump=_data_Als Handler verwenden"""
if isinstance(o, (datetime, date) ):
return o.strftime('%Y/%m/%d %H:%M:%S')
elif hasattr(o, "isoformat"):
return o.isoformat()
else:
return str(o)
class JobError(Exception):
def __init__(self, job_id:int, pid:int, msg:str, error):
self.job_id = job_id
self.pid = pid
self.msg = msg
self.error = error
def f(*args, **kwargs):
"""Schlaf für die angegebene Zeit"""
try:
print("[{}---{}] f(args {} kwargs={})".format(getpid(), getppid(), args, kwargs))
t = kwargs["params"]["sleep_time"]
if t == 0:
raise Exception("Exception!! sleep time = 0")
sleep(t)
return {"f_answer": 0.0, "pid": getpid(), "job_id": kwargs["job_id"]}
except Exception as e:
raise JobError(kwargs["job_id"], getpid(), "Exception in except in f", e)
class JobController(object):
def __init__(self, num_cpu:int=0):
"""
Geben Sie die Anzahl der zu verwendenden Kerne an. Wenn 0, wird der Kern vom Betriebssystem verwaltet
"""
print("main pid={} cpu_count={}".format(getpid(), cpu_count()))
num_cpu = cpu_count()
self._pool = Pool(num_cpu)
self._working_jobs = {} # job_Sei id der Schlüssel. Im Verfahren
self._closed_jobs = {} # job_Sei id der Schlüssel. Ende
self._new_job_id = 0 #Job, der für den nächsten übergebenen Job verwendet werden soll_id
def __del__(self):
pass #Was ist zu tun, um das Verlassen von Zombies zu vermeiden (noch nicht bekannt)?.
def my_cb(self, *args):
"""Verschieben Sie das Ergebnis eines erfolgreich abgeschlossenen Jobs in den Puffer"""
print("callback args={} jobid={}".format(args, args[0]["job_id"]) )
try:
jobid = args[0]["job_id"]
self._closed_jobs[jobid] = self._working_jobs.pop(jobid, {})
self._closed_jobs[jobid]["end_time"] = datetime.now()
self._closed_jobs[jobid]["successful"] = True
self._closed_jobs[jobid]["return"] = args
except:
pass
if len(self._closed_jobs) == 0:
self._pool.join()
def my_err_cb(self, args):
"""Verschieben Sie das Ergebnis des Jobs, der abnormal beendet wurde, in den Puffer. args ist JobError"""
print("error callback args={} job_id={}".format(args, args.job_id) )
try:
jobid = args.job_id
self._closed_jobs[jobid] = self._working_jobs.pop(jobid, {})
self._closed_jobs[jobid]["end_time"] = datetime.now()
self._closed_jobs[jobid]["successful"] = False
self._closed_jobs[jobid]["return"] = args
except:
pass
if len(self._closed_jobs) == 0:
self._pool.join()
def PushJob(self, params:dict):
"""Löschen Sie den Job. Das Argument wird hier als Wörterbuchtyp angegeben. """
print("PushJob ", getpid(), getppid())
res = self._pool.apply_async(f, args=(1,), kwds={"params":params, "job_id":self._new_job_id},
callback=self.my_cb, error_callback=self.my_err_cb)
self._working_jobs[self._new_job_id] = {"start_time": datetime.now(), "async_res": res}
self._new_job_id += 1
def GetCurrentWorkingJobCount(self):
"""Anzahl der laufenden Jobs (geworfen, aber nicht beendet)"""
return len(self._working_jobs)
def GetCurrentClosedJobCount(self):
"""Anzahl abgeschlossener Jobs"""
return len(self._closed_jobs)
if __name__ == "__main__":
try:
print("main pid = {} ppid={}".format(getpid(), getppid()))
job_controller = JobController(0)
# 0.Senden Sie Jobs alle 5 Sekunden.
for i in range(10):
params = {"values": random.randn(3), "sleep_time": i % 7}
job_controller.PushJob(params)
sleep(0.5)
#Der Status wird ausgegeben, bis keine Jobs mehr ausgeführt werden.
while True:
print("working_jobs {}:", job_controller.GetCurrentWorkingJobCount())
print(json.dumps(job_controller._working_jobs, indent=2, default=_data_handler))
print("closed_jobs {}:", job_controller.GetCurrentClosedJobCount())
print(json.dumps(job_controller._closed_jobs, indent=2, default=_data_handler))
if job_controller.GetCurrentWorkingJobCount() == 0:
break
sleep(3)
except:
pass
Vorerst funktioniert es wie erwartet. Sowohl die normale als auch die abnormale Beendigung werden in self._closd_jobs gestapelt, und schließlich wird _working_jobs zu 0. Die Start- und Endzeiten wurden ebenfalls korrekt aufgezeichnet.
Es war so.
Die Probleme sind wie folgt.
Ich frage mich jedoch, ob ich die Arbeit vorerst überleben kann. .. .. Ich bin vorerst zufrieden und beende. (2020/04/19, 18:17)
――Wenn Sie versuchen, es in Ihrer eigenen Klasse zu verwenden, die Thread erbt,
NotImplementedError: pool objects cannot be passed between processes or pickled
Ich habe so einen Fehler bekommen. Ich habe es gelöst, aber es schien, als würde ich meinen eigenen Datentyp (GCP PubSub-Nachricht) direkt in ** kwargs (Wörterbuchtypdaten) einfügen. Ich weiß nicht, ob Thread beteiligt ist. (2020/04/20)
Recommended Posts