Comment utiliser le multitraitement python (suite 3) apply_async en classe avec Pool en tant que membre

Objectif

J'étudie depuis hier, mais je sens que je me suis enfin rapproché de la forme que je veux utiliser, alors je vais faire une note ici à nouveau.

--Je souhaite exécuter le traitement pour une entrée asynchrone sans le bloquer dans un processus séparé. ――Je veux bien attraper la fin

Je voulais m'en rendre compte. J'ai laissé un mot hier, mais j'ai l'impression de m'être installé ici.

Spécifications et politique de mise en œuvre

Comme spécification

Alors

la mise en oeuvre

Je l'ai écrit comme ça.

from multiprocessing import Pool
from time import sleep
from os import getpid, getppid, cpu_count
from datetime import datetime, time, date, timedelta
import sys, json

def _data_handler(o):
    """ json.par défaut avec vidage=_data_Utiliser comme gestionnaire"""
    if isinstance(o, (datetime, date) ):
        return o.strftime('%Y/%m/%d %H:%M:%S')
    elif hasattr(o, "isoformat"):
        return o.isoformat()
    else:
        return str(o)


class JobError(Exception):
    def __init__(self, job_id:int, pid:int, msg:str, error):
        self.job_id = job_id
        self.pid = pid
        self.msg = msg
        self.error = error


def f(*args, **kwargs):
    """Dormez pendant la durée spécifiée"""
    try:
        print("[{}---{}] f(args {} kwargs={})".format(getpid(), getppid(), args, kwargs))
        t = kwargs["params"]["sleep_time"]
        if t == 0:
            raise Exception("Exception!! sleep time = 0")
        sleep(t)
        return {"f_answer": 0.0, "pid": getpid(), "job_id": kwargs["job_id"]}
    except Exception as e:
        raise JobError(kwargs["job_id"], getpid(), "Exception in except in f", e)


class JobController(object):

    def __init__(self, num_cpu:int=0):
        """
Spécifiez le nombre de cœurs à utiliser. Si 0, le noyau géré par le système d'exploitation
        """
        print("main pid={} cpu_count={}".format(getpid(), cpu_count()))
        num_cpu = cpu_count()
        self._pool = Pool(num_cpu)
        self._working_jobs = {} # job_Laissez id être la clé. Dans le processus de
        self._closed_jobs = {} # job_Laissez id être la clé. Fin
        self._new_job_id = 0 #Travail à utiliser pour le prochain travail soumis_id

    def __del__(self):
        pass #Que faire pour éviter de laisser des zombies (pas encore connu).


    def my_cb(self, *args):
        """Déplacer le résultat d'un Job terminé avec succès vers la mémoire tampon"""
        print("callback args={} jobid={}".format(args, args[0]["job_id"]) )
        try:
            jobid = args[0]["job_id"]
            self._closed_jobs[jobid] = self._working_jobs.pop(jobid, {})
            self._closed_jobs[jobid]["end_time"] = datetime.now()
            self._closed_jobs[jobid]["successful"] = True
            self._closed_jobs[jobid]["return"] = args
        except:
            pass
        if len(self._closed_jobs) == 0:
            self._pool.join()


    def my_err_cb(self, args):
        """Déplacez le résultat du travail qui s'est terminé anormalement dans la mémoire tampon. args est JobError"""
        print("error callback args={} job_id={}".format(args, args.job_id) )
        try:
            jobid = args.job_id
            self._closed_jobs[jobid] = self._working_jobs.pop(jobid, {})
            self._closed_jobs[jobid]["end_time"] = datetime.now()
            self._closed_jobs[jobid]["successful"] = False
            self._closed_jobs[jobid]["return"] = args
        except:
            pass
        if len(self._closed_jobs) == 0:
            self._pool.join()


    def PushJob(self, params:dict):
        """Supprimez le travail. L'argument est donné ici sous forme de données de type dictionnaire. """
        print("PushJob ", getpid(), getppid())
        res = self._pool.apply_async(f, args=(1,), kwds={"params":params, "job_id":self._new_job_id},
            callback=self.my_cb, error_callback=self.my_err_cb)
        self._working_jobs[self._new_job_id] = {"start_time": datetime.now(), "async_res": res}
        self._new_job_id += 1


    def GetCurrentWorkingJobCount(self):
        """Nombre de travaux en cours (lancés mais non terminés)"""
        return len(self._working_jobs)        


    def GetCurrentClosedJobCount(self):
        """Nombre de travaux terminés"""
        return len(self._closed_jobs)        


