Verwendung der Python-Multiprocessing (Fortsetzung 3) apply_async in einer Klasse mit Pool als Mitglied

Zweck

Ich habe seit gestern studiert, aber ich habe das Gefühl, dass ich endlich der Form nahe gekommen bin, die ich verwenden möchte, also werde ich hier noch einmal eine Notiz machen.

Das wollte ich realisieren. Ich habe gestern eine Nachricht hinterlassen, aber es fühlt sich an, als hätte ich mich hier niedergelassen.

Spezifikationen und Implementierungsrichtlinien

Als Spezifikation

Damit

Implementierung

Ich habe es so geschrieben.

from multiprocessing import Pool
from time import sleep
from os import getpid, getppid, cpu_count
from datetime import datetime, time, date, timedelta
import sys, json

def _data_handler(o):
    """ json.Standard mit Dump=_data_Als Handler verwenden"""
    if isinstance(o, (datetime, date) ):
        return o.strftime('%Y/%m/%d %H:%M:%S')
    elif hasattr(o, "isoformat"):
        return o.isoformat()
    else:
        return str(o)


class JobError(Exception):
    def __init__(self, job_id:int, pid:int, msg:str, error):
        self.job_id = job_id
        self.pid = pid
        self.msg = msg
        self.error = error


def f(*args, **kwargs):
    """Schlaf für die angegebene Zeit"""
    try:
        print("[{}---{}] f(args {} kwargs={})".format(getpid(), getppid(), args, kwargs))
        t = kwargs["params"]["sleep_time"]
        if t == 0:
            raise Exception("Exception!! sleep time = 0")
        sleep(t)
        return {"f_answer": 0.0, "pid": getpid(), "job_id": kwargs["job_id"]}
    except Exception as e:
        raise JobError(kwargs["job_id"], getpid(), "Exception in except in f", e)


class JobController(object):

    def __init__(self, num_cpu:int=0):
        """
Geben Sie die Anzahl der zu verwendenden Kerne an. Wenn 0, wird der Kern vom Betriebssystem verwaltet
        """
        print("main pid={} cpu_count={}".format(getpid(), cpu_count()))
        num_cpu = cpu_count()
        self._pool = Pool(num_cpu)
        self._working_jobs = {} # job_Sei id der Schlüssel. Im Verfahren
        self._closed_jobs = {} # job_Sei id der Schlüssel. Ende
        self._new_job_id = 0 #Job, der für den nächsten übergebenen Job verwendet werden soll_id

    def __del__(self):
        pass #Was ist zu tun, um das Verlassen von Zombies zu vermeiden (noch nicht bekannt)?.


    def my_cb(self, *args):
        """Verschieben Sie das Ergebnis eines erfolgreich abgeschlossenen Jobs in den Puffer"""
        print("callback args={} jobid={}".format(args, args[0]["job_id"]) )
        try:
            jobid = args[0]["job_id"]
            self._closed_jobs[jobid] = self._working_jobs.pop(jobid, {})
            self._closed_jobs[jobid]["end_time"] = datetime.now()
            self._closed_jobs[jobid]["successful"] = True
            self._closed_jobs[jobid]["return"] = args
        except:
            pass
        if len(self._closed_jobs) == 0:
            self._pool.join()


    def my_err_cb(self, args):
        """Verschieben Sie das Ergebnis des Jobs, der abnormal beendet wurde, in den Puffer. args ist JobError"""
        print("error callback args={} job_id={}".format(args, args.job_id) )
        try:
            jobid = args.job_id
            self._closed_jobs[jobid] = self._working_jobs.pop(jobid, {})
            self._closed_jobs[jobid]["end_time"] = datetime.now()
            self._closed_jobs[jobid]["successful"] = False
            self._closed_jobs[jobid]["return"] = args
        except:
            pass
        if len(self._closed_jobs) == 0:
            self._pool.join()


    def PushJob(self, params:dict):
        """Löschen Sie den Job. Das Argument wird hier als Wörterbuchtyp angegeben. """
        print("PushJob ", getpid(), getppid())
        res = self._pool.apply_async(f, args=(1,), kwds={"params":params, "job_id":self._new_job_id},
            callback=self.my_cb, error_callback=self.my_err_cb)
        self._working_jobs[self._new_job_id] = {"start_time": datetime.now(), "async_res": res}
        self._new_job_id += 1


    def GetCurrentWorkingJobCount(self):
        """Anzahl der laufenden Jobs (geworfen, aber nicht beendet)"""
        return len(self._working_jobs)        


    def GetCurrentClosedJobCount(self):
        """Anzahl abgeschlossener Jobs"""
        return len(self._closed_jobs)        


if __name__ == "__main__":
    try: 
        print("main pid = {} ppid={}".format(getpid(), getppid()))
        job_controller = JobController(0)
        # 0.Senden Sie Jobs alle 5 Sekunden.
        for i in range(10):
            params = {"values": random.randn(3), "sleep_time": i % 7}
            job_controller.PushJob(params)
            sleep(0.5)
        #Der Status wird ausgegeben, bis keine Jobs mehr ausgeführt werden.
        while True:
            print("working_jobs {}:", job_controller.GetCurrentWorkingJobCount())
            print(json.dumps(job_controller._working_jobs, indent=2, default=_data_handler))
            print("closed_jobs {}:", job_controller.GetCurrentClosedJobCount())
            print(json.dumps(job_controller._closed_jobs, indent=2, default=_data_handler))
            if job_controller.GetCurrentWorkingJobCount() == 0:                
                break
            sleep(3)

    except:
       pass

