How to use python multiprocessing (continued 3) apply_async in class with Pool as a member

Purpose

I've been studying this since yesterday, and I feel I've finally gotten close to the shape I want to use, so I'm leaving another note here.

- I want to run processing asynchronously in a separate process, without blocking.
- I want to catch completion properly.

That is what I wanted to achieve. I left a note yesterday as well, but it feels like things have settled down with this version.

- How to use python multiprocessing
- How to use python multiprocessing (continued) Pool
- How to use python multiprocessing (continued 3) apply_async in class with Pool as a member

Specifications and implementation policy

The specification is:

- Jobs are submitted through a Push method that takes the parameters as an argument. This method does not block.
- Internally, it keeps a record of jobs in progress and jobs that have finished.
- If completion needs to be reported to the caller, that can be implemented as well.

So, the implementation policy is:

- Implement a single class. Only the worker processing itself is a separate function (imagine it living in an external file).
- Use multiprocessing.Pool, and in addition manage the results returned by apply_async.
- The callbacks set on apply_async are also methods of the class, so that they can access the buffers (dictionaries) that are members of the class.
- (I don't know whether this is a good idea) Internally, jobs are managed with a job_id, implemented so that the number is unique for each submitted processing request.

Implementation

I wrote it like this.

from multiprocessing import Pool
from time import sleep
from os import getpid, getppid, cpu_count
from datetime import datetime, time, date, timedelta
import sys, json
from numpy import random  # random.randn() in __main__ needs NumPy's random module

def _data_handler(o):
    """Used as the default= handler for json.dumps"""
    if isinstance(o, (datetime, date) ):
        return o.strftime('%Y/%m/%d %H:%M:%S')
    elif hasattr(o, "isoformat"):
        return o.isoformat()
    else:
        return str(o)


class JobError(Exception):
    def __init__(self, job_id:int, pid:int, msg:str, error):
        super().__init__(msg)
        self.job_id = job_id
        self.pid = pid
        self.msg = msg
        self.error = error


def f(*args, **kwargs):
    """Sleep for the specified time"""
    try:
        print("[{}---{}] f(args {} kwargs={})".format(getpid(), getppid(), args, kwargs))
        t = kwargs["params"]["sleep_time"]
        if t == 0:
            raise Exception("Exception!! sleep time = 0")
        sleep(t)
        return {"f_answer": 0.0, "pid": getpid(), "job_id": kwargs["job_id"]}
    except Exception as e:
        raise JobError(kwargs["job_id"], getpid(), "Exception in except in f", e)


class JobController(object):

    def __init__(self, num_cpu:int=0):
        """
        Specify the number of worker processes to use. If 0, use the number of cores reported by the OS.
        """
        print("main pid={} cpu_count={}".format(getpid(), cpu_count()))
        if num_cpu == 0:
            num_cpu = cpu_count()
        self._pool = Pool(num_cpu)
        self._working_jobs = {}  # keyed by job_id: jobs in progress
        self._closed_jobs = {}   # keyed by job_id: finished jobs
        self._new_job_id = 0     # job_id to assign to the next submitted job

    def __del__(self):
        pass  # Something should be done here to avoid leaving zombie processes (not figured out yet).


    def my_cb(self, *args):
        """Move the result of a successfully completed job into the closed-job buffer"""
        print("callback args={} jobid={}".format(args, args[0]["job_id"]))
        try:
            jobid = args[0]["job_id"]
            self._closed_jobs[jobid] = self._working_jobs.pop(jobid, {})
            self._closed_jobs[jobid]["end_time"] = datetime.now()
            self._closed_jobs[jobid]["successful"] = True
            self._closed_jobs[jobid]["return"] = args
        except Exception:
            pass
        if len(self._closed_jobs) == 0:
            # As written this branch is effectively never reached (an entry was just
            # added above); note that pool.join() also requires close()/terminate() first.
            self._pool.join()


    def my_err_cb(self, args):
        """Move the result of a job that ended abnormally into the closed-job buffer. args is a JobError"""
        print("error callback args={} job_id={}".format(args, args.job_id))
        try:
            jobid = args.job_id
            self._closed_jobs[jobid] = self._working_jobs.pop(jobid, {})
            self._closed_jobs[jobid]["end_time"] = datetime.now()
            self._closed_jobs[jobid]["successful"] = False
            self._closed_jobs[jobid]["return"] = args
        except Exception:
            pass
        if len(self._closed_jobs) == 0:
            # Same as in my_cb: effectively never reached as written.
            self._pool.join()


    def PushJob(self, params:dict):
        """Submit a job. The job parameters are passed in here as a dict."""
        print("PushJob ", getpid(), getppid())
        res = self._pool.apply_async(f, args=(1,), kwds={"params":params, "job_id":self._new_job_id},
            callback=self.my_cb, error_callback=self.my_err_cb)
        self._working_jobs[self._new_job_id] = {"start_time": datetime.now(), "async_res": res}
        self._new_job_id += 1


    def GetCurrentWorkingJobCount(self):
        """Number of jobs in progress (submitted but not yet finished)"""
        return len(self._working_jobs)        


    def GetCurrentClosedJobCount(self):
        """Number of finished jobs"""
        return len(self._closed_jobs)        


if __name__ == "__main__":
    try: 
        print("main pid = {} ppid={}".format(getpid(), getppid()))
        job_controller = JobController(0)
        # Submit a job every 0.5 seconds.
        for i in range(10):
            params = {"values": random.randn(3), "sleep_time": i % 7}
            job_controller.PushJob(params)
            sleep(0.5)
        # Print the state until there are no more jobs in progress.
        while True:
            print("working_jobs {}:".format(job_controller.GetCurrentWorkingJobCount()))
            print(json.dumps(job_controller._working_jobs, indent=2, default=_data_handler))
            print("closed_jobs {}:".format(job_controller.GetCurrentClosedJobCount()))
            print(json.dumps(job_controller._closed_jobs, indent=2, default=_data_handler))
            if job_controller.GetCurrentWorkingJobCount() == 0:                
                break
            sleep(3)

    except:
       pass

Result

For the time being, it works as expected. Both normally and abnormally terminated jobs pile up in self._closed_jobs, and eventually _working_jobs becomes empty. The start and end times were also recorded correctly.

- There seems to be no problem with how information is passed around or how jobs are fed to the processes.

That's how it turned out.

The issues are as follows.

- A function to stop a job that is currently being worked on is not implemented. (A rough sketch of a pool shutdown method is below.)
- Is it okay to keep using the Pool without doing cleanup work such as clearing it? That worries me.
- Can the number of cores used by the Pool be changed along the way?
- What happens when this is used with Kubernetes?
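For the first point, here is a minimal sketch of what a shutdown method on JobController could look like, using only the standard Pool API (close / terminate / join). The name Shutdown and the force flag are my own additions for illustration, not part of the implementation above.

class JobController(object):
    # ... existing methods as above ...

    def Shutdown(self, force: bool=False):
        """Stop accepting new jobs and wait for the worker processes to exit.
        If force is True, terminate the workers immediately; jobs in progress are lost."""
        if force:
            self._pool.terminate()  # kill the worker processes right away
        else:
            self._pool.close()      # no new submissions; running jobs are allowed to finish
        self._pool.join()           # join() is only valid after close() or terminate()

It would be called once, for example right after the while loop in __main__ finishes.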

Still, I think this will get me through the work at hand for now. I'm satisfied for the time being, so I'll stop here. (2020/04/19, 18:17)

Postscript

- Actually, when I tried to use this inside my own class that inherits from Thread,

NotImplementedError: pool objects cannot be passed between processes or pickled

I got an error like this. I solved it; the cause seemed to be that I was putting my own kind of object (a GCP PubSub message) directly into **kwargs (the dict of parameters). I don't know whether Thread was actually involved. (2020/04/20)
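For reference, the fix amounted to passing only plain, picklable values instead of the message object itself. A minimal sketch, assuming a received PubSub message object named message with .data (bytes) and .attributes (the exact fields you need may differ):

# NG: the message object drags non-picklable internals into the worker process
# job_controller.PushJob({"message": message, "sleep_time": 3})

# OK: extract only plain, picklable values before pushing the job
params = {
    "data": message.data.decode("utf-8"),     # payload bytes -> str
    "attributes": dict(message.attributes),   # attribute map -> plain dict
    "sleep_time": 3,
}
job_controller.PushJob(params)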

- When I try to call apply_async on a Pool again from inside the function already running via apply_async, I get AssertionError: daemonic processes are not allowed to have children. How to deal with this is the remaining challenge. Tears. (2020/05/13) (An illustration of the failing pattern is below.)
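The shape that triggers this is roughly the following: Pool workers are daemon processes, and a daemon process is not allowed to start child processes of its own. inner_task here is just a hypothetical placeholder; the usual workarounds are to restructure so that only the parent process owns the Pool, or to run the nested work with explicitly non-daemonic processes instead of a nested Pool.

from multiprocessing import Pool
from time import sleep

def inner_task(x):
    sleep(x)
    return x

def f(*args, **kwargs):
    # f itself runs inside a Pool worker, which is a daemon process, so creating
    # another Pool (i.e. more child processes) here raises:
    #   AssertionError: daemonic processes are not allowed to have children
    with Pool(2) as inner_pool:
        return inner_pool.apply_async(inner_task, (1,)).get()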
