Beispiel mit Multiprocessing, einer praktischen Multiprozessbibliothek von Python in Kombination mit SQL Alchemy Der Datenaustausch zwischen Prozessen in der Warteschlange und der Zugriff auf die Datenbank erfolgt über eine SQLALchemy-Sitzung.
from multiprocessing import (Process, Queue)
from sqlalchemy import (create_engine, MetaData)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import (sessionmaker, scoped_session)
engine = create_engine("mysql://{user}:{passwd}@{host}/{db}")
# _Session = scoped_session(sessionmaker())
Session = sessionmaker()
metadata = MetaData(engine)
Base = declarative_base()
# Base.query = _Session.query_property()
####################
#Tabellendefinition weggelassen(Artikeltabelle)
####################
class Hoge(object):
def __init__(self):
'''Cue zum Einspeisen von Multiprozessen und Cue zum Ausspucken'''
self.in_queue = Queue()
self.out_queue = Queue()
self.session = Session()
def __del__(self):
self.session.commit()
self.session.close()
def get_items(self, worker_num=4):
'''Abrufen von Daten aus der Datenbank in mehreren Prozessen
Speichern Sie die ID, die Sie erhalten möchten, in der Warteschlange, greifen Sie in mehreren Prozessen auf die Datenbank zu und rufen Sie sie ab
'''
item_ids = self.session.query(Item.id).all()
for item_id in item_ids:
self.in_queue.put(item_id[0])
jobs = []
for i in xrange(worker_num):
p = Process(target=self.worker, args=[])
p.daemon = True
jobs.append(p)
p.start()
for job in jobs:
job.join()
#Endnachricht verarbeiten
print '{name}.exitcode = {code}'.format(name=job.name, code=job.exitcode)
return True
def worker(self):
'''Multi-Prozess-Arbeiter
self.in_Nehmen Sie die ID aus der Warteschlange, holen Sie sich die Daten aus der Datenbank und selbst.out_In der Warteschlange speichern
'''
while not self.in_queue.empty():
id = self.in_queue.get()
print id,
try:
item = self.session.query(Item).filter(Item.id == id).first()
except Exception as e:
print 'error =>', e
continue
self.out_queue.put(item.name)
print 'in_queue: {0} (out_queue: {1})'\
.format(self.in_queue.qsize(), self.out_queue.qsize())
return True #Ohne das wird es nicht enden
Wenn Sie hier eine Sitzung mit scoped_session erstellen
Could not locate column in row for column …
Bitte beachten Sie, dass möglicherweise ein Fehler wie z
Ebenfalls,
Traceback (most recent call last): File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed send(obj) IOError: [Errno 32] Broken pipe
Ich habe eine Fehlermeldung erhalten, aber ich habe es nicht verstanden, weil es nicht reproduziert wurde ...
Recommended Posts