What do you use to access MySQL from Python? SQLAlchemy, Django, [peewee](http://docs.peewee-orm.com/en/latest/): I suspect most people use one of these. After trying various options I settled on SQLAlchemy, but once the data grows to the order of tens of millions or hundreds of millions of rows, processing becomes slow and quite painful.
So this time (leaving aside the argument that you should not use SQL for data that large, or Python when you need raw speed), I will leave a memo, split into several items, on how to speed up data handling with SQLAlchemy.
Although the title says SQLAlchemy, this is really a grab bag of Python speed-up tips for INSERTing data into and SELECTing data from a DB through SQLAlchemy. It would probably be better to split this into separate articles, but having everything in one place is handy when I look back at it later, so here it is.
All the code used this time is on github, so please have a look there as well.
The DB structure used in this test is as follows.
user table
id | name | age | team_id | created_at | updated_at |
---|---|---|---|---|---|
1 | John1 | 12 | 4 | 1486030539 | 1486030539 |
2 | Kevin2 | 54 | 12 | 1486030539 | 1486030539 |
... |
team table
id | name | created_at | updated_at |
---|---|---|---|
1 | A | 1486030539 | 1486030539 |
2 | B | 1486030539 | 1486030539 |
... |
user has a team foreign key.
First, register the data in the team and user tables.
team_list = ['A', 'B',...,'Z']
user_list = [('John', 14, 'C'), ...]
We start from the point where this data is already in hand: 26 teams, A through Z, and 100,000 users.
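The article does not show how this test data is built; a minimal sketch of generating it could look like the following (the names, age range, and the `data_list` alias are my assumptions):

```python
import random
import string

# Sketch: 26 teams (A-Z) and 100,000 users with random ages and team assignments.
# The actual generation code is not shown in the article; this is an assumption.
team_list = list(string.ascii_uppercase)
user_list = [('John%d' % i, random.randint(10, 60), random.choice(team_list))
             for i in range(100000)]
data_list = user_list  # the insert snippets below refer to this list as data_list
```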
Of course nobody would insert these records one at a time in practice, but that is where we start (method 0) as the baseline for comparison.
class Base(object):
    def __iter__(self):
        return iter(self.__dict__.items())

    def dict(self):
        return self.__dict__

    @classmethod
    def query(cls):
        if not hasattr(cls, "_query"):
            cls._query = session.query_property()
        return cls._query
class User(Base):
    def __repr__(self):
        return '<User %r>' % (self.id)

    def __init__(self, name, age, team):
        self.name = name
        self.age = age
        self.team = team
        self.updated_at = time.time()
        self.created_at = time.time()

    @staticmethod
    def create_dict(name, age, team_id):
        return {'name': name, 'age': age, 'team_id': team_id,
                'updated_at': time.time(), 'created_at': time.time()}
signup_user = Table('user', metadata,
                    Column('id', BigInteger, nullable=False,
                           primary_key=True, autoincrement=True),
                    Column('name', Unicode(255), nullable=False),
                    Column('age', Integer, nullable=False),
                    Column('team_id', ForeignKey('team.id'), nullable=False),
                    Column('updated_at', BigInteger, nullable=False),
                    Column('created_at', BigInteger, nullable=False))

mapper(User, signup_user,
       properties={
           'id': signup_user.c.id,
           'name': signup_user.c.name,
           'age': signup_user.c.age,
           'team': relationship(Team),
           'updated_at': signup_user.c.updated_at,
           'created_at': signup_user.c.created_at
       })

User.__table__ = signup_user
That is how the Table object is defined. Setting up the engine and session on the DB side looks like this:
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import scoped_session, sessionmaker

metadata = MetaData()
engine = create_engine(uri, encoding='utf-8', pool_recycle=3600)
session = scoped_session(sessionmaker(autocommit=False,
                                      autoflush=True,
                                      expire_on_commit=False,
                                      bind=engine))
metadata.create_all(bind=engine)
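The mapping for Team is not shown in the article; purely for reference, a minimal sketch mirroring the User mapping might look like this (the column layout is an assumption based on the team table above):

```python
# Assumed Team mapping, mirroring the User one (not shown in the original article)
class Team(Base):
    def __init__(self, name):
        self.name = name
        self.updated_at = time.time()
        self.created_at = time.time()

team_table = Table('team', metadata,
                   Column('id', BigInteger, nullable=False,
                          primary_key=True, autoincrement=True),
                   Column('name', Unicode(255), nullable=False),
                   Column('updated_at', BigInteger, nullable=False),
                   Column('created_at', BigInteger, nullable=False))

mapper(Team, team_table)
Team.__table__ = team_table
```

In the real code this would have to come before the User mapping, since relationship(Team) refers to it.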
The code that actually registers the team rows is omitted; inserting the users one at a time looks like this:
def insert_user(name, age, team):
    u = User(name, age, team)
    session.add(u)
    session.commit()

teams = Team.query().all()
# team_dict = {'A': <Team1>, 'B': <Team2>, ...}
team_dict = {t.name: t for t in teams}
[insert_user(d[0], d[1], team_dict[d[2]]) for d in data_list]
This creates each User, adds it, and commits, one record at a time. Method 0 is obviously inefficient, so next (method 1) use `add_all`, which lets you add multiple records at once.
users = [User(d[0], d[1], team_dict[d[2]]) for d in data_list]
session.add_all(users)
session.commit()
The code got a lot cleaner as well.
SQLAlchemy's ORM also provides `bulk_save_objects` (method 2).
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class UserTable(Base):
    __tablename__ = "user"
    id = Column(BigInteger, nullable=False,
                primary_key=True, autoincrement=True)
    name = Column(Unicode(255), nullable=False)
    age = Column(Integer, nullable=False)
    team_id = Column(BigInteger, nullable=False)
    updated_at = Column(BigInteger, nullable=False)
    created_at = Column(BigInteger, nullable=False)
Define a table class like this with the declarative extension, then insert with `bulk_save_objects`:
session.bulk_save_objects(
    [UserTable(name=d[0],
               age=d[1],
               team_id=team_dict[d[2]].id,
               updated_at=time.time(),
               created_at=time.time())
     for d in data_list], return_defaults=True)
session.commit()
Insert like this. Notice that the handling of the foreign key has changed: the team_id is passed directly instead of a Team object.
The ORM has plenty of advantages: it is easy to use, keeps the code short, and handles details like rollback behind the scenes. But its query-generation overhead is large and becomes the bottleneck once you care about speed. Using sqlalchemy.core (method 3) takes a little more effort, but it issues queries much faster than the ORM.
users = [{'name': d[0], 'age': d[1], 'team_id': team_dict[d[2]].id,
          'updated_at': time.time(), 'created_at': time.time()} for d in data_list]
session.execute(User.__table__.insert(), users)
session.commit()
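The timings below were presumably taken by simply wrapping each insert routine in a timer; a minimal sketch of such a measurement (the function name `insert_users_core` is a placeholder, not from the article) could be:

```python
import time

def measure(label, func, *args):
    # Run one insert routine and report its wall-clock time
    start = time.time()
    func(*args)
    print('%s: elapsed time of insertion: %.3f [sec]' % (label, time.time() - start))

# e.g. measure('SqlAlchemy core bulk insert', insert_users_core, data_list)
```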
Let's compare the speeds of methods 0 through 3.
SqlAlchemy ORM: elapsed time of insertion: 62.205 [sec]
SqlAlchemy ORM multi insert: elapsed time of insertion: 1.421 [sec]
SqlAlchemy ORM bulk insert: elapsed time of insertion: 1.170 [sec]
SqlAlchemy core bulk insert: elapsed time of insertion: 0.261 [sec]
sqlalchemy.core is overwhelming: roughly 4 to 5 times faster than the ORM bulk insert.
Even if you stay with the ORM, bulk insert is clearly the way to go.
While on the subject of bulk inserts, I vaguely remembered hearing somewhere that splitting a very large insert into chunks makes it faster, so I tried it with sqlalchemy.core on 1 million rows (the number in parentheses below is the chunk size, i.e. rows per execute); a sketch of the chunked insert follows.
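The exact splitting code is not shown in the article; one plausible way to chunk the core bulk insert, assuming `users` is the list of row dicts built above, is:

```python
# Sketch: insert `users` in chunks of `chunk_size` rows per execute
# (an illustration of the idea, not necessarily the article's exact code)
def insert_users_chunked(users, chunk_size):
    for i in range(0, len(users), chunk_size):
        session.execute(User.__table__.insert(), users[i:i + chunk_size])
    session.commit()
```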
SqlAlchemy core bulk insert (10): elapsed time of insertion: 51.066 [sec]
SqlAlchemy core bulk insert (20): elapsed time of insertion: 37.913 [sec]
SqlAlchemy core bulk insert (50): elapsed time of insertion: 27.323 [sec]
SqlAlchemy core bulk insert (100): elapsed time of insertion: 23.954 [sec]
SqlAlchemy core bulk insert (150): elapsed time of insertion: 22.607 [sec]
SqlAlchemy core bulk insert (200): elapsed time of insertion: 21.853 [sec]
SqlAlchemy core bulk insert (500): elapsed time of insertion: 20.139 [sec]
SqlAlchemy core bulk insert (750): elapsed time of insertion: 19.399 [sec]
SqlAlchemy core bulk insert (1000): elapsed time of insertion: 19.362 [sec]
SqlAlchemy core bulk insert (5000): elapsed time of insertion: 19.493 [sec]
SqlAlchemy core bulk insert (10000): elapsed time of insertion: 19.387 [sec]
SqlAlchemy core bulk insert (20000): elapsed time of insertion: 18.983 [sec]
SqlAlchemy core bulk insert (50000): elapsed time of insertion: 19.641 [sec]
SqlAlchemy core bulk insert (100000): elapsed time of insertion: 19.022 [sec]
SqlAlchemy core bulk insert (500000): elapsed time of insertion: 19.837 [sec]
Well, apparently that was just my imagination: past a certain chunk size the time simply plateaus, and splitting does not make it any faster.
Next, SELECT. We use the team and user data registered above: 26 teams, A through Z, and this time 1 million users.
First, a simple task: return a list of up to `limit` (100 here) dictionaries sorted by age, like [{'id': 10, 'name': 'John', 'age': 34, 'team': 'K'}, {...}, ...].
Obviously, tune the MySQL side first. In this example, just adding an index on user.age makes the query roughly 10 times faster. There are already plenty of articles on MySQL tuning, so I will not go into the details here.
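How the index was added is not shown in the article; one way to create it with SQLAlchemy (an illustration, and the index name is arbitrary) is:

```python
from sqlalchemy import Index

# Add an index on user.age (index name chosen arbitrarily for this sketch)
Index('ix_user_age', signup_user.c.age).create(bind=engine)
```

With the index in place, the ORM version of the query is: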
users = User.query().order_by(desc(User.age)).limit(limit).all()
result = [{'id': u.id, 'name': u.name, 'age': u.age, 'team': u.team.name}
          for u in users]
Short! Nice. Concise code is especially welcome when you have to reach across a table joined by a foreign key.
Now the same query with the select function of sqlalchemy.core.
from sqlalchemy import select, desc, and_, func

u = User.__table__.c
t = Team.__table__.c
sel = select([u.id, u.name, u.age, t.name])\
      .select_from(User.__table__.join(Team.__table__, t.id == u.team_id))\
      .order_by(desc(u.age)).limit(limit)
result = [{'id': r[0], 'name': r[1], 'age': r[2], 'team': r[3]}
          for r in session.execute(sel)]
It has become quite long.
This is not strictly about the SQL itself, but once the number of rows grows, parallelizing the post-processing improves speed dramatically. I wrote about how to use multiprocessing in a previous article, so please see that for details.
from multiprocessing import Pool
import multiprocessing as multi

def get_user(r):
    return {'id': r[0], 'name': r[1], 'age': r[2], 'team': r[3]}

def select_user_multi():
    u = User.__table__.c
    t = Team.__table__.c
    sel = select([u.id, u.name, u.age, t.name])\
          .select_from(User.__table__.join(Team.__table__, t.id == u.team_id))\
          .order_by(desc(u.age)).limit(limit)
    p = Pool(multi.cpu_count())
    result = p.map(get_user, session.execute(sel))
    p.close()
    return result
sqlAlchemy ORM: elapsed time: 0.3291 [sec]
sqlAlchemy core: elapsed time: 0.5837 [sec]
sqlAlchemy core with multiprocessing: elapsed time: 0.0096 [sec]
Hmm, core is actually slower here, probably because the query is so simple. Adding multiprocessing, on the other hand, makes it far faster.
Next, let's count the number of users in each team. To make the query a bit more involved, count only the users under 50 in each team and build data like {'A': 1400, 'B': 2122, ...}.
For speeding up the counting itself, see the notes on handling COUNT() in InnoDB linked at the end.
ORM
def select_teams_orm():
    return Team.query().all()

teams = select_teams_orm()
counts = {tm.name: User.query()
          .filter(and_(User.team == tm, User.age < 50)).count()
          for tm in teams}
As short as ever!
sqlalchemy.core
def select_teams_core():
    t = Team.__table__.c
    sel = select([t.id, t.name]).select_from(Team.__table__)
    res = session.execute(sel)
    result = [{'id': r[0], 'name': r[1]} for r in res]
    return result

teams = select_teams_core()
sess = lambda sel: session.execute(sel)
u = User.__table__.c
counts = {tm['name']: sess(
          select([func.count()]).select_from(User.__table__)
          .where(and_(u.team_id == tm['id'], u.age < 50))
          ).scalar() for tm in teams}
Hard to read! It is written that way to finish in a single comprehension; taken apart, it does this:
def create_query(team_id):
    # Build a COUNT query filtered by team id and user age
    u = User.__table__.c
    return select([func.count()]).select_from(User.__table__)\
           .where(and_(u.team_id == team_id, u.age < 50))

queries = [create_query(tm['id']) for tm in teams]  # one query per team
counts = [session.execute(q) for q in queries]      # execute each query
result = [{tm['name']: c.scalar()} for tm, c in zip(teams, counts)]  # list of {'A': count} dicts
That is all it does, but the amount of code has grown considerably.
multiprocessing
Next, let's parallelize the part that issues the SELECT queries.
Two caveats: **joblib's Parallel cannot be used here, and a session created with scoped_session cannot be shared across the worker processes**.
Session = sessionmaker(autocommit=False,  # scoped_session cannot be shared between processes
                       autoflush=True,
                       expire_on_commit=False,
                       bind=engine)

def count_user(team):
    session = Session()  # give each worker its own session
    u = User.__table__.c
    sel = select([func.count()]).select_from(User.__table__)\
          .where(and_(u.team_id == team['id'], u.age < 50))
    result = session.execute(sel).scalar()
    session.close()
    return result
def count_user_multi():
    teams = select_teams_core()
    p = Pool(multi.cpu_count())
    counts = p.map(count_user, teams)
    counts = {t['name']: c for t, c in zip(teams, counts)}
    p.close()
    return counts
Here one COUNT query is issued per team, but in fact
SELECT DISTINCT(team.id), team.name, COUNT(*)
FROM user JOIN team ON team.id = user.team_id
WHERE user.age < 50 GROUP BY team.id;
would do the whole job in a single query, so let's rewrite it that way.
u = User.__table__.c
t = Team.__table__.c
sel = select([func.distinct(t.id), t.name, func.count()])\
      .select_from(User.__table__.join(Team.__table__, t.id == u.team_id))\
      .where(u.age < 50).group_by(t.id)
counts = {r[1]: r[2] for r in session.execute(sel)}
sqlAlchemy ORM: elapsed time: 0.9522 [sec]
sqlAlchemy core: elapsed time: 0.7772 [sec]
sqlAlchemy core with multiprocessing: elapsed time: 0.0733 [sec]
--
sqlAlchemy core with fixed query: elapsed time: 0.2207 [sec]
This time core is a bit faster than the ORM, and parallelizing gives roughly a 10x speedup. Rewriting things as a single query made it a little over three times faster than the original, though still well short of the parallelized version.
Core was slower in the simple SELECT earlier, so let's try something heavier: for each team, fetch its 100 oldest users and return data shaped like this:
[{'id': 1, 'name': 'A', 'users': [{'id': 400, 'name': 'Kevin', 'age': 32}, {...}, ...]},
{'id': 2, 'name': 'B', 'users': [...]},
...]
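As a rough illustration only (not the code from the repository), the per-team query could be written with sqlalchemy.core along these lines; the helper name `top_users_for_team` is my own invention:

```python
# Sketch: fetch the 100 oldest users of one team with sqlalchemy.core
def top_users_for_team(team):
    u = User.__table__.c
    sel = select([u.id, u.name, u.age])\
          .where(u.team_id == team['id'])\
          .order_by(desc(u.age)).limit(100)
    users = [{'id': r[0], 'name': r[1], 'age': r[2]}
             for r in session.execute(sel)]
    return {'id': team['id'], 'name': team['name'], 'users': users}

result = [top_users_for_team(tm) for tm in select_teams_core()]
```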
The full code for the ORM, core, and multiprocessing versions is on github; the results are as follows.
sqlAlchemy ORM: elapsed time: 0.9782 [sec]
sqlAlchemy core: elapsed time: 0.8864 [sec]
sqlAlchemy core with multiprocessing: elapsed time: 0.0807 [sec]
In this example the gap between ORM and core is only about 0.1 sec, so it may not matter much here, but as the queries get more complex and the row counts grow beyond the roughly 1 million used this time, switching to sqlalchemy.core and parallelizing the work looks worthwhile.
That covers INSERT and SELECT. In summary:

- First of all, tune the SQL and the queries themselves.
- For INSERT, core does not add much code and is 4 to 5 times faster here, so using core seems the better choice.
- Bulk inserts do not get faster by splitting them into chunks.
- For SELECT, moving from ORM to core gives a speedup of around 10 percent, at the cost of more code.
- Parallelizing SELECT processing can give a large speedup (close to 10x in these tests, though this will depend heavily on the machine).

Much of what makes the ORM pleasant is thrown away here, and you could argue that at this point you should just use a different language, but when Python is what you have, it seems best to choose the approach that fits the data size and the needs of the application.
- [Make SQL faster](http://www.geocities.jp/mickindex/database/db_optimize.html)
- [Speed up query generation with SQLAlchemy ORM](http://qiita.com/methane/items/342264e6b564f06abfe7)
- [Python sqlalchemy: pass a SQL statement with embedded variables to session.execute() and run dynamic SQL](http://qiita.com/HirofumiYashima/items/9f459950ab9c878f7f21)
- [Modified SQL Alchemy](http://qiita.com/haminiku/items/895b5d06befc9c8c8c51)
- [Han Computer Road: notes on handling COUNT() in InnoDB](http://nippondanji.blogspot.jp/2010/03/innodbcount.html)