[PYTHON] Working with RDS data via a bastion server in pandas

This is the day-14 article of the estie Advent Calendar 2019. Please check out the other articles as well!

Introduction

Hello, I'm marusho, an engineer at estie, inc. Under the slogan "The power of technology makes the world free and enjoyable," estie runs

- estie, an office search service
- estiepro, a real estate data visualization service

and we are trying to create new value by analyzing real estate data that is updated daily.

To analyze data and put the results to use quickly, we need easy access to the DB without compromising security. At our company, data moves between pandas and the DB frequently, and going through a CSV export or logging into the bastion server every time is slow.

So this time, I will try basic CRUD operations directly from pandas on DB data reached through the bastion server.

Environment

I assume the common setup where the DB sits in a private subnet and can only be reached via a bastion server. Here I am using EC2 and RDS (MySQL 5.7) on AWS.

By the way, the local environment is

Install the required packages

Since we work with the DB from Python, we use SQLAlchemy, the de facto standard ORM. We also install the MySQL driver (PyMySQL) and sshtunnel, which opens the SSH tunnel through the bastion server.

$ pip install SQLAlchemy PyMySQL sshtunnel

SSH config

For ordinary SSH connections, you probably already register hosts in ~/.ssh/config. sshtunnel can use the Host entries in that file as well, so register the bastion server's connection information as follows.

~/.ssh/config


Host rds_bastion
    Hostname [Stepping stone IP]
    Port 22
    User [UserName]
    IdentityFile ~/.ssh/[KeyName]

Connect to RDS

First, import the modules and define the information needed to connect to the DB.


import pandas as pd
import sqlalchemy as sa
from sshtunnel import SSHTunnelForwarder

DB_USER = 'test_user'  # DB user name
DB_PASS = 'db_password'  # DB password
ENDPOINT = 'hogehoge.fugafuga.ap-northeast-1.rds.amazonaws.com'  # RDS endpoint
PORT = 3306  # port
DB_NAME = 'test_db'  # DB name
CHARSET = 'utf8'  # character set
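Since the whole point of the bastion setup is security, you may prefer not to hardcode the password in a script or notebook. A minimal sketch that reads the same settings from environment variables instead; the variable names (DB_USER, DB_PASS, RDS_ENDPOINT, DB_PORT, DB_NAME, DB_CHARSET) and the helper load_db_settings are hypothetical, so adjust them to your own setup:

```python
import os


def load_db_settings(env=os.environ):
    """Read DB connection settings from environment variables.

    The variable names are hypothetical; required ones raise KeyError if unset.
    """
    return {
        "user": env["DB_USER"],                    # DB user name (required)
        "password": env["DB_PASS"],                # DB password (required)
        "endpoint": env["RDS_ENDPOINT"],           # RDS endpoint (required)
        "port": int(env.get("DB_PORT", "3306")),   # defaults to MySQL's 3306
        "db_name": env.get("DB_NAME", "test_db"),  # DB name
        "charset": env.get("DB_CHARSET", "utf8"),  # character set
    }
```

The returned dict holds the same values as the constants above, e.g. settings["password"] in place of DB_PASS.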

Next, use SSH port forwarding to reach the DB through the bastion server.


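server = SSHTunnelForwarder(ssh_address_or_host='rds_bastion',  # Host entry in ~/.ssh/config
                            ssh_config_file='~/.ssh/config',
                            remote_bind_address=(ENDPOINT, PORT))  # where the bastion forwards to
server.start()  # the local end listens on 127.0.0.1:server.local_bind_port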

Remember to close the tunnel once you are done with the connection:


server.close()

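With the SSH tunnel up, create the SQLAlchemy engine. The URL points at 127.0.0.1 and the tunnel's local port (server.local_bind_port), which is forwarded to the RDS endpoint.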


#Generate SQLAlchemy connection URL
URL = f"mysql+pymysql://{DB_USER}:{DB_PASS}@127.0.0.1:{server.local_bind_port}/{DB_NAME}?charset={CHARSET}"

#Get engine
engine = sa.create_engine(URL)
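One caveat with building the URL with an f-string: if the password contains characters such as @, / or :, the URL is parsed incorrectly. The SQLAlchemy documentation recommends percent-encoding such values with urllib.parse.quote_plus; a sketch with a hypothetical helper name, build_mysql_url:

```python
from urllib.parse import quote_plus


def build_mysql_url(user, password, port, db_name, host="127.0.0.1", charset="utf8"):
    """Build a mysql+pymysql URL with the user name and password percent-encoded."""
    return (f"mysql+pymysql://{quote_plus(user)}:{quote_plus(password)}"
            f"@{host}:{port}/{db_name}?charset={charset}")
```

For example, build_mysql_url(DB_USER, DB_PASS, server.local_bind_port, DB_NAME) could replace the f-string above. On SQLAlchemy 1.4 and later, sqlalchemy.engine.URL.create(...) is another option: it builds the URL from separate fields, so no string parsing is involved.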

From here on, I use this engine to work with the data from pandas.

Working with pandas

Now for the main subject: let's try create, read, update, and delete operations with pandas.

As a sample, I created a members table in the test_db database:

MySQL [test_db]> SELECT * FROM members;
+----+------------------+-----+
| id | name             | age |
+----+------------------+-----+
|  1 | Aoi Yukimura     |  15 |
|  2 | Hinata Kurakami  |  15 |
|  3 | Kaede Saito      |  16 |
|  4 | Kokona Aoba      |  13 |
+----+------------------+-----+

Read: Read records

First, let's read the members table into a DataFrame using pandas.read_sql.

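To read all the data in a table, just pass the table name. (This works because we pass a SQLAlchemy engine; with a plain DBAPI connection, read_sql only accepts SQL queries.)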

df = pd.read_sql('members', engine)
   id             name  age
0   1     Aoi Yukimura   15
1   2  Hinata Kurakami   15
2   3      Kaede Saito   16
3   4      Kokona Aoba   13

The table is read in cleanly.

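You can also specify which column to use as the index (index_col) and pass a list of the columns you want to retrieve (columns). Note that columns is only used when reading a whole table, not with a SQL query.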

df = pd.read_sql('members', engine, index_col='id', columns=['name'])
               name
id
1      Aoi Yukimura
2   Hinata Kurakami
3       Kaede Saito
4       Kokona Aoba

Of course, you can also pass a SQL query to select specific records.

df = pd.read_sql('SELECT * FROM members WHERE id = 2', engine)
   id             name  age
0   2  Hinata Kurakami   15
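When the condition comes from a variable, it is safer to bind it as a parameter than to format it into the SQL string. The sketch below uses an in-memory SQLite engine as a stand-in for the tunneled RDS engine (an assumption so that it runs anywhere), and the placeholder name :member_id is arbitrary:

```python
import pandas as pd
import sqlalchemy as sa

# Stand-in for the tunneled RDS engine: an in-memory SQLite database.
engine = sa.create_engine("sqlite://")
pd.DataFrame({
    "id": [1, 2, 3, 4],
    "name": ["Aoi Yukimura", "Hinata Kurakami", "Kaede Saito", "Kokona Aoba"],
    "age": [15, 15, 16, 13],
}).to_sql("members", engine, index=False)

# Bind the value as a parameter instead of formatting it into the SQL string.
query = sa.text("SELECT * FROM members WHERE id = :member_id")
df = pd.read_sql(query, engine, params={"member_id": 2})
print(df)
```

With the RDS engine, the read_sql call itself stays the same; only the engine differs.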

Create: Create table

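You can use to_sql to create a new table from a DataFrame. You can also choose whether to write the DataFrame's index (index) and what to name that column (index_label). Note that index_label only takes effect when index=True, so in the example below id is simply written out as an ordinary column.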


df = pd.read_sql('SELECT * FROM members WHERE age < 14', engine)
df.to_sql('jc_members', engine, index=False, index_label='id')
MySQL [test_db]> select * from jc_members;
+------+------------------+------+
| id   | name             | age  |
+------+------------------+------+
|    4 | Kokona Aoba      |   13 |
+------+------------------+------+
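For comparison, a sketch of the case where index_label does matter: id is first moved into the DataFrame index and then written out under that label with index=True. Again, an in-memory SQLite engine is an assumed stand-in for RDS:

```python
import pandas as pd
import sqlalchemy as sa

engine = sa.create_engine("sqlite://")  # in-memory stand-in for the RDS engine
df = pd.DataFrame({"id": [4], "name": ["Kokona Aoba"], "age": [13]})

# index_label only takes effect when index=True: here the DataFrame index
# (built from the id column) is written out as a column named "id".
df.set_index("id").to_sql("jc_members", engine, index=True, index_label="id")

print(pd.read_sql("jc_members", engine))
```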

Insert: Insert / update record

This can also be done with to_sql, but note that the behavior changes depending on the `if_exists` option.

With `if_exists='append'`, the DataFrame rows are added as new records; if a record with the same key already exists, an error is raised.


insert_df = pd.DataFrame({'id': [5], 'name': ['Honoka Kurosaki'], 'age': [14]})
insert_df.to_sql('members', engine, index=False, index_label='id', if_exists='append')
id  name             age
1   Aoi Yukimura     15
2   Hinata Kurakami  15
3   Kaede Saito      16
4   Kokona Aoba      13
5   Honoka Kurosaki  14

This behaves the same as INSERT, and the record was added as expected.
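To see the error case for append, the sketch below creates members with id as its primary key (an assumption about the real table's schema) in an in-memory SQLite stand-in, then appends the same row twice. The second call is rejected with sqlalchemy.exc.IntegrityError:

```python
import pandas as pd
import sqlalchemy as sa

engine = sa.create_engine("sqlite://")  # in-memory stand-in for the RDS engine
with engine.begin() as conn:
    # Give the table a primary key, as the real members table presumably has.
    conn.execute(sa.text(
        "CREATE TABLE members (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)"))

row = pd.DataFrame({"id": [5], "name": ["Honoka Kurosaki"], "age": [14]})
row.to_sql("members", engine, index=False, if_exists="append")  # first insert succeeds

try:
    # Appending the same id again violates the primary key constraint.
    row.to_sql("members", engine, index=False, if_exists="append")
    duplicate_rejected = False
except sa.exc.IntegrityError:
    duplicate_rejected = True

print(duplicate_rejected)
```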

However, with `if_exists='replace'`, **all of the data in the specified table is deleted** (pandas drops the table and recreates it) and the DataFrame is written in its place.


insert_df = pd.DataFrame({'id': [5], 'name': ['Honoka Kurosaki'], 'age': [14]})
insert_df.to_sql('members', engine, index=False, index_label='id', if_exists='replace')
id  name             age
5   Honoka Kurosaki  14

Be careful: this is neither UPDATE nor UPSERT, and it also behaves differently from MySQL's REPLACE statement!
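One consequence of the drop-and-recreate behavior is that a schema you defined yourself is lost too, including the primary key, because pandas recreates the table from the DataFrame's dtypes. A sketch that checks this with SQLAlchemy's inspector, on an in-memory SQLite stand-in (an assumption; since pandas issues the DROP itself, MySQL should lose the key the same way). The helper primary_key is hypothetical:

```python
import pandas as pd
import sqlalchemy as sa

engine = sa.create_engine("sqlite://")  # in-memory stand-in for the RDS engine
with engine.begin() as conn:
    conn.execute(sa.text(
        "CREATE TABLE members (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)"))


def primary_key(table):
    """Return the primary-key column names of a table, as reflected by SQLAlchemy."""
    return sa.inspect(engine).get_pk_constraint(table)["constrained_columns"]


pk_before = primary_key("members")

df = pd.DataFrame({"id": [5], "name": ["Honoka Kurosaki"], "age": [14]})
df.to_sql("members", engine, index=False, if_exists="replace")

# pandas dropped the table and recreated it from the DataFrame,
# so the primary key defined above no longer exists.
pk_after = primary_key("members")
print(pk_before, pk_after)
```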

Updating only specific records does not seem to be implemented in to_sql yet. I'll skip it this time, but there seem to be ways around this, such as using SQLAlchemy's upsert or changing the SQL that to_sql issues via its method option (https://stackoverflow.com/questions/34661318/replace-rows-in-mysql-database-table-with-pandas-dataframe), so I'd like to try them.
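For reference, a minimal sketch of the method-option approach. Since pandas 0.24, to_sql accepts a callable method(pd_table, conn, keys, data_iter), and the one below builds MySQL's INSERT ... ON DUPLICATE KEY UPDATE with SQLAlchemy's MySQL-dialect insert. The name mysql_upsert is hypothetical, and it assumes members has a primary or unique key on id; treat it as a sketch under those assumptions:

```python
from sqlalchemy.dialects.mysql import insert


def mysql_upsert(pd_table, conn, keys, data_iter):
    """to_sql ``method`` callable issuing INSERT ... ON DUPLICATE KEY UPDATE.

    pandas calls it as method(pd_table, conn, keys, data_iter): pd_table.table
    is the SQLAlchemy Table, keys are the column names, and data_iter yields
    one tuple per row.
    """
    rows = [dict(zip(keys, row)) for row in data_iter]
    if not rows:
        return
    stmt = insert(pd_table.table).values(rows)
    # On a duplicate primary/unique key, overwrite every column with the new value.
    stmt = stmt.on_duplicate_key_update(
        **{key: stmt.inserted[key] for key in keys})
    conn.execute(stmt)
```

It would be used as insert_df.to_sql('members', engine, index=False, if_exists='append', method=mysql_upsert). Because the statement is MySQL-specific, it will not work on other databases.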

Delete: Delete record / table

If you run a DROP or DELETE through read_sql, nothing is returned and an error is raised, yet the operation itself is still executed on the DB side.

pd.read_sql('DROP TABLE members', engine)
MySQL [test_db]> SELECT * FROM members;
ERROR 1146 (42S02): Table 'test_db.members' doesn't exist

Since read_sql isn't meant for this, it is better to run delete operations directly through SQLAlchemy.

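with engine.begin() as conn:  # commits on success (engine.execute was removed in SQLAlchemy 2.0)
    conn.execute(sa.text('DROP TABLE members'))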
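To delete only specific records rather than the whole table, the same pattern works with a DELETE statement and a bound parameter. A sketch on an in-memory SQLite stand-in (an assumption so that it runs without the tunnel):

```python
import pandas as pd
import sqlalchemy as sa

engine = sa.create_engine("sqlite://")  # in-memory stand-in for the RDS engine
pd.DataFrame({"id": [1, 2, 5],
              "name": ["Aoi Yukimura", "Hinata Kurakami", "Honoka Kurosaki"],
              "age": [15, 15, 14]}).to_sql("members", engine, index=False)

# engine.begin() opens a transaction, commits it if the block succeeds,
# and rolls it back if an exception is raised.
with engine.begin() as conn:
    result = conn.execute(sa.text("DELETE FROM members WHERE id = :member_id"),
                          {"member_id": 5})
    deleted = result.rowcount  # number of rows removed

print(deleted)
print(pd.read_sql("members", engine))
```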

In conclusion

Being able to pull data from a remote DB straight into a DataFrame is really convenient. Updates, on the other hand, don't yet cover everything I need, so I'd like to keep an eye on how pandas develops.


estie is looking for web engineers! Please feel free to visit our office via Wantedly!
