[PYTHON] Beispiel für die Implementierung einer Jobwarteschlange mit Tornados Collout

Dies ist ein Memo, als ich mit Tornados Collout eine Jobwarteschlange erstellt habe.

Stellen Sie Jobs (Funktionen und Argumente) mit der WorkerQueue-Put-Methode in die Warteschlange. Sobald der aktuell ausgeführte Job beendet ist, werden die Jobs in der Warteschlange einzeln ausgeführt, beginnend mit dem ältesten.

Ich verwende die in Version 4.2 von Tornado implementierte Warteschlange. Tornado-Warteschlangen ähneln der Python-Standardbibliotheks-Synchronisierungswarteschlange (queue.Queue), mit der Ausnahme, dass put und get return tornado.concurrent.Future.

filename


from concurrent.futures import ProcessPoolExecutor
import time

from tornado import ioloop, gen, process
from tornado.queues import Queue


class WorkerQueue(object):
    def __init__(self):
        self.queue = Queue()
        self.current_worker_id = None
        self.current_worker = None
        self.queued_ids = []
        self._dispatcher()

    def put(self, id_, func, args):
        worker = Worker(func, args)
        self.queued_ids.append(id_)
        self.queue.put_nowait((id_, worker))
        print("Put: {}".format(id_))

    def status(self, id_):
        if id_ in self.queued_ids:
            return "Queued"
        elif id_ == self.current_worker_id:
            return "Running"
        else:
            return "Ready"

    @gen.coroutine
    def _dispatcher(self):
        while 1:
            id_, worker = yield self.queue.get()
            self.queued_ids.remove(id_)
            self.current_worker_id = id_
            self.current_worker = worker
            print("Start: {}".format(id_))
            res = yield self.current_worker.execute()
            self.current_worker_id = None
            self.current_worker = None
            print("{} is {}.".format(id_, res))


class Worker(object):
    def __init__(self, func, args):
        self.func = func
        self.args = args

    @gen.coroutine
    def execute(self):
        with ProcessPoolExecutor(process.cpu_count()) as exec_:
            res = yield exec_.submit(self.func, *self.args)
        return res


def job(sec):
    time.sleep(sec)
    return "done"


@gen.coroutine
def run():
    q = WorkerQueue()
    q.put("Job1", job, (2,))
    q.put("Job2", job, (2,))
    print("Job1 <{}>".format(q.status("Job1")))
    print("Job2 <{}>".format(q.status("Job2")))
    yield gen.sleep(1)
    print("Job1 <{}>".format(q.status("Job1")))
    print("Job2 <{}>".format(q.status("Job2")))
    yield gen.sleep(2)
    print("Job1 <{}>".format(q.status("Job1")))
    print("Job2 <{}>".format(q.status("Job2")))
    yield gen.sleep(2)
    print("Job1 <{}>".format(q.status("Job1")))
    print("Job2 <{}>".format(q.status("Job2")))


if __name__ == "__main__":
    ioloop.IOLoop.current().run_sync(run)

Ausführungsergebnis

Put: Job1
Put: Job2
Job1 <Queued>
Job2 <Queued>
Start: Job1
Job1 <Running>
Job2 <Queued>
Job1 is done
Start: Job2
Job1 <Ready>
Job2 <Running>
Job2 is done
Job1 <Ready>
Job2 <Ready>

(Hinzugefügt am 2015.11.7) Wenn Sie mit einer großen Datenmenge arbeiten, wird die Speichernutzung zu einem Problem. Daher müssen Maßnahmen ergriffen werden, z. B. die Länge der Warteschlange begrenzen und den Produzenten warten lassen.

Recommended Posts

Beispiel für die Implementierung einer Jobwarteschlange mit Tornados Collout
Asynchrone Verarbeitung mit Linebot in der Jobwarteschlange
Beispiel für die Verwendung von Lambda
Abnormalitätserkennung durch Auto-Encoder mit Keras [Implementierungsbeispiel für Anfänger]