This is a memo from when I built a job queue with Tornado's coroutines.
Jobs (a function plus its arguments) are enqueued with the WorkerQueue put method. As soon as the currently running job finishes, the queued jobs are executed one by one, oldest first.
I am using the Queue introduced in Tornado 4.2. Tornado queues are similar to the Python standard library's synchronous queue (queue.Queue), except that put and get return a tornado.concurrent.Future.
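As a minimal standalone sketch of that difference (illustrative only, separate from the job queue below):

from tornado import gen, ioloop
from tornado.queues import Queue

@gen.coroutine
def main():
    q = Queue()
    q.put_nowait("hello")
    # get() returns a Future; yielding it suspends the coroutine
    # until an item is available.
    item = yield q.get()
    print(item)  # -> hello

ioloop.IOLoop.current().run_sync(main)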
from concurrent.futures import ProcessPoolExecutor
import time

from tornado import ioloop, gen, process
from tornado.queues import Queue


class WorkerQueue(object):

    def __init__(self):
        self.queue = Queue()
        self.current_worker_id = None
        self.current_worker = None
        self.queued_ids = []
        # Fire and forget: start the dispatcher loop on the IOLoop.
        self._dispatcher()

    def put(self, id_, func, args):
        worker = Worker(func, args)
        self.queued_ids.append(id_)
        self.queue.put_nowait((id_, worker))
        print("Put: {}".format(id_))

    def status(self, id_):
        if id_ in self.queued_ids:
            return "Queued"
        elif id_ == self.current_worker_id:
            return "Running"
        else:
            return "Ready"

    @gen.coroutine
    def _dispatcher(self):
        while True:
            # get() returns a Future, so this waits until a job is queued.
            id_, worker = yield self.queue.get()
            self.queued_ids.remove(id_)
            self.current_worker_id = id_
            self.current_worker = worker
            print("Start: {}".format(id_))
            res = yield self.current_worker.execute()
            self.current_worker_id = None
            self.current_worker = None
            print("{} is {}.".format(id_, res))


class Worker(object):

    def __init__(self, func, args):
        self.func = func
        self.args = args

    @gen.coroutine
    def execute(self):
        # Run the blocking function in a separate process so the IOLoop
        # stays responsive while the job runs.
        with ProcessPoolExecutor(process.cpu_count()) as exec_:
            res = yield exec_.submit(self.func, *self.args)
        return res  # Python 3.3+; on Python 2 use raise gen.Return(res)


def job(sec):
    time.sleep(sec)
    return "done"


@gen.coroutine
def run():
    q = WorkerQueue()
    q.put("Job1", job, (2,))
    q.put("Job2", job, (2,))
    print("Job1 <{}>".format(q.status("Job1")))
    print("Job2 <{}>".format(q.status("Job2")))
    yield gen.sleep(1)
    print("Job1 <{}>".format(q.status("Job1")))
    print("Job2 <{}>".format(q.status("Job2")))
    yield gen.sleep(2)
    print("Job1 <{}>".format(q.status("Job1")))
    print("Job2 <{}>".format(q.status("Job2")))
    yield gen.sleep(2)
    print("Job1 <{}>".format(q.status("Job1")))
    print("Job2 <{}>".format(q.status("Job2")))


if __name__ == "__main__":
    ioloop.IOLoop.current().run_sync(run)
Execution result
Put: Job1
Put: Job2
Job1 <Queued>
Job2 <Queued>
Start: Job1
Job1 <Running>
Job2 <Queued>
Job1 is done.
Start: Job2
Job1 <Ready>
Job2 <Running>
Job2 is done.
Job1 <Ready>
Job2 <Ready>
(Added on 2015.11.7) When dealing with a large volume of jobs, memory usage becomes a problem, so measures are needed, such as capping the queue length and making the producer wait, as sketched below.
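For example, one possible sketch: give the queue a maxsize and have the producer yield put() instead of calling put_nowait(), so it waits while the queue is full (the bound of 100 and the coroutine name produce are illustrative assumptions, not from the original):

from tornado import gen
from tornado.queues import Queue

queue = Queue(maxsize=100)  # cap the number of pending jobs (bound is illustrative)

@gen.coroutine
def produce(id_, func, args):
    # Unlike put_nowait(), put() returns a Future that resolves only
    # once the queue has room, so a fast producer waits here instead
    # of letting memory grow without limit.
    yield queue.put((id_, Worker(func, args)))
    print("Put: {}".format(id_))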