[Python] Do your best to speed up SQLAlchemy

Introduction

What do you use to hit MySQL from Python? SQLAlchemy, the Django ORM, [peewee](http://docs.peewee-orm.com/en/latest/)? I think most people use one of these. After trying them in various ways I settled on SQLAlchemy, but once the data reaches the order of tens of millions or hundreds of millions of rows, processing becomes slow and very troublesome.

So this time (setting aside the argument that you shouldn't use SQL for huge datasets or Python for high-speed processing in the first place), I'll leave a memorandum, split into several items, on how to speed up data processing with SQLAlchemy.

Although the title says SQLAlchemy, this is really a grab bag of Python speed-up tips for INSERTing data into and SELECTing data from a DB via SQLAlchemy. Strictly speaking it would be better to split this into separate articles, but having everything in one place is convenient when I look back at it myself, so here it is.

All the code used this time is on github, so please have a look there as well.

Execution environment

DB structure

The DB structure used in this test is as follows.

user table

| id | name | age | team_id | created_at | updated_at |
|----|--------|-----|---------|------------|------------|
| 1 | John1 | 12 | 4 | 1486030539 | 1486030539 |
| 2 | Kevin2 | 54 | 12 | 1486030539 | 1486030539 |
| ... | | | | | |

team table

| id | name | created_at | updated_at |
|----|------|------------|------------|
| 1 | A | 1486030539 | 1486030539 |
| 2 | B | 1486030539 | 1486030539 |
| ... | | | |

The user table has a foreign key to the team table.

INSERT

First, register the data in the team and user tables.

team_list = ['A', 'B',...,'Z']
data_list = [('John', 14, 'C'), ...]

We start from the point where this data is already in hand. There are 26 teams, A through Z, and 100,000 users.

0. Create SQLAlchemy objects one by one and insert them one by one

Nobody would actually do this, of course, but let's start here as a baseline for comparison.

import time

from sqlalchemy import (Table, Column, BigInteger, Integer,
                        Unicode, ForeignKey)
from sqlalchemy.orm import mapper, relationship


class Base(object):

    def __iter__(self):
        return iter(self.__dict__.items())

    def dict(self):
        return self.__dict__

    @classmethod
    def query(cls):
        if not hasattr(cls, "_query"):
            cls._query = database.session().query_property()
        return cls._query


class User(Base):

    def __repr__(self):
        return '<User %r>' % (self.id)

    def __init__(self, name, age, team):
        self.name = name
        self.age = age
        self.team = team
        self.updated_at = time.time()
        self.created_at = time.time()

    @staticmethod
    def create_dict(name, age, team_id):
        return {'name': name, 'age': age, 'team_id': team_id,
                'updated_at': time.time(), 'created_at': time.time()}


signup_user = Table('user', metadata,
                    Column('id', BigInteger, nullable=False,
                           primary_key=True, autoincrement=True),
                    Column('name', Unicode(255), nullable=False),
                    Column('age', Integer, nullable=False),
                    Column('team_id', ForeignKey('team.id'), nullable=False),
                    Column('updated_at', BigInteger, nullable=False),
                    Column('created_at', BigInteger, nullable=False))

mapper(User, signup_user,
       properties={
           'id': signup_user.c.id,
           'name': signup_user.c.name,
           'age': signup_user.c.age,
           'team': relationship(Team),
           'updated_at': signup_user.c.updated_at,
           'created_at': signup_user.c.created_at
       })

User.__table__ = signup_user

We define the Table object and the mapping like this. The engine and session setup looks like this:

from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import scoped_session, sessionmaker

metadata = MetaData()
engine = create_engine(uri, encoding='utf-8', pool_recycle=3600)
session = scoped_session(sessionmaker(autocommit=False,
                                      autoflush=True,
                                      expire_on_commit=False,
                                      bind=engine))

metadata.create_all(bind=engine)

The part that registers the teams is omitted in the original; for reference, here is a minimal sketch of what the Team mapping might look like, assuming it mirrors the user table (the actual definition is in the github repo).
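class Team(Base):
    # Sketch only: assumed to mirror the user table; see the repo for the real code

    def __repr__(self):
        return '<Team %r>' % self.id

    def __init__(self, name):
        self.name = name
        self.updated_at = time.time()
        self.created_at = time.time()


team = Table('team', metadata,
             Column('id', BigInteger, nullable=False,
                    primary_key=True, autoincrement=True),
             Column('name', Unicode(255), nullable=False),
             Column('updated_at', BigInteger, nullable=False),
             Column('created_at', BigInteger, nullable=False))

mapper(Team, team)

With the teams registered, inserting users one at a time looks like this: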

def insert_user(name, age, team):
    u = User(name, age, team)
    session.add(u)
    session.commit()

teams = Team.query().all()
# team_dict = {'A': <Team1>, 'B': <Team2>, ...}
team_dict = {t.name: t for t in teams}
[insert_user(d[0], d[1], team_dict[d[2]]) for d in data_list]

We create User objects one at a time, adding and committing each one. Committing per row forces a flush and a round trip per record, which is exactly what makes this approach so slow.

1. Insert multiple items together.

Obviously 0. is inefficient, so use `add_all`, which lets you add multiple records at once.

users = [User(d[0], d[1], team_dict[d[2]]) for d in data_list]
database.session().add_all(users)
database.session().commit()

The code got a lot cleaner, too.

2. bulk insert

SQLAlchemy's ORM provides `bulk_save_objects`.

from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class UserTable(Base):
    __tablename__ = "user"
    id = Column(BigInteger,  nullable=False,
                primary_key=True, autoincrement=True)
    name = Column(Unicode(255), nullable=False)
    age = Column(Integer, nullable=False)
    team_id = Column(BigInteger, nullable=False)
    updated_at = Column(BigInteger, nullable=False)
    created_at = Column(BigInteger, nullable=False)

Define a declarative table class like this, then save the objects in bulk:

session.bulk_save_objects(
    [UserTable(name=d[0],
               age=d[1],
               team_id=team_dict[d[2]].id,
               updated_at = time.time(),
               created_at = time.time())
     for d in data_list], return_defaults=True)
session.commit()

Insert it like this. Note that the handling of foreign keys has changed: we now pass `team_id` directly instead of a `Team` object.

3. Use sqlalchemy.core

The ORM has many advantages: it's easy to use, keeps code short, and quietly takes care of details like rollbacks behind the scenes. But its query generation carries a large overhead, which becomes the bottleneck once you care about speed. Using sqlalchemy.core takes more effort, but lets you issue queries much faster than through the ORM.

users = [{'name': d[0], 'age': d[1], 'team_id': team_dict[d[2]].id,
          'updated_at': time.time(), 'created_at': time.time()} for d in data_list]
session.execute(User.__table__.insert(), users)
session.commit()

Comparison

Let's compare the speeds of methods 0 through 3.

SqlAlchemy ORM: elapsed time of insertion: 62.205 [sec]
SqlAlchemy ORM multi insert: elapsed time of insertion: 1.421 [sec]
SqlAlchemy ORM bulk insert: elapsed time of insertion: 1.170 [sec]
SqlAlchemy core bulk insert: elapsed time of insertion: 0.261 [sec]

sqlalchemy.core ... overwhelming ...!! It's about 4-5 times faster than even the ORM bulk insert, and over 200 times faster than inserting one row at a time.

Even if you stay with the ORM, using bulk insert seems like a good idea.
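As an aside not benchmarked in this article, the ORM also offers `bulk_insert_mappings`, which takes plain dictionaries and skips object construction entirely, so it should sit somewhere between `bulk_save_objects` and core. A minimal sketch, reusing `UserTable`, `team_dict`, and `data_list` from above:

# Sketch (not measured here): bulk_insert_mappings accepts plain dicts,
# so no UserTable instances are constructed at all.
session.bulk_insert_mappings(
    UserTable,
    [{'name': d[0],
      'age': d[1],
      'team_id': team_dict[d[2]].id,
      'updated_at': time.time(),
      'created_at': time.time()}
     for d in data_list])
session.commit()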

Bonus. No need to split bulk insert?

When doing bulk inserts, I had a vague memory of hearing somewhere that splitting a very large insert into chunks is faster, so I actually tried it with sqlalchemy.core on 1 million rows. The number in parentheses below is the chunk size.
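The measurement code is on github; a minimal sketch of the idea (the function name and signature are mine) looks like this:

def bulk_insert_chunked(users, chunk_size):
    # Split the list of row dicts into chunks and issue one INSERT per chunk
    for i in range(0, len(users), chunk_size):
        session.execute(User.__table__.insert(), users[i:i + chunk_size])
    session.commit()

The results: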

SqlAlchemy core bulk insert (10): elapsed time of insertion: 51.066 [sec]
SqlAlchemy core bulk insert (20): elapsed time of insertion: 37.913 [sec]
SqlAlchemy core bulk insert (50): elapsed time of insertion: 27.323 [sec]
SqlAlchemy core bulk insert (100): elapsed time of insertion: 23.954 [sec]
SqlAlchemy core bulk insert (150): elapsed time of insertion: 22.607 [sec]
SqlAlchemy core bulk insert (200): elapsed time of insertion: 21.853 [sec]
SqlAlchemy core bulk insert (500): elapsed time of insertion: 20.139 [sec]
SqlAlchemy core bulk insert (750): elapsed time of insertion: 19.399 [sec]
SqlAlchemy core bulk insert (1000): elapsed time of insertion: 19.362 [sec]
SqlAlchemy core bulk insert (5000): elapsed time of insertion: 19.493 [sec]
SqlAlchemy core bulk insert (10000): elapsed time of insertion: 19.387 [sec]
SqlAlchemy core bulk insert (20000): elapsed time of insertion: 18.983 [sec]
SqlAlchemy core bulk insert (50000): elapsed time of insertion: 19.641 [sec]
SqlAlchemy core bulk insert (100000): elapsed time of insertion: 19.022 [sec]
SqlAlchemy core bulk insert (500000): elapsed time of insertion: 19.837 [sec]

Well, apparently that was just my imagination ... Splitting doesn't help: once the chunk size passes roughly 1,000 rows, the time flattens out around 19 seconds.

SELECT

Use the team and user data registered earlier. The number of teams is 26 from A to Z, and the number of users is 1 million.

First, a simple task: return a list of up to `limit` (100 here) dictionaries sorted by age, like `[{'id': 10, 'name': 'John', 'age': 34, 'team': 'K'}, {...}, ...]`.

0. Tuning on the MySQL side (Index etc.)

Obviously, tune the MySQL side first. In this example, simply adding an index on `user.age` makes the query about 10 times faster. There are already plenty of articles on tuning, so I'll omit the details here.
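For reference, one way to declare that index against the Table definition used above (a sketch; the index name is my own choice):

from sqlalchemy import Index

# Add a secondary index on user.age so the ORDER BY age query can use it
Index('ix_user_age', signup_user.c.age).create(bind=engine)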

1. Use ORM

users = User.query().order_by(desc(User.age)).limit(limit).all()
result = [{'id': u.id, 'name': u.name, 'age': u.age, 'team': u.team.name}
          for u in users]

Short! Nice. Brevity is especially welcome when you're reaching across tables joined by a foreign key.

2. Use sqlalchemy.core

Use the select function of sqlalchemy.

from sqlalchemy import select, desc, and_, func

u = User.__table__.c
t = Team.__table__.c
sel = select([u.id, u.name, u.age, t.name])\
       .select_from(User.__table__.join(Team.__table__, t.id == u.team_id))\
       .order_by(desc(u.age)).limit(limit)
result = [{'id': r[0], 'name': r[1], 'age': r[2], 'team': r[3]}
          for r in session.execute(sel)]

It has become quite long.

3. Combined with multiprocessing

This isn't directly about the SQL itself, but as the number of rows grows, parallelizing the post-processing improves speed dramatically. I wrote about how to use multiprocessing in an earlier article, so please see that for details.

from multiprocessing import Pool
import multiprocessing as multi


def get_user(r):
    return {'id': r[0], 'name': r[1], 'age': r[2], 'team': r[3]}


def select_user_multi():
    u = User.__table__.c
    t = Team.__table__.c
    sel = select([u.id, u.name, u.age, t.name])\
          .select_from(User.__table__.join(Team.__table__, t.id == u.team_id))\
          .order_by(desc(u.age)).limit(limit)
    p = Pool(multi.cpu_count())
    result = p.map(get_user, session.execute(sel))
    p.close()
    return result

Comparison

sqlAlchemy ORM: elapsed time: 0.3291 [sec]
sqlAlchemy core: elapsed time: 0.5837 [sec]
sqlAlchemy core with multiprocessing: elapsed time: 0.0096 [sec]

Huh ... core is slower here. Perhaps because the query is so simple. Bolting on multiprocessing, though, is far faster.

Bonus 1. count

Next, let's count the number of users per team. To make the query a bit more complicated, count only the users under age 50 in each team, producing data like `{'A': 1400, 'B': 2122, ...}`.

For speeding up the counting itself, see the reference below (Notes on handling COUNT() in InnoDB).

ORM

def select_teams_orm():
    return Team.query().all()

teams = select_teams_orm()
counts = {tm.name: User.query()\
              .filter(and_(User.team == tm, User.age < 50)).count()\
              for tm in teams}

As short as ever!

sqlalchemy.core

def select_teams_core():
    t = Team.__table__.c
    sel = select([t.id, t.name]).select_from(Team.__table__)
    res = session.execute(sel)
    result = [{'id': r[0], 'name': r[1]} for r in res]
    return result

teams = select_teams_core()
sess = lambda sel: session.execute(sel)
u = User.__table__.c
counts = {tm['name']: sess(
        select([func.count()]).select_from(User.__table__)\
        .where(and_(u.team_id == tm['id'], u.age < 50))\
).scalar() for tm in teams}

Hard to read! It's written that way to finish in a single loop; taken apart, it looks like this:

def create_query(team_id):  # Build a query filtering by team id and user age
    u = User.__table__.c
    return select([func.count()]).select_from(User.__table__)\
             .where(and_(u.team_id == team_id, u.age < 50))

queries = [create_query(tm['id']) for tm in teams]  # One query per team
counts = [session.execute(q) for q in queries]  # Issue the queries
result = [{tm['name']: c.scalar()} for tm, c in zip(teams, counts)]  # {'A': count, ...}

Something like that. The amount of code has grown considerably, too.

multiprocessing

Next, let's parallelize from the point where the SELECT queries are issued. Two caveats: **joblib's `Parallel` cannot be used here, and a session created with `scoped_session` cannot be used across worker processes**.

# A plain sessionmaker factory; scoped_session is no good across processes
Session = sessionmaker(autocommit=False,
                       autoflush=True,
                       expire_on_commit=False,
                       bind=engine)

def count_user(team):
    session = Session()  # each worker process opens its own session
    u = User.__table__.c
    sel = select([func.count()]).select_from(User.__table__)\
          .where(and_(u.team_id == team['id'], u.age < 50))
    result = session.execute(sel).scalar()
    session.close()
    return result


def count_user_multi():
    teams = select_teams_core()
    p = Pool(multi.cpu_count())
    counts = p.map(count_user, teams)
    counts = {t['name']: c for t, c in zip(teams, counts)}
    p.close()
    return counts

Query improvements

So far we've been issuing one count query per team, but in the first place, a query like

SELECT DISTINCT(team.id), team.name, COUNT(*)
FROM user JOIN team ON team.id = user.team_id
WHERE user.age < 50 GROUP BY team.id;

would get everything in one shot, so let's rewrite it:

u = User.__table__.c
t = Team.__table__.c
sel = select([func.distinct(t.id), t.name, func.count()])\
      .select_from(User.__table__.join(Team.__table__, t.id == u.team_id))\
      .where(u.age < 50).group_by(t.id)
counts = {r[1]: r[2] for r in session.execute(sel)}

Comparison

sqlAlchemy ORM: elapsed time: 0.9522 [sec]
sqlAlchemy core: elapsed time: 0.7772 [sec]
sqlAlchemy core with multiprocessing: elapsed time: 0.0733 [sec]
--
sqlAlchemy core with fixed query: elapsed time: 0.2207 [sec]

This time core is a bit faster than the ORM, and parallelization is roughly 10 times faster. Rewriting the query so it only has to be issued once made it a little over three times faster than the original, but still not as fast as the parallelized version.

Bonus 2. A little more complicated query

Core was slower in the earlier simple case, so let's try something heavier: for each team, fetch 100 users in descending order of age and return data like:

[{'id': 1, 'name': 'A', 'users': [{'id': 400, 'name': 'Kevin', 'age': 32}, {...}, ...]},
 {'id': 2, 'name': 'B', 'users': [...]},
  ...]

I'll omit the actual code (see github for details), but a rough sketch of the idea is below.
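This is my reconstruction of what the per-team core query might look like, not the actual github code; it reuses the `Session` factory from the multiprocessing count example:

def get_team_users(team):
    # Top 100 users of one team, oldest first (sketch only)
    session = Session()
    u = User.__table__.c
    sel = select([u.id, u.name, u.age])\
          .where(u.team_id == team['id'])\
          .order_by(desc(u.age)).limit(100)
    users = [{'id': r[0], 'name': r[1], 'age': r[2]}
             for r in session.execute(sel)]
    session.close()
    return {'id': team['id'], 'name': team['name'], 'users': users}

# parallelized over teams, e.g.: Pool(multi.cpu_count()).map(get_team_users, teams)

The results: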

sqlAlchemy ORM: elapsed time: 0.9782 [sec]
sqlAlchemy core: elapsed time: 0.8864 [sec]
sqlAlchemy core with multiprocessing: elapsed time: 0.0807 [sec]

In this example the difference between ORM and core is only about 0.1 sec, so it may not matter much, but as queries get more complicated and the row count grows beyond the ~1 million used here, combining sqlalchemy.core with parallelization looks worthwhile.

In conclusion

We've walked through INSERT and SELECT; to summarize:

- First of all, tune the SQL and the queries themselves
- For INSERT, core barely increases the amount of code and is 4-5 times faster, so it seems better to use core
- Splitting a bulk insert into chunks does not make it faster
- For SELECT, moving from ORM to core shaved off on the order of 10-20% in these tests, at the cost of more code
- Parallelizing SELECT post-processing gives a large speedup (close to 10x here, though this will vary a lot with machine performance)

That's about it. All of this mostly kills the niceness of the ORM, and at some point you could argue you should just use another language, but when you have to use Python, it seems best to choose the approach that fits the data size and the needs of the application.

References

- Make SQL faster (http://www.geocities.jp/mickindex/database/db_optimize.html)
- Speed up query generation with SQLAlchemy ORM (http://qiita.com/methane/items/342264e6b564f06abfe7)
- Python sqlalchemy: pass a SQL statement with embedded variables to sqlalchemy's session.execute() to run dynamic SQL (http://qiita.com/HirofumiYashima/items/9f459950ab9c878f7f21)
- Modified SQLAlchemy (http://qiita.com/haminiku/items/895b5d06befc9c8c8c51)
- Han Computer Road: Notes on handling COUNT() in InnoDB (http://nippondanji.blogspot.jp/2010/03/innodbcount.html)
