Échantillon utilisant une combinaison de multitraitement et de SQLAlchemy, qui sont des bibliothèques multi-processus pratiques de Python. Les données sont échangées entre les processus dans la file d'attente et la base de données est accessible à l'aide de la session SQLALchemy.
from multiprocessing import (Process, Queue)
from sqlalchemy import (create_engine, MetaData)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import (sessionmaker, scoped_session)
engine = create_engine("mysql://{user}:{passwd}@{host}/{db}")
# _Session = scoped_session(sessionmaker())
Session = sessionmaker()
metadata = MetaData(engine)
Base = declarative_base()
# Base.query = _Session.query_property()
####################
#Définition de la table omise(Tableau des articles)
####################
class Hoge(object):
def __init__(self):
'''Cue pour alimenter en multi-processus et cue pour cracher'''
self.in_queue = Queue()
self.out_queue = Queue()
self.session = Session()
def __del__(self):
self.session.commit()
self.session.close()
def get_items(self, worker_num=4):
'''Obtenir des données de DB en multi-processus
Stockez l'identifiant que vous souhaitez obtenir dans la file d'attente, accédez à la base de données dans plusieurs processus et récupérez-le
'''
item_ids = self.session.query(Item.id).all()
for item_id in item_ids:
self.in_queue.put(item_id[0])
jobs = []
for i in xrange(worker_num):
p = Process(target=self.worker, args=[])
p.daemon = True
jobs.append(p)
p.start()
for job in jobs:
job.join()
#Message de fin de processus
print '{name}.exitcode = {code}'.format(name=job.name, code=job.exitcode)
return True
def worker(self):
'''Travailleur multi-processus
self.in_Prenez l'identifiant de la file d'attente, récupérez les données de la base de données et de vous-même.out_Stocker dans la file d'attente
'''
while not self.in_queue.empty():
id = self.in_queue.get()
print id,
try:
item = self.session.query(Item).filter(Item.id == id).first()
except Exception as e:
print 'error =>', e
continue
self.out_queue.put(item.name)
print 'in_queue: {0} (out_queue: {1})'\
.format(self.in_queue.qsize(), self.out_queue.qsize())
return True #Ça ne se terminera pas sans ça
Si vous créez une session avec scoped_session ici
Could not locate column in row for column …
Veuillez noter que vous pouvez obtenir une erreur telle que
Aussi,
Traceback (most recent call last): File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed send(obj) IOError: [Errno 32] Broken pipe
J'ai eu une erreur en disant, mais je ne l'ai pas compris car elle n'a pas été reproduite ...
Recommended Posts