if __name__ == "__main__":
    try: 
        print("main pid = {} ppid={}".format(getpid(), getppid()))
        job_controller = JobController(0)
        # 0.Soumettre des travaux toutes les 5 secondes.
        for i in range(10):
            params = {"values": random.randn(3), "sleep_time": i % 7}
            job_controller.PushJob(params)
            sleep(0.5)
        #L'état est affiché jusqu'à ce qu'il n'y ait plus de Jobs en cours.
        while True:
            print("working_jobs {}:", job_controller.GetCurrentWorkingJobCount())
            print(json.dumps(job_controller._working_jobs, indent=2, default=_data_handler))
            print("closed_jobs {}:", job_controller.GetCurrentClosedJobCount())
            print(json.dumps(job_controller._closed_jobs, indent=2, default=_data_handler))
            if job_controller.GetCurrentWorkingJobCount() == 0:                
                break
            sleep(3)

    except:
       pass

résultat

Pour le moment, cela fonctionne comme prévu. La terminaison normale et la terminaison anormale sont empilées dans self._closd_jobs, et finalement _working_jobs devient 0. Les heures de début et de fin ont également été enregistrées correctement.

C'était comme ça.

Les problèmes sont les suivants.

Cependant, je me demande si je peux survivre au travail pour le moment. .. .. Je suis satisfait pour le moment et je termine. (2020/04/19, 18:17)

Postscript

――En fait, si vous essayez de l'utiliser dans votre propre classe qui hérite de Thread,

NotImplementedError: pool objects cannot be passed between processes or pickled

J'ai eu une erreur comme celle-ci. Je l'ai résolu, mais il semblait que je mettais mon propre type de données (message GCP PubSub) directement dans ** kwargs (données de type dictionnaire). Je ne sais pas si Thread est impliqué. (2020/04/20)

--Lorsque j'essaye à nouveau de Pool apply_async dans la fonction apply_async, j'obtiens ```AssertionError: les processus démoniaques ne sont pas autorisés à avoir des enfants '' Le défi est de faire face à cela. Larmes (2020/05/13)

Recommended Posts

Comment utiliser le multitraitement python (suite 3) apply_async en classe avec Pool en tant que membre
[Introduction à Python] Comment utiliser la classe en Python?
Comment utiliser __slots__ dans la classe Python
Comment utiliser tkinter avec python dans pyenv
Python: comment utiliser async avec
Comment utiliser SQLite en Python
Comment utiliser Mysql avec python
Comment utiliser PubChem avec Python
Pour utiliser python, mettez pyenv sur macOS avec PyCall
[Introduction à Python] Comment utiliser l'opérateur in dans l'instruction for?
[Python] Comment rendre une classe itérable
Comment utiliser BigQuery en Python
Comment obtenir stacktrace en python
Comment utiliser les expressions régulières en Python
Comment utiliser is et == en Python
[Python] Explique comment utiliser la fonction range avec un exemple concret
Comment utiliser la bibliothèque C en Python
Comment lire un fichier CSV avec Python 2/3
[REAPER] Comment jouer à Reascript avec Python
Comment effacer un taple dans une liste (Python)
Comment incorporer des variables dans des chaînes python
Comment créer un fichier JSON en Python
Résumé de l'utilisation de MNIST avec Python
Comment notifier les canaux Discord en Python
[Python] Comment dessiner un histogramme avec Matplotlib
Comment générer "Ketsumaimo" en standard en Python
[Python] A créé une classe pour jouer des vagues de péché en arrière-plan avec pyaudio
Comment déposer Google Docs dans un dossier dans un fichier .txt avec python
Comment obtenir une liste de fichiers dans le même répertoire avec python
[Python] Comment dessiner un graphique linéaire avec Matplotlib
Comment faire un calcul de hachage avec Salt en Python
Expliquez en détail comment créer un son avec python
Comment convertir un objet de classe en dictionnaire avec SQLAlchemy
Comment exécuter des tests avec Python unittest
Un mémorandum sur l'utilisation de keras.preprocessing.image de Keras
Comment utiliser le mode interactif python avec git bash
Comment utiliser le bootstrap dans la vue de classe générique Django
Je veux travailler avec un robot en python.
Comment afficher DataFrame sous forme de tableau dans Markdown
Spécifiez MinGW comme compilateur utilisé dans Python
J'ai essayé de résumer comment utiliser les pandas de python
Comment utiliser le modèle appris dans Lobe en Python
[Python] Comment créer un histogramme bidimensionnel avec Matplotlib
Comment exécuter une commande à l'aide d'un sous-processus en Python
Comment identifier l'élément avec le plus petit nombre de caractères dans une liste Python?
Une histoire sur la façon dont les utilisateurs de Windows 10 ont créé un environnement pour utiliser OpenCV3 avec Python 3.5
Comment utiliser fixture dans Django pour saisir des exemples de données associés au modèle utilisateur
python3: Comment utiliser la bouteille (2)
[Python] Comment utiliser la liste 1
Comment utiliser Python Argparse
Python: comment utiliser pydub
[Python] Comment utiliser checkio
Comment développer en Python
[Python] Comment utiliser input ()
Comment utiliser Python lambda
python3: Comment utiliser la bouteille
Comment utiliser les octets Python
Comment utiliser Fujifilm X-T3 comme webcam sur Ubuntu 20.04
Comment installer NPI + envoyer un message à la ligne avec python