Ergebnis

Vorerst funktioniert es wie erwartet. Sowohl die normale als auch die abnormale Beendigung werden in self._closd_jobs gestapelt, und schließlich wird _working_jobs zu 0. Die Start- und Endzeiten wurden ebenfalls korrekt aufgezeichnet.

Es war so.

Die Probleme sind wie folgt.

Ich frage mich jedoch, ob ich die Arbeit vorerst überleben kann. .. .. Ich bin vorerst zufrieden und beende. (2020/04/19, 18:17)

Nachtrag

――Wenn Sie versuchen, es in Ihrer eigenen Klasse zu verwenden, die Thread erbt,

NotImplementedError: pool objects cannot be passed between processes or pickled

Ich habe so einen Fehler bekommen. Ich habe es gelöst, aber es schien, als würde ich meinen eigenen Datentyp (GCP PubSub-Nachricht) direkt in ** kwargs (Wörterbuchtypdaten) einfügen. Ich weiß nicht, ob Thread beteiligt ist. (2020/04/20)

Recommended Posts

Verwendung der Python-Multiprocessing (Fortsetzung 3) apply_async in einer Klasse mit Pool als Mitglied
[Einführung in Python] Wie verwende ich eine Klasse in Python?
Verwendung von __slots__ in der Python-Klasse
Wie man tkinter mit Python in Pyenv benutzt
Python: So verwenden Sie Async mit
Verwendung von SQLite in Python
Wie man MySQL mit Python benutzt
Verwendung von PubChem mit Python
So verwenden Sie Python in Pyenv unter MacOS mit PyCall
[Einführung in Python] Wie verwende ich den Operator in in der for-Anweisung?
[Python] Wie man eine Klasse iterierbar macht
So arbeiten Sie mit BigQuery in Python
Wie bekomme ich Stacktrace in Python?
Verwendung regulärer Ausdrücke in Python
Verwendung ist und == in Python
[Python] Erklärt anhand eines konkreten Beispiels, wie die Bereichsfunktion verwendet wird
Verwendung der C-Bibliothek in Python
Lesen einer CSV-Datei mit Python 2/3
[REAPER] Wie man Reascript mit Python spielt
So löschen Sie einen Taple in einer Liste (Python)
Einbetten von Variablen in Python-Strings
So erstellen Sie eine JSON-Datei in Python
Zusammenfassung der Verwendung von MNIST mit Python
So benachrichtigen Sie Discord-Kanäle in Python
[Python] Wie zeichnet man mit Matplotlib ein Histogramm?
So geben Sie "Ketsumaimo" standardmäßig in Python aus
[Python] Erstellt eine Klasse, um mit pyaudio Sinuswellen im Hintergrund zu spielen
So legen Sie Google Text & Tabellen in einem Ordner zusammen in einer TXT-Datei mit Python ab
So erhalten Sie mit Python eine Liste der Dateien im selben Verzeichnis
[Python] Wie zeichnet man mit Matplotlib ein Liniendiagramm?
So führen Sie eine Hash-Berechnung mit Salt in Python durch
Erklären Sie ausführlich, wie Sie mit Python einen Sound erzeugen
So konvertieren Sie ein Klassenobjekt mit SQLAlchemy in ein Wörterbuch
So führen Sie Tests zusammen mit Python unittest aus
Ein Memorandum zur Verwendung von Keras 'keras.preprocessing.image
Verwendung des interaktiven Python-Modus mit Git Bash
Verwendung von Bootstrap in der generischen Klassenansicht von Django
Ich möchte mit einem Roboter in Python arbeiten.
So zeigen Sie DataFrame als Tabelle in Markdown an
Geben Sie MinGW als den in Python verwendeten Compiler an
Ich habe versucht zusammenzufassen, wie man Pandas von Python benutzt
Verwendung des in Lobe in Python erlernten Modells
[Python] So erstellen Sie mit Matplotlib ein zweidimensionales Histogramm
So führen Sie einen Befehl mit einem Unterprozess in Python aus
Wie identifiziere ich das Element mit der geringsten Anzahl von Zeichen in einer Python-Liste?
Eine Geschichte darüber, wie Windows 10-Benutzer eine Umgebung für die Verwendung von OpenCV3 mit Python 3.5 erstellt haben
Verwendung von Fixture in Django zur Eingabe von Beispieldaten für das Benutzermodell
python3: Verwendung der Flasche (2)
[Python] Verwendung von Liste 1
Wie benutzt man Python Argparse?
Python: Wie man pydub benutzt
[Python] Verwendung von checkio
Wie man in Python entwickelt
[Python] Verwendung von input ()
Wie benutzt man Python Lambda?
python3: Wie man eine Flasche benutzt
Verwendung von Python-Bytes
Verwendung von Fujifilm X-T3 als Webcam unter Ubuntu 20.04
So installieren Sie NPI + Senden Sie eine Nachricht an Python