A sample that combines Python's convenient multiprocessing library with SQLAlchemy. Data is exchanged between processes through a Queue, and the DB is accessed through an SQLAlchemy session.
from multiprocessing import (Process, Queue)
from sqlalchemy import (create_engine, MetaData)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import (sessionmaker, scoped_session)
# Placeholder URL: fill in user, passwd, host and db before running.
engine = create_engine("mysql://{user}:{passwd}@{host}/{db}")
# Alternative discussed in the notes below: a scoped_session registry.
# _Session = scoped_session(sessionmaker())
# Bind the session factory to the engine explicitly. Without a bind, a
# Session can only run queries when the mapped table's MetaData is bound,
# and Base below is created with its own (unbound) MetaData rather than the
# bound ``metadata``.
Session = sessionmaker(bind=engine)
# MetaData bound to the engine. Bound MetaData is a legacy SQLAlchemy
# feature (removed in SQLAlchemy 2.0).
metadata = MetaData(engine)
Base = declarative_base()
# Base.query = _Session.query_property()
####################
# Table definition omitted (Item table)
####################
class Hoge(object):
    """Fetch ``Item`` names from the database in parallel worker processes.

    The parent process reads every ``Item.id`` and hands the ids to worker
    processes through ``in_queue``. Each worker looks the items up with its
    own session and puts their ``name`` on ``out_queue``.

    NOTE(review): ``worker`` is started as a bound method, so this relies on
    the ``fork`` start method (the default on Linux); under ``spawn`` the
    whole instance, session included, would have to be pickled.
    """

    # Put on in_queue once per worker to tell it to stop.
    # Assumes Item.id is never NULL (e.g. a primary key) - confirm.
    _STOP = None
    # Put on out_queue by each worker when it finishes. Assumes Item.name
    # holds strings, so a tuple cannot be mistaken for a result; a tuple also
    # still compares equal after being pickled across the process boundary.
    _DONE = ('Hoge.worker', 'done')

    def __init__(self):
        """Create the work/result queues and the parent's session."""
        # Item ids waiting to be looked up by the workers.
        self.in_queue = Queue()
        # Item names produced by the workers.
        self.out_queue = Queue()
        # Used by the parent process only; each worker opens its own session.
        self.session = Session()

    def __del__(self):
        """Commit and close the parent's session when the object is collected."""
        # __init__ may have raised before the session was created.
        session = getattr(self, 'session', None)
        if session is not None:
            session.commit()
            session.close()

    def get_items(self, worker_num=4):
        """Look up every Item using ``worker_num`` worker processes.

        Every ``Item.id`` is queued on ``in_queue`` and processed by the
        workers. When this returns, the names of the items that were found
        are on ``out_queue``, ready for the caller to read.

        :param worker_num: number of worker processes to start.
        :returns: True once every worker has exited.
        """
        item_ids = self.session.query(Item.id).all()
        for item_id in item_ids:
            self.in_queue.put(item_id[0])
        # One stop marker per worker. Polling in_queue.empty() is unreliable
        # (items may still be in transit), and a worker that loses the race
        # for the last id would block in get() forever.
        for _ in range(worker_num):
            self.in_queue.put(self._STOP)

        # Empty the connection pool before forking so that no child inherits
        # a pooled connection that another process also uses. Connections
        # already checked out by self.session stay with the parent.
        # NOTE(review): assumes Item's bind is the module-level ``engine``.
        engine.dispose()

        jobs = []
        for _ in range(worker_num):
            p = Process(target=self.worker)
            p.daemon = True
            jobs.append(p)
            p.start()

        # Read the results *before* joining: a child cannot exit until
        # everything it put on out_queue has been written to the pipe, so
        # joining first deadlocks once the pipe buffer fills up.
        names = []
        finished = 0
        while finished < worker_num:
            message = self.out_queue.get()
            if message == self._DONE:
                finished += 1
            else:
                names.append(message)

        for job in jobs:
            job.join()
            # Process end message
            print('{name}.exitcode = {code}'.format(name=job.name, code=job.exitcode))

        # Put the names back so callers keep reading results from out_queue.
        for name in names:
            self.out_queue.put(name)
        return True

    def worker(self):
        """Look up items for ids read from ``in_queue``; runs in a child process.

        Reads ids until a stop marker arrives, puts the ``name`` of every item
        found on ``out_queue``, and finally puts one done marker there. It
        relies on ``get_items`` having queued the stop markers; called on its
        own it keeps waiting for more ids.

        :returns: True. A ``Process`` target's return value is ignored, so it
            does not affect how the process exits; kept for compatibility.
        """
        import sys

        session = None
        processed = 0
        try:
            # A private session: sharing the parent's session (and so its DB
            # connection) between processes can interleave the server's
            # replies - a likely cause of errors such as "Could not locate
            # column in row for column ...".
            session = Session()
            for item_id in iter(self.in_queue.get, self._STOP):
                sys.stdout.write('{0} '.format(item_id))
                processed += 1
                try:
                    item = session.query(Item).filter(Item.id == item_id).first()
                except Exception as e:
                    print('error => {0}'.format(e))
                    # Discard the failed transaction so later lookups can run.
                    session.rollback()
                    continue
                if item is None:
                    # No row for this id (e.g. deleted after the ids were read).
                    continue
                self.out_queue.put(item.name)
        finally:
            if session is not None:
                session.close()
            # Always report completion, even after an unexpected error, so
            # get_items() does not wait for this worker forever.
            # NOTE(review): a worker killed by a signal never reaches this.
            self.out_queue.put(self._DONE)
        print('worker done: {0} ids processed'.format(processed))
        return True
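Note that if you create the Session with scoped_session here, you may get an error such as:
Could not locate column in row for column …
A likely cause is that several processes end up sharing one database connection (a forked child inherits the parent's session and connection); giving each worker process its own session avoids this.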
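I also once got the following error, but I could not work out its cause because it did not reproduce:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
    send(obj)
IOError: [Errno 32] Broken pipe
The traceback shows a queue's background feeder thread (`_feed`) failing to write to the queue's pipe. A broken pipe most likely means the reading end had already been closed, for example because the receiving process exited while items were still being sent.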
Recommended Posts