[Python] Geben Sie Ihr Bestes, um SQL Alchemy zu beschleunigen

Einführung

Womit schlagen Sie MySQL von Python aus? SQLAlchemy, Django, [peewee](http://docs.peewee-orm.com/en/latest/ Ich denke, es gibt viele Leute, die herum benutzen). Ich habe es oft benutzt und bin zu SQL Alchemy gekommen, aber wenn es um zig Millionen oder Hunderte Millionen Datenbestellungen geht, ist es langsam und sehr ärgerlich.

Dieses Mal (abgesehen von der Geschichte, dass SQL nicht für große Datenmengen oder Python für die Hochgeschwindigkeitsverarbeitung verwendet wird) werde ich ein Memorandum hinterlassen, das in mehrere Punkte unterteilt ist, wie die Datenverarbeitung mit SQL Alchemy beschleunigt werden kann. Stellen.

SQLAlchemy wird als ~ geschrieben, aber es ist ein Python-Beschleunigungs-TIPPS-Durcheinander, wenn Daten mit SQLAlchemy in die Datenbank eingefügt und ausgewählt werden. Eigentlich ist es besser, die Artikel zu trennen, aber ich bin froh, dass sie organisiert sind, wenn ich auf mich selbst zurückblicke, also habe ich das getan.

Der gesamte Code, der dieses Mal verwendet wird, befindet sich in github. Schauen Sie also auch dort nach.

Ausführungsumgebung

DB-Struktur

Die in diesem Test verwendete DB-Struktur ist wie folgt.

user table

id name age team_id created_at updated_at
1 John1 12 4 1486030539 1486030539
2 Kevin2 54 12 1486030539 1486030539
...

team table

id name created_at updated_at
1 A 1486030539 1486030539
2 B 1486030539 1486030539
...

Benutzer hat einen externen Schlüssel für das Team.

EINFÜGEN

Registrieren Sie zunächst die Daten in den Team- und Benutzertabellen.

team_list = ['A', 'B',...,'Z']
user_list = [('John', 14, 'C'), ...]

Beginnen Sie dort, wo Sie die Daten haben. Die Anzahl der Teams beträgt 26 von A bis Z und die Anzahl der Benutzer beträgt 100.000.

0. Erstellen Sie nacheinander SQLAlchemy Table-Objekte und fügen Sie sie nacheinander ein

Wie erwartet werde ich dies nicht von Anfang an tun, sondern zum Vergleich von hier aus.

class Base(object):

    def __iter__(self):
        return iter(self.__dict__.items())

    def dict(self):
        return self.__dict__

    @classmethod
    def query(cls):
        if not hasattr(cls, "_query"):
            cls._query = database.session().query_property()
        return cls._query
class User(Base):

    def __repr__(self):
        return '<User %r>' % (self.id)

    def __init__(self, name, age, team):
        self.name = name
        self.age = age
        self.team = team
        self.updated_at = time.time()
        self.created_at = time.time()

    @staticmethod
    def create_dict(name, age, team_id):
        return {'name': name, 'age': age, 'team_id': team_id,
                'updated_at': time.time(), 'created_at': time.time()}


signup_user = Table('user', metadata,
                    Column('id', BigInteger, nullable=False,
                           primary_key=True, autoincrement=True),
                    Column('name', Unicode(255), nullable=False),
                    Column('age', Integer, nullable=False),
                    Column('team_id', ForeignKey('team.id'), nullable=False),
                    Column('updated_at', BigInteger, nullable=False),
                    Column('created_at', BigInteger, nullable=False))

mapper(User, signup_user,
       properties={
           'id': signup_user.c.id,
           'name': signup_user.c.name,
           'age': signup_user.c.age,
           'team': relationship(Team),
           'updated_at': signup_user.c.updated_at,
           'created_at': signup_user.c.created_at
       })

User.__table__ = signup_user

Erstellen Sie ein solches Tabellenobjekt. Die Vorbereitung für die DB-seitige Sitzung ist wie folgt

from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import scoped_session, sessionmaker

metadata = MetaData()
engine = create_engine(uri, encoding='utf-8', pool_recycle=3600)
session = scoped_session(sessionmaker(autocommit=False,
                                               autoflush=True,
                                               expire_on_commit=False,
                                               bind=_engine))

metadata.create_all(bind=_engine)

Der Teil zur Registrierung des Teams entfällt.

def insert_user(name, age, team):
    u = User(name, age, team)
    session.add(u)
    session.commit()

teams = Team.query().all()
# team_dict = {'A': <Team1>, 'B': <Team2>, ...}
team_dict = {t.name: t for t in teams}
[insert_user(d[0], d[1], team_dict[d[2]]) for d in data_list]

Erstellen Sie Benutzer nacheinander wie folgt und fügen Sie sie hinzu und legen Sie sie fest.

1. Fügen Sie mehrere Elemente zusammen.

Offensichtlich ist 0. ineffizient, verwenden Sie also "add_all", mit dem Sie mehrere Datensätze gleichzeitig hinzufügen können.

users = [User(d[0], d[1], team_dict[d[2]]) for d in data_list]
database.session().add_all(users)
database.session().commit()

Als Code war es viel sauberer.

  1. bulk insert

Es gibt Bulk_Save_Objects im ORM von SQLAlchemy.

from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class UserTable(Base):
    __tablename__ = "user"
    id = Column(BigInteger,  nullable=False,
                primary_key=True, autoincrement=True)
    name = Column(Unicode(255), nullable=False)
    age = Column(Integer, nullable=False)
    team_id = Column(BigInteger, nullable=False)
    updated_at = Column(BigInteger, nullable=False)
    created_at = Column(BigInteger, nullable=False)

Erstellen Sie ein Tabellenobjekt wie

session.bulk_save_objects(
    [UserTable(name=d[0],
               age=d[1],
               team_id=team_dict[d[2]].id,
               updated_at = time.time(),
               created_at = time.time())
     for d in data_list], return_defaults=True)
session.commit()

Fügen Sie es so ein. Sie sehen, dass sich der Umgang mit externen Schlüsseln usw. geändert hat.

3. Verwenden Sie sqlalchemy.core

ORM ist einfach zu verwenden, der Code kann gekürzt werden und bietet verschiedene Vorteile wie Feinsteuerung wie Rollback auf der Rückseite. Der Aufwand für die Generierung von Abfragen ist jedoch groß und wird zu einem Engpass, wenn eine Beschleunigung in Betracht gezogen wird. Die Verwendung von sqlalchemy.core kann problematisch sein, Abfragen jedoch schneller als die Verwendung von ORM. ..

users = [{'name':d[0], 'age': d[1], 'team_id': team_dict[d[2]]['id'],
          'updated_at': time.time(), 'created_at': time.time()} for d in data_list]
session.execute(User.__table__.insert(), users)
session.commit()

Vergleich

Vergleichen wir die Geschwindigkeiten von 0 bis 3 miteinander.

SqlAlchemy ORM: elapsed time of insertion: 62.205 [sec]
SqlAlchemy ORM multi insert: elapsed time of insertion: 1.421 [sec]
SqlAlchemy ORM bulk insert: elapsed time of insertion: 1.170 [sec]
SqlAlchemy core bulk insert: elapsed time of insertion: 0.261 [sec]

sqlalchemy.core ... Überwältigend ... !! Es ist 5 oder 6 mal schneller als Bulk Insert.

Selbst wenn Sie ORM verwenden, scheint es gut zu sein, Bulk Insert zu verwenden.

Bonus. Keine Notwendigkeit, Bulk Insert zu teilen?

Zum Zeitpunkt des Masseneinfügens hatte ich das Gefühl, irgendwo gehört zu haben, dass das Teilen beim Einfügen einer bestimmten Menge großer Datenmengen schneller ist, und habe daher tatsächlich versucht, sqlalchemy.core zu verwenden (1 Million Fälle). ..

SqlAlchemy core bulk insert (10): elapsed time of insertion: 51.066 [sec]
SqlAlchemy core bulk insert (20): elapsed time of insertion: 37.913 [sec]
SqlAlchemy core bulk insert (50): elapsed time of insertion: 27.323 [sec]
SqlAlchemy core bulk insert (100): elapsed time of insertion: 23.954 [sec]
SqlAlchemy core bulk insert (150): elapsed time of insertion: 22.607 [sec]
SqlAlchemy core bulk insert (200): elapsed time of insertion: 21.853 [sec]
SqlAlchemy core bulk insert (500): elapsed time of insertion: 20.139 [sec]
SqlAlchemy core bulk insert (750): elapsed time of insertion: 19.399 [sec]
SqlAlchemy core bulk insert (1000): elapsed time of insertion: 19.362 [sec]
SqlAlchemy core bulk insert (5000): elapsed time of insertion: 19.493 [sec]
SqlAlchemy core bulk insert (10000): elapsed time of insertion: 19.387 [sec]
SqlAlchemy core bulk insert (20000): elapsed time of insertion: 18.983 [sec]
SqlAlchemy core bulk insert (50000): elapsed time of insertion: 19.641 [sec]
SqlAlchemy core bulk insert (100000): elapsed time of insertion: 19.022 [sec]
SqlAlchemy core bulk insert (500000): elapsed time of insertion: 19.837 [sec]

Nun, es scheint, dass es an meinem Verstand lag ...

SELECT Edition

Verwenden Sie die zuvor registrierten Team- und Benutzerdaten. Die Anzahl der Teams beträgt 26 von A bis Z und die Anzahl der Benutzer 1 Million.

Zunächst einfach in der Reihenfolge des Alters: "[{'id': 10, 'name': 'John', 'age': 34, 'team': 'K'}, {...}, ...]" Erstellen Sie einen Prozess, der eine Limit-Wörterbuchliste (diesmal 100) wie zurückgibt.

0. Tuning auf der MySQL-Seite (Index etc.)

Lassen Sie uns natürlich zuerst die MySQL-Seite optimieren. In diesem Beispiel wird die Verarbeitung durch einfaches Einfügen des Index in user.age etwa zehnmal schneller. Da es bereits verschiedene Artikel zum Thema Tuning gibt, werde ich diesmal darauf verzichten.

1. Verwenden Sie ORM

users = User.query().order_by(desc(User.age)).limit(limit).all()
result = [{'id': u.id, 'name': u.name, 'age': u.age, 'team': u.team.name}
          for u in users]

kurz! Das ist gut. Ich bin dankbar, dass der Code besonders kurz ist, wenn er sich über Tabellen erstreckt, die über externe Schlüssel verbunden sind.

2. Verwenden Sie sqlalchemy.core

Verwenden Sie die Auswahlfunktion von sqlalchemy.

from sqlalchemy import select, desc, and_, func

u = User.__table__.c
t = Team.__table__.c
sel = select([u.id, u.name, u.age, t.name])\
       .select_from(User.__table__.join(Team.__table__, t.id == u.team_id))\
       .order_by(desc(u.age)).limit(limit)
result = [{'id': r[0], 'name': r[1], 'age': r[2], 'team': r[3]}
          for r in session.execute(sel)]

Es ist ziemlich lang geworden.

3. Kombiniert mit Multiprocessing

In diesem Beispiel steht dies nicht in direktem Zusammenhang mit der SQL-Verarbeitung. Wenn sich jedoch die Anzahl der Daten erhöht, verbessert sich die Verarbeitungsgeschwindigkeit erheblich, wenn eine parallele Verarbeitung durchgeführt wird. Informationen zur Verwendung finden Sie in Ich habe bereits einen Artikel geschrieben. Schauen Sie also bitte dort nach.

from multiprocessing import Pool
import multiprocessing as multi


def get_user(r):
    return {'id': r[0], 'name': r[1], 'age': r[2], 'team': r[3]}


def select_user_multi():
    u = User.__table__.c
    t = Team.__table__.c
    sel = select([u.id, u.name, u.age, t.name])\
          .select_from(User.__table__.join(Team.__table__, t.id == u.team_id))\
          .order_by(desc(u.age)).limit(limit)
    p = Pool(multi.cpu_count())
    result = p.map(get_user, session.execute(sel))
    p.close()
    return result

Vergleich

sqlAlchemy ORM: elapsed time: 0.3291 [sec]
sqlAlchemy core: elapsed time: 0.5837 [sec]
sqlAlchemy core with multiprocessing: elapsed time: 0.0096 [sec]

Das ... es ist spät. Dies kann daran liegen, dass die Abfrage einfach war. Das Ergebnis der beißenden Mehrfachverarbeitung ist viel schneller.

Bonus 1. zählen

Als nächstes zählen wir die Anzahl der Benutzer für jedes Team. Komplizieren Sie die Abfrage ein wenig und zählen Sie die Anzahl der Benutzer unter 50 Jahren, die zu jedem Team gehören, und erstellen Sie Daten wie "{" A ": 1400," B ": 2122, ....}" Verarbeitung durchführen.

Weitere Informationen zur Beschleunigung des Zählvorgangs finden Sie hier (Hinweise zur Behandlung von COUNT () in InnoDB). Werden

ORM

def select_teams_orm():
    return Team.query().all()

teams = select_teams_orm()
counts = {tm.name: User.query()\
              .filter(and_(User.team == tm, User.age < 50)).count()\
              for tm in teams}

So kurz wie immer!

sqlalchemy.core

def select_teams_core():
    t = Team.__table__.c
    sel = select([t.id, t.name]).select_from(Team.__table__)
    res = session.execute(sel)
    result = [{'id': r[0], 'name': r[1]} for r in res]
    return result

teams = select_teams_core()
sess = lambda sel: session.execute(sel)
u = User.__table__.c
counts = {tm['name']: sess(
        select([func.count()]).select_from(User.__table__)\
        .where(and_(u.team_id == tm['id'], u.age < 50))\
).scalar() for tm in teams}

Schwer zu verstehen! Es ist wie oben geschrieben, um mit einer Schleife zu enden, aber wenn es zerlegt ist

def create_query(team_id): #Abfrageerstellung Eingeschränkt nach Team-ID und Benutzeralter
    u = User.__table__.c
    return select([func.count()]).select_from(User.__table__)\
             .where(add_(u.team_id == team_id, u.age < 50))

queries = [create_query(tm['id']) for tm in teams] #Erstellen Sie eine Abfrage für jedes Team
counts = [session.execute(q) for q in queries] #Abfrage ausgeben
result = [{tm['name']: c.scalar()} for tm,c in zip(teams,counts)] # {'A': count, ...}Erstellen Sie ein Wörterbuch

Es ist wie es ist. Auch die Menge an Code hat erheblich zugenommen.

multiprocessing

Als nächstes parallelisieren wir von der Stelle, an der die SELECT-Abfrage ausgelöst wird. Es sind zwei Punkte zu beachten: ** Joblibs Parallelisierung kann nicht verwendet werden, und die mit scoped_session erstellte Sitzung kann nicht für die Parallelverarbeitung verwendet werden **.

session = sessionmaker(autocommit=False, # scoped_Sitzung ist nicht gut
                       autoflush=True,
                       expire_on_commit=False,
                       bind=_engine)

def count_user(team):
    u = User.__table__.c
    sel = select([func.count()]).select_from(User.__table__)\
          .where(and_(u.team_id == team['id'], u.age < 50))
    result = session.execute(sel).scalar()
    return result


def count_user_multi():
    teams = select_teams_core()
    p = Pool(multi.cpu_count())
    counts = p.map(count_user, teams)
    counts = {t['name']: c for t, c in zip(teams, counts)}
    p.close()
    session.close()
    return counts

Abfrageverbesserungen

Dieses Mal stelle ich eine Abfrage, die für jedes Team zählt, aber an erster Stelle

SELECT DISTINCT(team.id), team.name, COUNT(*)
FROM user JOIN team ON team.id = user.team_id
WHERE user.age < 50 GROUP BY team.id;

In diesem Fall müssen Sie die Abfrage nur einmal auslösen. Ändern Sie sie daher.

u = User.__table__.c
t = User.__table__.c
sel = select([func.distinct(t.id), t.name, func.count()])\
      .select_from(User.__table__.join(Team.__table__, t.id == u.team_id))\
      .where(u.age < 50).group_by(t.id)
counts = {r[1]: r[2] for r in database.session().execute(sel)}

Vergleich

sqlAlchemy ORM: elapsed time: 0.9522 [sec]
sqlAlchemy core: elapsed time: 0.7772 [sec]
sqlAlchemy core with multiprocessing: elapsed time: 0.0733 [sec]
--
sqlAlchemy core with fixed query: elapsed time: 0.2207 [sec]

Diesmal ist die Verwendung von Core etwas schneller als bei ORM. Es scheint, dass die Parallelisierung etwa zehnmal schneller ist. Als die Abfrage so verbessert wurde, dass sie einmal ausgelöst werden konnte, war sie etwas mehr als dreimal schneller als die ursprüngliche Abfrage, aber nicht weit von der parallelisierten Abfrage entfernt.

Bonus 2. Eine etwas kompliziertere Abfrage

Es war früher langsamer, Core zu verwenden, aber für jedes Team werden 100 Personen in absteigender Reihenfolge des Alters erfasst und die folgenden Daten werden zurückgegeben.

[{'id': 1, 'name': 'A', 'users': [{'id': 400, 'name': 'Kevin', 'age': 32}, {...}, ...]},
 {'id': 2, 'name': 'B', 'users': [...]},
  ...]

Ich werde den Code weglassen (siehe github für Details), aber das Ergebnis ist wie folgt.

sqlAlchemy ORM: elapsed time: 0.9782 [sec]
sqlAlchemy core: elapsed time: 0.8864 [sec]
sqlAlchemy core with multiprocessing: elapsed time: 0.0807 [sec]

In diesem Beispiel beträgt der Unterschied zwischen ORM und Kern etwa 0,1 Sekunden, sodass Sie sich möglicherweise keine Sorgen machen müssen. Da es sich jedoch um etwa 1 Million handelt, wird die Abfrage komplizierter und die Anzahl der Fälle nimmt zu. In diesem Fall erscheint es sinnvoll, sqlalchemy.core zu verwenden und zu parallelisieren.

abschließend

Wie oben erwähnt, habe ich es als INSERT und SELECT gesehen, aber zusammenfassend:

--Erstens, SQL, Abfrageoptimierung --Insert erhöht die Codemenge nicht so stark und die Geschwindigkeit ist fünf- oder sechsmal höher. Daher scheint es besser, Core zu verwenden.

war. Die Güte von ORM wird fast zunichte gemacht, und es ist die Rede davon, dass Sie keine andere Sprache mehr verwenden sollten, aber wenn Sie Python verwenden müssen, scheint es gut, es entsprechend der Datengröße und dem Inhalt der Anwendung richtig zu verwenden.

Verweise

Recommended Posts

[Python] Geben Sie Ihr Bestes, um SQL Alchemy zu beschleunigen
Numba als Python zu beschleunigen
So beschleunigen Sie Python-Berechnungen
[Python] Wie man PCA mit Python macht
Schwanzrekursion mit Python2 durchführen
Was tun mit PYTHON Release?
Beschleunigen Sie Python mit numba grob
Project Euler 4 Versuch zu beschleunigen
[DRF] Snippet zur Beschleunigung von PrimaryKeyRelatedField
Schreiben Sie Python nicht, wenn Sie es mit Python beschleunigen möchten
[Python] Drücken Sie Keras von TensorFlow und TensorFlow von c ++, um die Ausführung zu beschleunigen.
Unverzichtbar, wenn Sie Python verwenden! Wie man Numpy benutzt, um Berechnungen zu beschleunigen!
Wie man die schöne Suppeninstanziierung beschleunigt
So machen Sie R chartr () in Python
Was tun, wenn ipython und python mit unterschiedlichen Versionen starten?
Wie man einen Taschentest mit Python macht
[Python] Führen Sie UIKit3 in das Django-Projekt ein
Eintrag, bei dem Python-Anfänger ihr Bestes geben, um nach und nach 100 Sprachprozesse zu beenden
Rufen Sie Rust von Python an, um schneller zu werden! PyO3-Tutorial: Umschließen von Klassen Teil ➀
Organisieren Sie Python-Tools, um die anfängliche Bewegung von Datenanalyse-Wettbewerben zu beschleunigen
Rufen Sie Rust von Python an, um schneller zu werden! PyO3-Tutorial: Umschließen von Klassen Teil ➁
Python-Programm ist langsam! Ich möchte beschleunigen! In einem solchen Fall ...
Möchten Sie Python-Dekoratoren Typhinweise hinzufügen?
Ich möchte Dunnetts Test in Python machen
So führen Sie eine Mehrkern-Parallelverarbeitung mit Python durch
[Python] Was ich getan habe, um Unit Test zu machen
Wie man Scicit-Learn wie Conda Numpy beschleunigt
Minimale Implementierung von Union Find in Python
Geschwindigkeit: Element am Ende des Python-Arrays hinzufügen
Was tun, um eine Google-Tabelle in Python zu erhalten?
"Backport" zu Python 2
Das beste Werkzeug, um Ihre Privatsphäre vor Fotos zu schützen ...!
Vom Einrichten des Raspberry Pi bis zur Installation der Python-Umgebung
Versuch und Irrtum, um die Erzeugung von Wärmekarten zu beschleunigen
Memo zum Erstellen einer eigenen Box mit Peppers Python
Versuch und Irrtum, um Android-Screenshots zu beschleunigen
So richten Sie eine Python-Umgebung mit pyenv ein
[Einführung in die Udemy Python3 + -Anwendung] 66. Erstellen einer eigenen Ausnahme
So führen Sie eine Hash-Berechnung mit Salt in Python durch
Versuchen Sie, Ihr eigenes Intro-Quiz mit Python zu verbessern
Schritte zum Installieren des neuesten Python auf Ihrem Mac
[Python-Anfänger] Wenn __name__ == Bewegen Sie Ihre Hand, um '__ main__' zu verstehen.
Überprüfung des Atcoders ABC158 bis Frage E (Python)
[Road to Intermediate Python] Definieren Sie in Ihrer eigenen Klasse
Was tun, wenn "Name xxx nicht importiert werden kann" [Python]
Mehrstellige Multiplikationszeit bis zu 300 Millionen Stellen in Python
So richten Sie die Cython-Umgebung ein und kompilieren sie
Um das Äquivalent von Rubys ObjectSpace._id2ref in Python zu tun
Ich möchte am Ende etwas mit Python machen
Rufen Sie Rust von Python an, um schneller zu werden! PyO3-Tutorial: Umschließen einer einfachen Funktion Teil ➁