Womit schlagen Sie MySQL von Python aus? SQLAlchemy, Django, [peewee](http://docs.peewee-orm.com/en/latest/ Ich denke, es gibt viele Leute, die herum benutzen). Ich habe es oft benutzt und bin zu SQL Alchemy gekommen, aber wenn es um zig Millionen oder Hunderte Millionen Datenbestellungen geht, ist es langsam und sehr ärgerlich.
Dieses Mal (abgesehen von der Geschichte, dass SQL nicht für große Datenmengen oder Python für die Hochgeschwindigkeitsverarbeitung verwendet wird) werde ich ein Memorandum hinterlassen, das in mehrere Punkte unterteilt ist, wie die Datenverarbeitung mit SQL Alchemy beschleunigt werden kann. Stellen.
SQLAlchemy wird als ~ geschrieben, aber es ist ein Python-Beschleunigungs-TIPPS-Durcheinander, wenn Daten mit SQLAlchemy in die Datenbank eingefügt und ausgewählt werden. Eigentlich ist es besser, die Artikel zu trennen, aber ich bin froh, dass sie organisiert sind, wenn ich auf mich selbst zurückblicke, also habe ich das getan.
Der gesamte Code, der dieses Mal verwendet wird, befindet sich in github. Schauen Sie also auch dort nach.
Die in diesem Test verwendete DB-Struktur ist wie folgt.
user table
id | name | age | team_id | created_at | updated_at |
---|---|---|---|---|---|
1 | John1 | 12 | 4 | 1486030539 | 1486030539 |
2 | Kevin2 | 54 | 12 | 1486030539 | 1486030539 |
... |
team table
id | name | created_at | updated_at |
---|---|---|---|
1 | A | 1486030539 | 1486030539 |
2 | B | 1486030539 | 1486030539 |
... |
Benutzer hat einen externen Schlüssel für das Team.
Registrieren Sie zunächst die Daten in den Team- und Benutzertabellen.
team_list = ['A', 'B',...,'Z']
user_list = [('John', 14, 'C'), ...]
Beginnen Sie dort, wo Sie die Daten haben. Die Anzahl der Teams beträgt 26 von A bis Z und die Anzahl der Benutzer beträgt 100.000.
Wie erwartet werde ich dies nicht von Anfang an tun, sondern zum Vergleich von hier aus.
class Base(object):
def __iter__(self):
return iter(self.__dict__.items())
def dict(self):
return self.__dict__
@classmethod
def query(cls):
if not hasattr(cls, "_query"):
cls._query = database.session().query_property()
return cls._query
class User(Base):
def __repr__(self):
return '<User %r>' % (self.id)
def __init__(self, name, age, team):
self.name = name
self.age = age
self.team = team
self.updated_at = time.time()
self.created_at = time.time()
@staticmethod
def create_dict(name, age, team_id):
return {'name': name, 'age': age, 'team_id': team_id,
'updated_at': time.time(), 'created_at': time.time()}
signup_user = Table('user', metadata,
Column('id', BigInteger, nullable=False,
primary_key=True, autoincrement=True),
Column('name', Unicode(255), nullable=False),
Column('age', Integer, nullable=False),
Column('team_id', ForeignKey('team.id'), nullable=False),
Column('updated_at', BigInteger, nullable=False),
Column('created_at', BigInteger, nullable=False))
mapper(User, signup_user,
properties={
'id': signup_user.c.id,
'name': signup_user.c.name,
'age': signup_user.c.age,
'team': relationship(Team),
'updated_at': signup_user.c.updated_at,
'created_at': signup_user.c.created_at
})
User.__table__ = signup_user
Erstellen Sie ein solches Tabellenobjekt. Die Vorbereitung für die DB-seitige Sitzung ist wie folgt
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import scoped_session, sessionmaker
metadata = MetaData()
engine = create_engine(uri, encoding='utf-8', pool_recycle=3600)
session = scoped_session(sessionmaker(autocommit=False,
autoflush=True,
expire_on_commit=False,
bind=_engine))
metadata.create_all(bind=_engine)
Der Teil zur Registrierung des Teams entfällt.
def insert_user(name, age, team):
u = User(name, age, team)
session.add(u)
session.commit()
teams = Team.query().all()
# team_dict = {'A': <Team1>, 'B': <Team2>, ...}
team_dict = {t.name: t for t in teams}
[insert_user(d[0], d[1], team_dict[d[2]]) for d in data_list]
Erstellen Sie Benutzer nacheinander wie folgt und fügen Sie sie hinzu und legen Sie sie fest.
Offensichtlich ist 0. ineffizient, verwenden Sie also "add_all", mit dem Sie mehrere Datensätze gleichzeitig hinzufügen können.
users = [User(d[0], d[1], team_dict[d[2]]) for d in data_list]
database.session().add_all(users)
database.session().commit()
Als Code war es viel sauberer.
Es gibt Bulk_Save_Objects im ORM von SQLAlchemy.
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class UserTable(Base):
__tablename__ = "user"
id = Column(BigInteger, nullable=False,
primary_key=True, autoincrement=True)
name = Column(Unicode(255), nullable=False)
age = Column(Integer, nullable=False)
team_id = Column(BigInteger, nullable=False)
updated_at = Column(BigInteger, nullable=False)
created_at = Column(BigInteger, nullable=False)
Erstellen Sie ein Tabellenobjekt wie
session.bulk_save_objects(
[UserTable(name=d[0],
age=d[1],
team_id=team_dict[d[2]].id,
updated_at = time.time(),
created_at = time.time())
for d in data_list], return_defaults=True)
session.commit()
Fügen Sie es so ein. Sie sehen, dass sich der Umgang mit externen Schlüsseln usw. geändert hat.
ORM ist einfach zu verwenden, der Code kann gekürzt werden und bietet verschiedene Vorteile wie Feinsteuerung wie Rollback auf der Rückseite. Der Aufwand für die Generierung von Abfragen ist jedoch groß und wird zu einem Engpass, wenn eine Beschleunigung in Betracht gezogen wird. Die Verwendung von sqlalchemy.core kann problematisch sein, Abfragen jedoch schneller als die Verwendung von ORM. ..
users = [{'name':d[0], 'age': d[1], 'team_id': team_dict[d[2]]['id'],
'updated_at': time.time(), 'created_at': time.time()} for d in data_list]
session.execute(User.__table__.insert(), users)
session.commit()
Vergleichen wir die Geschwindigkeiten von 0 bis 3 miteinander.
SqlAlchemy ORM: elapsed time of insertion: 62.205 [sec]
SqlAlchemy ORM multi insert: elapsed time of insertion: 1.421 [sec]
SqlAlchemy ORM bulk insert: elapsed time of insertion: 1.170 [sec]
SqlAlchemy core bulk insert: elapsed time of insertion: 0.261 [sec]
sqlalchemy.core ... Überwältigend ... !! Es ist 5 oder 6 mal schneller als Bulk Insert.
Selbst wenn Sie ORM verwenden, scheint es gut zu sein, Bulk Insert zu verwenden.
Zum Zeitpunkt des Masseneinfügens hatte ich das Gefühl, irgendwo gehört zu haben, dass das Teilen beim Einfügen einer bestimmten Menge großer Datenmengen schneller ist, und habe daher tatsächlich versucht, sqlalchemy.core zu verwenden (1 Million Fälle). ..
SqlAlchemy core bulk insert (10): elapsed time of insertion: 51.066 [sec]
SqlAlchemy core bulk insert (20): elapsed time of insertion: 37.913 [sec]
SqlAlchemy core bulk insert (50): elapsed time of insertion: 27.323 [sec]
SqlAlchemy core bulk insert (100): elapsed time of insertion: 23.954 [sec]
SqlAlchemy core bulk insert (150): elapsed time of insertion: 22.607 [sec]
SqlAlchemy core bulk insert (200): elapsed time of insertion: 21.853 [sec]
SqlAlchemy core bulk insert (500): elapsed time of insertion: 20.139 [sec]
SqlAlchemy core bulk insert (750): elapsed time of insertion: 19.399 [sec]
SqlAlchemy core bulk insert (1000): elapsed time of insertion: 19.362 [sec]
SqlAlchemy core bulk insert (5000): elapsed time of insertion: 19.493 [sec]
SqlAlchemy core bulk insert (10000): elapsed time of insertion: 19.387 [sec]
SqlAlchemy core bulk insert (20000): elapsed time of insertion: 18.983 [sec]
SqlAlchemy core bulk insert (50000): elapsed time of insertion: 19.641 [sec]
SqlAlchemy core bulk insert (100000): elapsed time of insertion: 19.022 [sec]
SqlAlchemy core bulk insert (500000): elapsed time of insertion: 19.837 [sec]
Nun, es scheint, dass es an meinem Verstand lag ...
Verwenden Sie die zuvor registrierten Team- und Benutzerdaten. Die Anzahl der Teams beträgt 26 von A bis Z und die Anzahl der Benutzer 1 Million.
Zunächst einfach in der Reihenfolge des Alters: "[{'id': 10, 'name': 'John', 'age': 34, 'team': 'K'}, {...}, ...]" Erstellen Sie einen Prozess, der eine Limit-Wörterbuchliste (diesmal 100) wie zurückgibt.
Lassen Sie uns natürlich zuerst die MySQL-Seite optimieren. In diesem Beispiel wird die Verarbeitung durch einfaches Einfügen des Index in user.age etwa zehnmal schneller. Da es bereits verschiedene Artikel zum Thema Tuning gibt, werde ich diesmal darauf verzichten.
users = User.query().order_by(desc(User.age)).limit(limit).all()
result = [{'id': u.id, 'name': u.name, 'age': u.age, 'team': u.team.name}
for u in users]
kurz! Das ist gut. Ich bin dankbar, dass der Code besonders kurz ist, wenn er sich über Tabellen erstreckt, die über externe Schlüssel verbunden sind.
Verwenden Sie die Auswahlfunktion von sqlalchemy.
from sqlalchemy import select, desc, and_, func
u = User.__table__.c
t = Team.__table__.c
sel = select([u.id, u.name, u.age, t.name])\
.select_from(User.__table__.join(Team.__table__, t.id == u.team_id))\
.order_by(desc(u.age)).limit(limit)
result = [{'id': r[0], 'name': r[1], 'age': r[2], 'team': r[3]}
for r in session.execute(sel)]
Es ist ziemlich lang geworden.
In diesem Beispiel steht dies nicht in direktem Zusammenhang mit der SQL-Verarbeitung. Wenn sich jedoch die Anzahl der Daten erhöht, verbessert sich die Verarbeitungsgeschwindigkeit erheblich, wenn eine parallele Verarbeitung durchgeführt wird. Informationen zur Verwendung finden Sie in Ich habe bereits einen Artikel geschrieben. Schauen Sie also bitte dort nach.
from multiprocessing import Pool
import multiprocessing as multi
def get_user(r):
return {'id': r[0], 'name': r[1], 'age': r[2], 'team': r[3]}
def select_user_multi():
u = User.__table__.c
t = Team.__table__.c
sel = select([u.id, u.name, u.age, t.name])\
.select_from(User.__table__.join(Team.__table__, t.id == u.team_id))\
.order_by(desc(u.age)).limit(limit)
p = Pool(multi.cpu_count())
result = p.map(get_user, session.execute(sel))
p.close()
return result
sqlAlchemy ORM: elapsed time: 0.3291 [sec]
sqlAlchemy core: elapsed time: 0.5837 [sec]
sqlAlchemy core with multiprocessing: elapsed time: 0.0096 [sec]
Das ... es ist spät. Dies kann daran liegen, dass die Abfrage einfach war. Das Ergebnis der beißenden Mehrfachverarbeitung ist viel schneller.
Als nächstes zählen wir die Anzahl der Benutzer für jedes Team. Komplizieren Sie die Abfrage ein wenig und zählen Sie die Anzahl der Benutzer unter 50 Jahren, die zu jedem Team gehören, und erstellen Sie Daten wie "{" A ": 1400," B ": 2122, ....}" Verarbeitung durchführen.
Weitere Informationen zur Beschleunigung des Zählvorgangs finden Sie hier (Hinweise zur Behandlung von COUNT () in InnoDB). Werden
ORM
def select_teams_orm():
return Team.query().all()
teams = select_teams_orm()
counts = {tm.name: User.query()\
.filter(and_(User.team == tm, User.age < 50)).count()\
for tm in teams}
So kurz wie immer!
sqlalchemy.core
def select_teams_core():
t = Team.__table__.c
sel = select([t.id, t.name]).select_from(Team.__table__)
res = session.execute(sel)
result = [{'id': r[0], 'name': r[1]} for r in res]
return result
teams = select_teams_core()
sess = lambda sel: session.execute(sel)
u = User.__table__.c
counts = {tm['name']: sess(
select([func.count()]).select_from(User.__table__)\
.where(and_(u.team_id == tm['id'], u.age < 50))\
).scalar() for tm in teams}
Schwer zu verstehen! Es ist wie oben geschrieben, um mit einer Schleife zu enden, aber wenn es zerlegt ist
def create_query(team_id): #Abfrageerstellung Eingeschränkt nach Team-ID und Benutzeralter
u = User.__table__.c
return select([func.count()]).select_from(User.__table__)\
.where(add_(u.team_id == team_id, u.age < 50))
queries = [create_query(tm['id']) for tm in teams] #Erstellen Sie eine Abfrage für jedes Team
counts = [session.execute(q) for q in queries] #Abfrage ausgeben
result = [{tm['name']: c.scalar()} for tm,c in zip(teams,counts)] # {'A': count, ...}Erstellen Sie ein Wörterbuch
Es ist wie es ist. Auch die Menge an Code hat erheblich zugenommen.
multiprocessing
Als nächstes parallelisieren wir von der Stelle, an der die SELECT-Abfrage ausgelöst wird.
Es sind zwei Punkte zu beachten: ** Joblibs Parallelisierung kann nicht verwendet werden, und die mit scoped_session
erstellte Sitzung kann nicht für die Parallelverarbeitung verwendet werden **.
session = sessionmaker(autocommit=False, # scoped_Sitzung ist nicht gut
autoflush=True,
expire_on_commit=False,
bind=_engine)
def count_user(team):
u = User.__table__.c
sel = select([func.count()]).select_from(User.__table__)\
.where(and_(u.team_id == team['id'], u.age < 50))
result = session.execute(sel).scalar()
return result
def count_user_multi():
teams = select_teams_core()
p = Pool(multi.cpu_count())
counts = p.map(count_user, teams)
counts = {t['name']: c for t, c in zip(teams, counts)}
p.close()
session.close()
return counts
Dieses Mal stelle ich eine Abfrage, die für jedes Team zählt, aber an erster Stelle
SELECT DISTINCT(team.id), team.name, COUNT(*)
FROM user JOIN team ON team.id = user.team_id
WHERE user.age < 50 GROUP BY team.id;
In diesem Fall müssen Sie die Abfrage nur einmal auslösen. Ändern Sie sie daher.
u = User.__table__.c
t = User.__table__.c
sel = select([func.distinct(t.id), t.name, func.count()])\
.select_from(User.__table__.join(Team.__table__, t.id == u.team_id))\
.where(u.age < 50).group_by(t.id)
counts = {r[1]: r[2] for r in database.session().execute(sel)}
sqlAlchemy ORM: elapsed time: 0.9522 [sec]
sqlAlchemy core: elapsed time: 0.7772 [sec]
sqlAlchemy core with multiprocessing: elapsed time: 0.0733 [sec]
--
sqlAlchemy core with fixed query: elapsed time: 0.2207 [sec]
Diesmal ist die Verwendung von Core etwas schneller als bei ORM. Es scheint, dass die Parallelisierung etwa zehnmal schneller ist. Als die Abfrage so verbessert wurde, dass sie einmal ausgelöst werden konnte, war sie etwas mehr als dreimal schneller als die ursprüngliche Abfrage, aber nicht weit von der parallelisierten Abfrage entfernt.
Es war früher langsamer, Core zu verwenden, aber für jedes Team werden 100 Personen in absteigender Reihenfolge des Alters erfasst und die folgenden Daten werden zurückgegeben.
[{'id': 1, 'name': 'A', 'users': [{'id': 400, 'name': 'Kevin', 'age': 32}, {...}, ...]},
{'id': 2, 'name': 'B', 'users': [...]},
...]
Ich werde den Code weglassen (siehe github für Details), aber das Ergebnis ist wie folgt.
sqlAlchemy ORM: elapsed time: 0.9782 [sec]
sqlAlchemy core: elapsed time: 0.8864 [sec]
sqlAlchemy core with multiprocessing: elapsed time: 0.0807 [sec]
In diesem Beispiel beträgt der Unterschied zwischen ORM und Kern etwa 0,1 Sekunden, sodass Sie sich möglicherweise keine Sorgen machen müssen. Da es sich jedoch um etwa 1 Million handelt, wird die Abfrage komplizierter und die Anzahl der Fälle nimmt zu. In diesem Fall erscheint es sinnvoll, sqlalchemy.core zu verwenden und zu parallelisieren.
Wie oben erwähnt, habe ich es als INSERT und SELECT gesehen, aber zusammenfassend:
--Erstens, SQL, Abfrageoptimierung --Insert erhöht die Codemenge nicht so stark und die Geschwindigkeit ist fünf- oder sechsmal höher. Daher scheint es besser, Core zu verwenden.
war. Die Güte von ORM wird fast zunichte gemacht, und es ist die Rede davon, dass Sie keine andere Sprache mehr verwenden sollten, aber wenn Sie Python verwenden müssen, scheint es gut, es entsprechend der Datengröße und dem Inhalt der Anwendung richtig zu verwenden.
Recommended Posts