[PYTHON] MemSQL processing capacity verification (application 3)

Now, the theme this time is ...

This time, I would like to run a recurring process at fixed intervals against a MemSQL table into which data is being continuously inserted, extract the rows that fall within a specified time range, and generate a new table from them. Since this verification was done under time pressure, I cannot deny that it is a little loose, but the aim is to confirm, for the time being, how MemSQL behaves when, while executing several inserts per second, it receives a stream of additional processing requests.

Shortening the interval of standard batch processing

The classic rule in the batch-processing world is "don't do it all at once!". This time, however, we can assume cooperation with Equalum, the solution we have been verifying and introducing, which builds real-time streaming without programming. So the question becomes: can MemSQL provide the environment that realizes **creative query processing against the existing source data systems, on demand and without worrying about time, without being called a "transaction thief"**? That is what I would like to see.

First, I modified the earlier Python script into a process that runs at regular intervals, adding the following steps:

(1) Create a new table, generating its name from the date and time of the run
(2) Extract the necessary rows from the continuously inserted source table by specifying a time range
(3) Store the returned rows in the new table created in (1)

(This time too it is a brute-force version, so there may be plenty to nitpick, but please be forgiving about that ...)

By the way, the SQL used this time is as follows.

SELECT ts, Category, Product, Price, Units, Card, Payment, Prefecture 
FROM Qiita_Test
WHERE ts BETWEEN 'YYYY/mm/dd HH:MM:SS.000000' AND 'YYYY/mm/dd HH:MM:SS.000000'
ORDER BY ts;
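
Incidentally, when this query is issued from Python via pymysql, the range boundaries can also be passed as bind parameters instead of being concatenated into the SQL string, which avoids quoting mistakes. A minimal sketch, assuming the same placeholder connection values used in the script below:

```python
# Minimal sketch: the same range query with bind parameters (pymysql).
import pymysql.cursors

db = pymysql.connect(host='xxx.xxx.xxx.xxx', port=3306,
                     user='qiita', password='adminqiita', db='Test',
                     charset='utf8', cursorclass=pymysql.cursors.DictCursor)

sql = ("SELECT ts, Category, Product, Price, Units, Card, Payment, Prefecture "
       "FROM Qiita_Test WHERE ts BETWEEN %s AND %s ORDER BY ts")

with db.cursor() as cursor:
    # pymysql quotes and escapes the boundary values for us
    cursor.execute(sql, ('2020/10/12 14:31:35.000000',
                         '2020/10/12 14:32:05.000000'))
    rows = cursor.fetchall()
db.close()
```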

The verification assumes that this series of processes is executed every 30 seconds, each run extracting the data from that point back to 30 seconds earlier. (In practice there is a slight drift due to other influences ... (sweat)) If we can see what happens when the continuously inserted table is periodically interrupted by this conditional-extraction and table-creation load, we will call it OK for the time being and proceed with the work.
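
As an aside, the small gaps between windows mentioned above could be closed by carrying each window's end time forward as the next window's start. A minimal sketch of that idea (the helper below is hypothetical and not part of the verification script):

```python
from datetime import datetime

# Hypothetical helper: consecutive windows share a boundary, so no rows
# can fall between two extraction runs.
last_end = datetime.now()

def next_window():
    global last_end
    start = last_end            # the previous end becomes the new start
    last_end = datetime.now()   # the current time closes the window
    return start, last_end
```

With that noted, here is the script actually used for the verification.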


# coding: utf-8

# Execute tasks in Python at regular intervals (brute-force version)
# Works with Python 2.7


# Python 2: force UTF-8 as the default encoding while preserving stdout
import sys
stdout = sys.stdout
reload(sys)
sys.setdefaultencoding('utf-8')
sys.stdout = stdout

# Module to import
import schedule
import time
import pymysql.cursors

# Information used in SQL statements
SQL1 = "SELECT ts, Category, Product, Price, Units, Card, Payment, Prefecture FROM Qiita_Test "
SQL3 = " ORDER BY ts"

# Snapshot table column definition
DC0 = "id BIGINT AUTO_INCREMENT, PRIMARY KEY(id), O_ts DATETIME(6), "
DC1 = "Category VARCHAR(20), Product VARCHAR(20), Price INT, Units INT, "
DC2 = "Card VARCHAR(40), Payment INT, Prefecture VARCHAR(10)"

# Column definition to write in SQL
DL1 = "O_ts, Category, Product, "
DL2 = "Price, Units, Card, Payment, "
DL3 = "Prefecture"


# Set the process to run at regular intervals here


def job():

    # Time window to go back from the snapshot timing (seconds)
    Time_Int = 30
    Time_Adj = 0

    from datetime import datetime, timedelta

    # Get the current date and time
    now = datetime.now()

    print("Start of JOB: " + now.strftime("%Y/%m/%d %H:%M:%S"))

    # Generate a table name for the snapshot and build the SQL statements
    dt = 'Qiita_' + now.strftime('%Y%m%d_%H%M%S')
    Table_Make = "CREATE TABLE IF NOT EXISTS " + dt
    SQL_Head = "INSERT INTO " + dt
    
    # Start of the time window used in SQL (adjust with Time_Adj if correction is required)
    pre_sec = now - timedelta(seconds = Time_Int + Time_Adj)
    from_dt = pre_sec.strftime("%Y/%m/%d %H:%M:%S")

    # End of the time window used in SQL (adjust with Time_Adj / +- if correction is required)
    now_sec = now + timedelta(seconds = Time_Adj)
    to_dt = now_sec.strftime("%Y/%m/%d %H:%M:%S")

    # Build the SQL statement with the time range
    SQL2 = "WHERE ts BETWEEN '" + from_dt + ".000000' AND '" + to_dt + ".000000'"
    SQL_Def = SQL1 + SQL2 + SQL3

    # Connect to MemSQL
    db = pymysql.connect(host = 'xxx.xxx.xxx.xxx',
                         port=3306,
                         user='qiita',
                         password='adminqiita',
                         db='Test',
                         charset='utf8',
                         cursorclass=pymysql.cursors.DictCursor)

    with db.cursor() as cursor:
        
        cursor.arraysize = 1000 

        # Create a new table for the snapshot
        cursor.execute(Table_Make +"("+DC0+DC1+DC2+")" )
        db.commit()

        # Send the SELECT query
        cursor.execute(SQL_Def)
        db.commit()

        # Fetch the query results
        rows = cursor.fetchall()

        for Query_Data in rows:

            # Map each column by name: with DictCursor, accessing by key is
            # safer than relying on the ordering of Query_Data.values()
            O_ts = str(Query_Data['ts'])
            Category = str(Query_Data['Category'])
            Product = str(Query_Data['Product'])
            Price = str(Query_Data['Price'])
            Units = str(Query_Data['Units'])
            Card = str(Query_Data['Card'])
            Payment = str(Query_Data['Payment'])
            Prefecture = str(Query_Data['Prefecture'])

            # Build the INSERT statement and store the row in the snapshot table
            DV1 = O_ts + "','" + Category + "','" + Product + "','"
            DV2 = Price + "','" + Units + "','" + Card + "','" + Payment + "','"
            DV3 = Prefecture

            SQL_Data = SQL_Head + "(" + DL1 + DL2 + DL3 + ") VALUES('" + DV1 + DV2 + DV3 + "')"

            # Insert into the snapshot table
            cursor.execute(SQL_Data)
            db.commit()

    # Close the database connection
    db.close()

    print("End of JOB: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
    print("+++++++++++++++++++++++++++++++")
    print


# Main part from here


def main():

    # Variables controlling the run
    Loop_Count = 3
    Count = 0

    Interval_Time = 30

    # Start time of the whole process
    from datetime import datetime
    print("Program start date and time: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
    print

    # Every 10 minutes
    # schedule.every(10).minutes.do(job)
    # Every 2 hours
    # schedule.every(2).hours.do(job)
    # At 10:00 every day
    # schedule.every().day.at("10:00").do(job)
    # Every Monday
    # schedule.every().monday.do(job)

    schedule.every(Interval_Time).seconds.do(job)

    # Run the scheduler in an infinite loop
    while True:

        schedule.run_pending()

        # The mysterious incantation ... (sleep until the next check)
        time.sleep(Interval_Time)

        # Stop after the specified number of runs
        if (Count >= Loop_Count):

            break

        else:

            Count += 1

    print
    print(str(Count) + " times: The specified number of jobs have been completed!")
    print
    from datetime import datetime
    print("Program end date and time: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))

if __name__ == "__main__":
    main()

The result of running this script is as follows.

Program start date and time: 2020/10/12 14:31:35

Start of JOB: 2020/10/12 14:32:05
End of JOB: 2020/10/12 14:32:06
+++++++++++++++++++++++++++++++

Start of JOB: 2020/10/12 14:32:36
End of JOB: 2020/10/12 14:32:37
+++++++++++++++++++++++++++++++

Start of JOB: 2020/10/12 14:33:07
End of JOB: 2020/10/12 14:33:09
+++++++++++++++++++++++++++++++


3 times: The specified number of jobs have been completed!

Program end date and time: 2020/10/12 14:33:39

If everything works as expected, there should be three tables on MemSQL whose names carry the start-time information. First, let's check with the familiar **DBeaver**, as always.
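
For reference, the same check can also be done without a GUI. A minimal sketch that lists the generated tables from Python (the LIKE pattern is an assumption based on the Qiita_YYYYMMDD_HHMMSS naming used by the script):

```python
import pymysql

# List the snapshot tables generated by the job (placeholder connection values)
db = pymysql.connect(host='xxx.xxx.xxx.xxx', port=3306, user='qiita',
                     password='adminqiita', db='Test', charset='utf8')
with db.cursor() as cursor:
    cursor.execute("SHOW TABLES LIKE 'Qiita_2%'")
    for (name,) in cursor.fetchall():
        print(name)
```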


It seems the tables were created successfully, alongside the original **Qiita_Test** table. Just in case, let's check the beginning and end of each table.
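
If you prefer SQL to screenshots, the head and tail of a snapshot table can also be checked directly; a minimal sketch reusing the db connection from the sketch above (the table name is hypothetical, following the Qiita_YYYYMMDD_HHMMSS pattern):

```python
# Peek at the first and last rows of one snapshot table
for order in ('ASC', 'DESC'):
    with db.cursor() as cursor:
        cursor.execute("SELECT O_ts FROM Qiita_20201012_143205 "
                       "ORDER BY O_ts " + order + " LIMIT 3")
        print(order)
        print(cursor.fetchall())
db.close()
```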



Because of the order in which the insertion script and this script were started, the earliest data is a little later than the nominal window start, but for the time being the leading edge of the extraction seems to be handled well.

Next, let's compare the end parts.


The last row's TIMESTAMP(6) value is 14:32:04.804694, so the first table fits its window as expected. Checking the other two tables in the same way: a few rows that fell between the processing intervals were missed, but the time range defined for each query is covered. (The windowing logic still needs work ... (sweat)) The second table is safely within its range. As for the last table, as before, two or three rows that landed right on the boundary of the time calculation were not captured, but everything that was captured falls within the time range defined in the SQL statement.

Now, what to do with the extracted tables ...

If the timing were controlled with a little more care, the accuracy of this kind of precise, short-cycle data utilization would improve further. Even so, with the help of in-memory performance, MemSQL can run a large process (for example, replacing the continuous-insert part of this verification with the Equalum setup introduced before) while still freely extracting data and handing it to the next stage with millisecond-range performance. I hope this shows that such a workflow is entirely realistic.
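
Incidentally, if the per-row round-trips of the script ever became a bottleneck, the whole snapshot step could in principle be pushed down to the server as a single statement. A hedged sketch that reuses the script's variables (dt, from_dt, to_dt, cursor, db); this is an alternative idea, not what was measured above:

```python
# Alternative sketch: build the snapshot in one server-side statement
# instead of fetching rows into Python and re-inserting them one by one.
snapshot_sql = (
    "INSERT INTO " + dt +
    " (O_ts, Category, Product, Price, Units, Card, Payment, Prefecture) "
    "SELECT ts, Category, Product, Price, Units, Card, Payment, Prefecture "
    "FROM Qiita_Test WHERE ts BETWEEN %s AND %s"
)
cursor.execute(snapshot_sql, (from_dt + '.000000', to_dt + '.000000'))
db.commit()
```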

Also, the big point is ...

With a short processing cycle there is basically little data to handle per run, so data extraction and aggregation can be performed in a simulation-like manner against a situation extremely close to the present. Instead of doing it all at once in a midnight batch, all the data available up to that point is extracted and preprocessed from data sources across the upstream silos using Equalum and expanded into MemSQL's memory space (at that point the siloed information is laid out flat in a memory space managed by a single database), which makes it easy to utilize the data in a silo-transparent way.

Of course, even when you run extraction queries with various conditions (including the time ranges used this time) against the source table holding the 100,000 rows expanded on MemSQL, millisecond-range performance is maintained stably. If a large memory-expansion space is expected, the necessary IA servers can be clustered to provide fully in-memory data computing.
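
To make "extraction query with some conditions" concrete, an aggregation like the following is the kind of workload meant here; a minimal sketch assuming the same source table and a pymysql connection db (the time-range values are illustrative):

```python
# Illustrative aggregation over the continuously inserted source table
agg_sql = ("SELECT Prefecture, COUNT(*) AS cnt, SUM(Price * Units) AS sales "
           "FROM Qiita_Test WHERE ts BETWEEN %s AND %s "
           "GROUP BY Prefecture ORDER BY sales DESC")

with db.cursor() as cursor:
    cursor.execute(agg_sql, ('2020/10/12 14:31:35.000000',
                             '2020/10/12 14:33:39.000000'))
    for row in cursor.fetchall():
        print(row)
```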

The tables extracted by this batch-like processing can be read into Excel as introduced earlier, so they can be used for BI by exploiting Excel's functions, and thanks to MemSQL's high MySQL compatibility they can immediately serve as source material for other BI / AI and similar solutions. **Let's use MemSQL Vol.6: Derailment** may be helpful here.
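
As a sketch of that Excel / BI hand-off, a snapshot table can be pulled into pandas and written out as a workbook (assuming pandas, SQLAlchemy, and openpyxl are installed; the table name is hypothetical and the host is the placeholder from the script):

```python
import pandas as pd
from sqlalchemy import create_engine

# Load one snapshot table and hand it to Excel for BI-style use
engine = create_engine('mysql+pymysql://qiita:adminqiita@xxx.xxx.xxx.xxx:3306/Test')
df = pd.read_sql("SELECT * FROM Qiita_20201012_143205 ORDER BY O_ts", engine)
df.to_excel('Qiita_20201012_143205.xlsx', index=False)
```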

Next time ...

Next time, I would like to try some external linkages using the tables extracted this time. That work is not directly related to MemSQL itself, but I will dig a little deeper into the "operational database" (rather than relational ...?) message that MemSQL puts out, and verify cooperation with modern transactional data computing:

**Existing data system >> Equalum >> MemSQL >> BI / AI / robots etc.**
**Relational <<<<<<<<<<<< >>>>>>>> Operational**

I would like to take a peek into that world.

Acknowledgments

This verification was carried out using the official Free version (V6) of MemSQL.

**About the Free version of MemSQL ...**

We would like to thank MemSQL for providing this valuable opportunity. If this content differs from what is published on MemSQL's official website, MemSQL's information takes precedence; please understand that.
